Skip to content

Commit 0539319

Browse files
authored
Fixed topic traces (#1873)
1 parent 30b2a2d commit 0539319

File tree

3 files changed

+67
-80
lines changed

3 files changed

+67
-80
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`
1+
* Fixed traces handling in `topic.Reader`
2+
* Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`
23

34
## v3.117.0
45
* Fixed `conn/pool.Get()` behaviour for YDB databases with public IPs. Bug was introduced in v3.116.2

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 65 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -198,16 +198,17 @@ func (r *topicStreamReaderImpl) PopMessagesBatchTx(
198198
tx tx.Transaction,
199199
opts ReadMessageBatchOptions,
200200
) (_ *topicreadercommon.PublicBatch, resErr error) {
201+
logCtx := r.cfg.BaseContext
202+
onDone := trace.TopicOnReaderStreamPopBatchTx(
203+
r.cfg.Trace,
204+
&logCtx,
205+
r.readerID,
206+
r.readConnectionID,
207+
tx.SessionID(),
208+
tx,
209+
)
201210
defer func() {
202-
logCtx := r.cfg.BaseContext
203-
trace.TopicOnReaderStreamPopBatchTx(
204-
r.cfg.Trace,
205-
&logCtx,
206-
r.readerID,
207-
r.readConnectionID,
208-
tx.SessionID(),
209-
tx,
210-
)(resErr)
211+
onDone(resErr)
211212
}()
212213

213214
batch, err := r.ReadMessageBatch(ctx, opts)
@@ -258,16 +259,17 @@ func (r *topicStreamReaderImpl) commitWithTransaction(
258259
r.addOnTransactionCompletedHandler(ctx, tx, batch)
259260
} else {
260261
_ = retry.Retry(ctx, func(ctx context.Context) (err error) {
262+
logCtx := r.cfg.BaseContext
263+
onDone := trace.TopicOnReaderTransactionRollback(
264+
r.cfg.Trace,
265+
&logCtx,
266+
r.readerID,
267+
r.readConnectionID,
268+
tx.SessionID(),
269+
tx,
270+
)
261271
defer func() {
262-
logCtx := r.cfg.BaseContext
263-
trace.TopicOnReaderTransactionRollback(
264-
r.cfg.Trace,
265-
&logCtx,
266-
r.readerID,
267-
r.readConnectionID,
268-
tx.SessionID(),
269-
tx,
270-
)(err)
272+
onDone(err)
271273
}()
272274

273275
return tx.Rollback(ctx)
@@ -290,18 +292,17 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler(
290292
) {
291293
commitRange := topicreadercommon.GetCommitRange(batch)
292294
tx.OnCompleted(func(transactionResult error) {
293-
defer func() {
294-
logCtx := r.cfg.BaseContext
295-
trace.TopicOnReaderTransactionCompleted(
296-
r.cfg.Trace,
297-
&logCtx,
298-
r.readerID,
299-
r.readConnectionID,
300-
tx.SessionID(),
301-
tx,
302-
transactionResult,
303-
)
304-
}()
295+
logCtx := r.cfg.BaseContext
296+
onDone := trace.TopicOnReaderTransactionCompleted(
297+
r.cfg.Trace,
298+
&logCtx,
299+
r.readerID,
300+
r.readConnectionID,
301+
tx.SessionID(),
302+
tx,
303+
transactionResult,
304+
)
305+
defer onDone()
305306

306307
if transactionResult == nil {
307308
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd)
@@ -350,38 +351,28 @@ func (r *topicStreamReaderImpl) ReadMessageBatch(
350351
ctx context.Context,
351352
opts ReadMessageBatchOptions,
352353
) (batch *topicreadercommon.PublicBatch, err error) {
354+
mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext)
355+
onDone := trace.TopicOnReaderReadMessages(
356+
r.cfg.Trace,
357+
&mergeCtx,
358+
opts.MinCount,
359+
opts.MaxCount,
360+
r.getRestBufferBytes(),
361+
)
353362
defer func() {
354-
traceFunc := func(
355-
messagesCount int,
356-
topic string,
357-
partitionID int64,
358-
partitionSessionID int64,
359-
offsetStart int64,
360-
offsetEnd int64,
361-
freeBufferCapacity int,
362-
) {
363-
mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext)
364-
onDone := trace.TopicOnReaderReadMessages(
365-
r.cfg.Trace,
366-
&mergeCtx,
367-
opts.MinCount,
368-
opts.MaxCount,
369-
r.getRestBufferBytes(),
370-
)
371-
onDone(messagesCount, topic, partitionID, partitionSessionID, offsetStart, offsetEnd, freeBufferCapacity, nil)
372-
}
373363
if batch == nil {
374-
traceFunc(0, "", -1, -1, -1, -1, r.getRestBufferBytes())
364+
onDone(0, "", -1, -1, -1, -1, r.getRestBufferBytes(), err)
375365
} else {
376366
commitRange := topicreadercommon.GetCommitRange(batch)
377-
traceFunc(
367+
onDone(
378368
len(batch.Messages),
379369
batch.Topic(),
380370
batch.PartitionID(),
381371
topicreadercommon.BatchGetPartitionSession(batch).StreamPartitionSessionID.ToInt64(),
382372
commitRange.CommitOffsetStart.ToInt64(),
383373
commitRange.CommitOffsetEnd.ToInt64(),
384374
r.getRestBufferBytes(),
375+
err,
385376
)
386377
}
387378
}()
@@ -541,18 +532,20 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange topicrea
541532
return xerrors.WithStackTrace(errCommitWithNilPartitionSession)
542533
}
543534

535+
session := commitRange.PartitionSession
536+
mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext)
537+
onDone := trace.TopicOnReaderCommit(
538+
r.cfg.Trace,
539+
&mergeCtx,
540+
session.Topic,
541+
session.PartitionID,
542+
session.StreamPartitionSessionID.ToInt64(),
543+
commitRange.CommitOffsetStart.ToInt64(),
544+
commitRange.CommitOffsetEnd.ToInt64(),
545+
)
546+
544547
defer func() {
545-
session := commitRange.PartitionSession
546-
mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext)
547-
trace.TopicOnReaderCommit(
548-
r.cfg.Trace,
549-
&mergeCtx,
550-
session.Topic,
551-
session.PartitionID,
552-
session.StreamPartitionSessionID.ToInt64(),
553-
commitRange.CommitOffsetStart.ToInt64(),
554-
commitRange.CommitOffsetEnd.ToInt64(),
555-
)
548+
onDone(err)
556549
}()
557550

558551
if err = r.checkCommitRange(commitRange); err != nil {
@@ -631,9 +624,10 @@ func (r *topicStreamReaderImpl) setStarted() error {
631624
func (r *topicStreamReaderImpl) initSession() (err error) {
632625
initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.EnableSplitMergeSupport, r.cfg.ReadSelectors)
633626

627+
logCtx := r.cfg.BaseContext
628+
onDone := trace.TopicOnReaderInit(r.cfg.Trace, &logCtx, r.readConnectionID, initMessage)
629+
634630
defer func() {
635-
logCtx := r.cfg.BaseContext
636-
onDone := trace.TopicOnReaderInit(r.cfg.Trace, &logCtx, r.readConnectionID, initMessage)
637631
onDone(r.readConnectionID, err)
638632
}()
639633

@@ -822,9 +816,11 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) {
822816

823817
func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) {
824818
resCapacity := r.addRestBufferBytes(-msg.BytesSize)
819+
820+
logCtx := r.cfg.BaseContext
821+
onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &logCtx, r.readConnectionID, resCapacity, msg)
825822
defer func() {
826-
logCtx := r.cfg.BaseContext
827-
trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &logCtx, r.readConnectionID, resCapacity, msg)
823+
onDone(err)
828824
}()
829825

830826
batches, err2 := topicreadercommon.ReadRawBatchesToPublicBatches(msg, &r.sessionController, r.cfg.Decoders)
@@ -842,9 +838,10 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse)
842838
}
843839

844840
func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) {
841+
logCtx := r.cfg.BaseContext
842+
onDone := trace.TopicOnReaderClose(r.cfg.Trace, &logCtx, r.readConnectionID, reason)
845843
defer func() {
846-
logCtx := r.cfg.BaseContext
847-
trace.TopicOnReaderClose(r.cfg.Trace, &logCtx, r.readConnectionID, reason)
844+
onDone(closeErr)
848845
}()
849846

850847
isFirstClose := false

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,17 +1271,6 @@ func (e *streamEnv) Start() {
12711271
})
12721272
}
12731273

1274-
func (e *streamEnv) readerReceiveWaitClose(callback func()) {
1275-
e.stream.EXPECT().Recv().DoAndReturn(func() (rawtopicreader.ServerMessage, error) {
1276-
if callback != nil {
1277-
callback()
1278-
}
1279-
<-e.ctx.Done()
1280-
1281-
return nil, errors.New("test reader closed")
1282-
})
1283-
}
1284-
12851274
func (e *streamEnv) SendFromServer(msg rawtopicreader.ServerMessage) {
12861275
e.SendFromServerAndSetNextCallback(msg, nil)
12871276
}

0 commit comments

Comments
 (0)