Skip to content

Commit 5b39f4c

Browse files
authored
Optimize ReadN in CDC by collecting records in batches (#284)
1 parent 9063503 commit 5b39f4c

File tree

11 files changed

+470
-118
lines changed

11 files changed

+470
-118
lines changed

source.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,14 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
9999
fallthrough
100100
case source.CDCModeLogrepl:
101101
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
102-
Position: pos,
103-
SlotName: s.config.LogreplSlotName,
104-
PublicationName: s.config.LogreplPublicationName,
105-
Tables: s.config.Tables,
106-
TableKeys: s.tableKeys,
107-
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
108-
WithAvroSchema: s.config.WithAvroSchema,
109-
SnapshotFetchSize: s.config.SnapshotFetchSize,
102+
Position: pos,
103+
SlotName: s.config.LogreplSlotName,
104+
PublicationName: s.config.LogreplPublicationName,
105+
Tables: s.config.Tables,
106+
TableKeys: s.tableKeys,
107+
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
108+
WithAvroSchema: s.config.WithAvroSchema,
109+
BatchSize: *s.config.BatchSize,
110110
})
111111
if err != nil {
112112
return fmt.Errorf("failed to create logical replication iterator: %w", err)

source/logrepl/cdc.go

Lines changed: 125 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"time"
2122

2223
"github.com/conduitio/conduit-commons/opencdc"
2324
"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
@@ -35,15 +36,28 @@ type CDCConfig struct {
3536
Tables []string
3637
TableKeys map[string]string
3738
WithAvroSchema bool
39+
// BatchSize is the maximum size of a batch that will be read from the DB
40+
// in one go and processed by the CDCHandler.
41+
BatchSize int
3842
}
3943

4044
// CDCIterator asynchronously listens for events from the logical replication
4145
// slot and returns them to the caller through NextN.
4246
type CDCIterator struct {
43-
config CDCConfig
44-
records chan opencdc.Record
47+
config CDCConfig
48+
sub *internal.Subscription
4549

46-
sub *internal.Subscription
50+
// batchesCh is a channel shared between this iterator and a CDCHandler,
51+
// to which the CDCHandler is sending batches of records.
52+
// Using a shared queue here would be the fastest option. However,
53+
// we also need to watch for a context that can get cancelled,
54+
// and for the subscription that can end, so using a channel is
55+
// the best option at the moment.
56+
batchesCh chan []opencdc.Record
57+
58+
// recordsForNextRead contains records from the previous batch (returned by the CDCHandler),
59+
// that weren't return by this iterator's ReadN method.
60+
recordsForNextRead []opencdc.Record
4761
}
4862

4963
// NewCDCIterator initializes logical replication by creating the publication and subscription manager.
@@ -64,8 +78,22 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
6478
Msgf("Publication %q already exists.", c.PublicationName)
6579
}
6680

67-
records := make(chan opencdc.Record)
68-
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records, c.WithAvroSchema)
81+
// Using a buffered channel here so that the handler can send a batch
82+
// to the channel and start building a new batch.
83+
// This is useful when the first batch in the channel didn't reach BatchSize (which is sdk.batch.size).
84+
// The handler can prepare the next batch, and the CDCIterator can use them
85+
// to return the maximum number of records.
86+
batchesCh := make(chan []opencdc.Record, 1)
87+
handler := NewCDCHandler(
88+
ctx,
89+
internal.NewRelationSet(),
90+
c.TableKeys,
91+
batchesCh,
92+
c.WithAvroSchema,
93+
c.BatchSize,
94+
// todo make configurable
95+
time.Second,
96+
)
6997

7098
sub, err := internal.CreateSubscription(
7199
ctx,
@@ -81,9 +109,9 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
81109
}
82110

83111
return &CDCIterator{
84-
config: c,
85-
records: records,
86-
sub: sub,
112+
config: c,
113+
batchesCh: batchesCh,
114+
sub: sub,
87115
}, nil
88116
}
89117

@@ -113,8 +141,9 @@ func (i *CDCIterator) StartSubscriber(ctx context.Context) error {
113141
return nil
114142
}
115143

116-
// NextN takes and returns up to n records from the queue. NextN is allowed to
117-
// block until either at least one record is available or the context gets canceled.
144+
// NextN returns up to n records from the internal channel with records.
145+
// NextN is allowed to block until either at least one record is available
146+
// or the context gets canceled.
118147
func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error) {
119148
if !i.subscriberReady() {
120149
return nil, errors.New("logical replication has not been started")
@@ -124,9 +153,45 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error
124153
return nil, fmt.Errorf("n must be greater than 0, got %d", n)
125154
}
126155

127-
var recs []opencdc.Record
156+
// First, we check if there are any records from the previous batch
157+
// that we can start with.
158+
recs := make([]opencdc.Record, len(i.recordsForNextRead), n)
159+
copy(recs, i.recordsForNextRead)
160+
i.recordsForNextRead = nil
161+
162+
// NextN needs to wait until at least 1 record is available.
163+
if len(recs) == 0 {
164+
batch, err := i.nextRecordsBatchBlocking(ctx)
165+
if err != nil {
166+
return nil, fmt.Errorf("failed to fetch next batch of records (blocking): %w", err)
167+
}
168+
recs = batch
169+
}
170+
171+
// We add any already available batches (i.e., we're not blocking waiting for any new batches to arrive)
172+
// to return at most n records.
173+
for len(recs) < n {
174+
batch, err := i.nextRecordsBatch(ctx)
175+
if err != nil {
176+
return nil, fmt.Errorf("failed to fetch next batch of records: %w", err)
177+
}
178+
if batch == nil {
179+
break
180+
}
181+
recs = i.appendRecordsWithLimit(recs, batch, n)
182+
}
183+
184+
sdk.Logger(ctx).Trace().
185+
Int("records", len(recs)).
186+
Int("records_for_next_read", len(i.recordsForNextRead)).
187+
Msg("CDCIterator.NextN returning records")
188+
return recs, nil
189+
}
128190

129-
// Block until at least one record is received or context is canceled
191+
// nextRecordsBatchBlocking waits for the next batch of records to arrive,
192+
// or for the context to be done, or for the subscription to be done,
193+
// whichever comes first.
194+
func (i *CDCIterator) nextRecordsBatchBlocking(ctx context.Context) ([]opencdc.Record, error) {
130195
select {
131196
case <-ctx.Done():
132197
return nil, ctx.Err()
@@ -142,33 +207,59 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error
142207
// subscription stopped without an error and the context is still
143208
// open, this is a strange case, shouldn't actually happen
144209
return nil, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
145-
case rec := <-i.records:
146-
recs = append(recs, rec)
210+
case batch := <-i.batchesCh:
211+
sdk.Logger(ctx).Trace().
212+
Int("records", len(batch)).
213+
Msg("CDCIterator.NextN received batch of records (blocking)")
214+
return batch, nil
147215
}
216+
}
148217

149-
for len(recs) < n {
150-
select {
151-
case rec := <-i.records:
152-
recs = append(recs, rec)
153-
case <-ctx.Done():
154-
return nil, ctx.Err()
155-
case <-i.sub.Done():
156-
if err := i.sub.Err(); err != nil {
157-
return recs, fmt.Errorf("logical replication error: %w", err)
158-
}
159-
if err := ctx.Err(); err != nil {
160-
// Return what we have with context error
161-
return recs, err
162-
}
163-
// Return what we have with subscription stopped error
164-
return recs, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
165-
default:
166-
// No more records currently available
167-
return recs, nil
218+
func (i *CDCIterator) nextRecordsBatch(ctx context.Context) ([]opencdc.Record, error) {
219+
select {
220+
case <-ctx.Done():
221+
// Return what we have with the error
222+
return nil, ctx.Err()
223+
case <-i.sub.Done():
224+
if err := i.sub.Err(); err != nil {
225+
return nil, fmt.Errorf("logical replication error: %w", err)
168226
}
227+
if err := ctx.Err(); err != nil {
228+
// Return what we have with the context error
229+
return nil, err
230+
}
231+
// Return what we have with subscription stopped error
232+
return nil, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
233+
case batch := <-i.batchesCh:
234+
sdk.Logger(ctx).Trace().
235+
Int("records", len(batch)).
236+
Msg("CDCIterator.NextN received batch of records")
237+
238+
return batch, nil
239+
default:
240+
// No more records currently available
241+
return nil, nil
169242
}
243+
}
170244

171-
return recs, nil
245+
// appendRecordsWithLimit appends records to dst from src, until the given limit is reached,
246+
// or all records from src have been moved.
247+
// If some records from src are not moved (probably because they lack emotions),
248+
// they are saved to recordsForNextRead.
249+
func (i *CDCIterator) appendRecordsWithLimit(dst []opencdc.Record, src []opencdc.Record, limit int) []opencdc.Record {
250+
if len(src) == 0 || len(dst) > limit {
251+
return src
252+
}
253+
254+
needed := limit - len(dst)
255+
if needed > len(src) {
256+
needed = len(src)
257+
}
258+
259+
dst = append(dst, src[:needed]...)
260+
i.recordsForNextRead = src[needed:]
261+
262+
return dst
172263
}
173264

174265
// Ack forwards the acknowledgment to the subscription.

source/logrepl/cdc_test.go

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,93 @@ func TestCDCIterator_Ack(t *testing.T) {
456456
})
457457
}
458458
}
459+
460+
func TestCDCIterator_NextN_InternalBatching(t *testing.T) {
461+
ctx := test.Context(t)
462+
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
463+
table := test.SetupEmptyTestTable(ctx, t, pool)
464+
465+
is := is.New(t)
466+
underTest := testCDCIterator(ctx, t, pool, table, true)
467+
<-underTest.sub.Ready()
468+
469+
insertTestRows(ctx, is, pool, table, 1, 1)
470+
// wait until the CDCHandler flushes this one record
471+
// so that we force the CDCIterator to wait for another batch
472+
time.Sleep(time.Second * 2)
473+
insertTestRows(ctx, is, pool, table, 2, 5)
474+
475+
// we request 2 records, expect records 1 and 2
476+
got, err := underTest.NextN(ctx, 2)
477+
is.NoErr(err)
478+
verifyOpenCDCRecords(is, got, table, 1, 2)
479+
time.Sleep(200 * time.Millisecond)
480+
481+
// we request 2 records, expect records 3 and 4
482+
got, err = underTest.NextN(ctx, 2)
483+
is.NoErr(err)
484+
verifyOpenCDCRecords(is, got, table, 3, 4)
485+
time.Sleep(200 * time.Millisecond)
486+
487+
// we request 2 records, expect record 5
488+
got, err = underTest.NextN(ctx, 2)
489+
is.NoErr(err)
490+
verifyOpenCDCRecords(is, got, table, 5, 5)
491+
}
492+
493+
func insertTestRows(ctx context.Context, is *is.I, pool *pgxpool.Pool, table string, from int, to int) {
494+
for i := from; i <= to; i++ {
495+
_, err := pool.Exec(
496+
ctx,
497+
fmt.Sprintf(
498+
`INSERT INTO %s (id, column1, column2, column3, column4, column5)
499+
VALUES (%d, 'test-%d', %d, false, 12.3, 14)`, table, i+10, i, i*100,
500+
),
501+
)
502+
is.NoErr(err)
503+
}
504+
}
505+
506+
func verifyOpenCDCRecords(is *is.I, got []opencdc.Record, tableName string, fromID, toID int) {
507+
// Build the expected records slice
508+
var want []opencdc.Record
509+
510+
for i := fromID; i <= toID; i++ {
511+
id := int64(i + 10)
512+
record := opencdc.Record{
513+
Operation: opencdc.OperationCreate,
514+
Key: opencdc.StructuredData{
515+
"id": id,
516+
},
517+
Payload: opencdc.Change{
518+
After: opencdc.StructuredData{
519+
"id": id,
520+
"key": nil,
521+
"column1": fmt.Sprintf("test-%d", i),
522+
"column2": int32(i) * 100, //nolint:gosec // fine, we know the value is small enough
523+
"column3": false,
524+
"column4": 12.3,
525+
"column5": int64(14),
526+
"column6": nil,
527+
"column7": nil,
528+
"UppercaseColumn1": nil,
529+
},
530+
},
531+
Metadata: opencdc.Metadata{
532+
opencdc.MetadataCollection: tableName,
533+
},
534+
}
535+
536+
want = append(want, record)
537+
}
538+
539+
cmpOpts := []cmp.Option{
540+
cmpopts.IgnoreUnexported(opencdc.Record{}),
541+
cmpopts.IgnoreFields(opencdc.Record{}, "Position", "Metadata"),
542+
}
543+
is.Equal("", cmp.Diff(want, got, cmpOpts...)) // mismatch (-want +got)
544+
}
545+
459546
func TestCDCIterator_NextN(t *testing.T) {
460547
ctx := test.Context(t)
461548
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
@@ -575,17 +662,12 @@ func TestCDCIterator_NextN(t *testing.T) {
575662
VALUES (30, 'test-1', 100, false, 12.3, 14)`, table))
576663
is.NoErr(err)
577664

578-
go func() {
579-
time.Sleep(100 * time.Millisecond)
580-
is.NoErr(i.Teardown(ctx))
581-
}()
665+
time.Sleep(100 * time.Millisecond)
666+
is.NoErr(i.Teardown(ctx))
582667

583-
records, err := i.NextN(ctx, 5)
584-
if err != nil {
585-
is.True(strings.Contains(err.Error(), "logical replication error"))
586-
} else {
587-
is.True(len(records) > 0)
588-
}
668+
_, err = i.NextN(ctx, 5)
669+
is.True(err != nil)
670+
is.True(strings.Contains(err.Error(), "logical replication error"))
589671
})
590672
}
591673

@@ -597,6 +679,7 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
597679
PublicationName: table, // table is random, reuse for publication name
598680
SlotName: table, // table is random, reuse for slot name
599681
WithAvroSchema: true,
682+
BatchSize: 2,
600683
}
601684

602685
i, err := NewCDCIterator(ctx, pool, config)

0 commit comments

Comments
 (0)