Skip to content

Commit f067019

Browse files
authored
Merge pull request #1291 Fixed handle unretryable reconnection error in writer reconnection loop.
2 parents 1c58211 + af61249 commit f067019

File tree

8 files changed

+102
-31
lines changed

8 files changed

+102
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed handling of the reconnection timeout error
12
* Fixed handling of the stop partition event in the experimental topic listener
23

34
## v3.76.1

internal/topic/retriable_error.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
connectionEstablishedTimeout = time.Minute
1818
)
1919

20+
var errNil = xerrors.Wrap(errors.New("nil error is not retrieable"))
21+
2022
type RetrySettings struct {
2123
StartTimeout time.Duration // Full retry timeout
2224
CheckError PublicCheckErrorRetryFunction
@@ -53,50 +55,58 @@ func CheckResetReconnectionCounters(lastTry, now time.Time, connectionTimeout ti
5355
return now.Sub(lastTry) > connectionTimeout*resetAttemptEmpiricalCoefficient
5456
}
5557

56-
func CheckRetryMode(err error, settings RetrySettings, retriesDuration time.Duration) (
58+
// RetryDecision checks whether checkErr is retriable.
59+
// If the returned stopRetryReason is nil, checkErr can be retried.
60+
// If the returned stopRetryReason is non-nil, checkErr is not retriable and stopRetryReason holds the reason,
61+
// which should be used instead of checkErr.
62+
func RetryDecision(checkErr error, settings RetrySettings, retriesDuration time.Duration) (
5763
_ backoff.Backoff,
58-
isRetriable bool,
64+
stopRetryReason error,
5965
) {
6066
// nil is not an error and does not need to be retried.
61-
if err == nil {
62-
return nil, false
67+
if checkErr == nil {
68+
return nil, xerrors.WithStackTrace(errNil)
6369
}
6470

6571
// eof is retriable for topic
66-
if errors.Is(err, io.EOF) && xerrors.RetryableError(err) == nil {
67-
err = xerrors.Retryable(err, xerrors.WithName("TopicEOF"))
72+
if errors.Is(checkErr, io.EOF) && xerrors.RetryableError(checkErr) == nil {
73+
checkErr = xerrors.Retryable(checkErr, xerrors.WithName("TopicEOF"))
6874
}
6975

7076
if retriesDuration > settings.StartTimeout {
71-
return nil, false
77+
return nil, fmt.Errorf("ydb: topic reader reconnection timeout, last error: %w", xerrors.Unretryable(checkErr))
7278
}
7379

74-
mode := retry.Check(err)
80+
mode := retry.Check(checkErr)
7581

7682
decision := PublicRetryDecisionDefault
7783
if settings.CheckError != nil {
78-
decision = settings.CheckError(NewCheckRetryArgs(err))
84+
decision = settings.CheckError(NewCheckRetryArgs(checkErr))
7985
}
8086

8187
switch decision {
8288
case PublicRetryDecisionDefault:
83-
isRetriable = mode.MustRetry(true)
89+
isRetriable := mode.MustRetry(true)
90+
if !isRetriable {
91+
return nil, fmt.Errorf("ydb: topic reader unretriable error: %w", xerrors.Unretryable(checkErr))
92+
}
8493
case PublicRetryDecisionRetry:
85-
isRetriable = true
94+
// pass
8695
case PublicRetryDecisionStop:
87-
isRetriable = false
96+
return nil, fmt.Errorf(
97+
"ydb: topic reader unretriable error by check error callback: %w",
98+
xerrors.Unretryable(checkErr),
99+
)
88100
default:
89101
panic(fmt.Errorf("unexpected retry decision: %v", decision))
90102
}
91103

92-
if !isRetriable {
93-
return nil, false
94-
}
104+
// checkErr is a retryable error
95105

96106
switch mode.BackoffType() {
97107
case backoff.TypeFast:
98-
return backoff.Fast, true
108+
return backoff.Fast, nil
99109
default:
100-
return backoff.Slow, true
110+
return backoff.Slow, nil
101111
}
102112
}

internal/topic/retriable_error_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,9 @@ func TestCheckRetryMode(t *testing.T) {
227227

228228
for _, test := range table {
229229
t.Run(test.name, func(t *testing.T) {
230-
resBackoff, retriable := CheckRetryMode(test.err, test.settings, test.duration)
230+
resBackoff, stopReason := RetryDecision(test.err, test.settings, test.duration)
231231
require.Equal(t, test.resBackoff, resBackoff)
232-
require.Equal(t, test.resRetriable, retriable)
232+
require.Equal(t, test.resRetriable, stopReason == nil)
233233
})
234234
}
235235
}

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,14 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
216216
}
217217
}
218218

219+
onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, request.reason)
220+
219221
if request.reason != nil {
220-
if retryBackoff, isRetriableErr := r.checkErrRetryMode(
222+
retryBackoff, stopRetryReason := r.checkErrRetryMode(
221223
request.reason,
222224
r.clock.Since(retriesStarted),
223-
); isRetriableErr {
225+
)
226+
if stopRetryReason == nil {
224227
if err := func() error {
225228
t := r.clock.NewTimer(retryBackoff.Delay(attempt))
226229
defer t.Stop()
@@ -234,10 +237,16 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
234237
}(); err != nil {
235238
return
236239
}
240+
} else {
241+
_ = r.CloseWithError(ctx, stopRetryReason)
242+
onReconnectionDone(stopRetryReason)
243+
244+
return
237245
}
238246
}
239247

240-
_ = r.reconnect(ctx, request.reason, request.oldReader)
248+
err := r.reconnect(ctx, request.reason, request.oldReader)
249+
onReconnectionDone(err)
241250
}
242251
}
243252

@@ -309,16 +318,16 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
309318
}
310319

311320
func (r *readerReconnector) isRetriableError(err error) bool {
312-
_, res := topic.CheckRetryMode(err, r.retrySettings, 0)
321+
_, stopReason := topic.RetryDecision(err, r.retrySettings, 0)
313322

314-
return res
323+
return stopReason == nil
315324
}
316325

317326
func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Duration) (
318327
backoffType backoff.Backoff,
319-
isRetriableErr bool,
328+
stopRetryReason error,
320329
) {
321-
return topic.CheckRetryMode(err, r.retrySettings, retriesDuration)
330+
return topic.RetryDecision(err, r.retrySettings, retriesDuration)
322331
}
323332

324333
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, _ context.CancelCauseFunc, err error) {

internal/topic/topicreaderinternal/stream_reconnector_test.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -65,6 +66,9 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) {
6566

6667
return baseReader, nil
6768
},
69+
retrySettings: topic.RetrySettings{
70+
StartTimeout: topic.DefaultStartTimeout,
71+
},
6872
streamErr: errUnconnected,
6973
tracer: &trace.Topic{},
7074
}
@@ -109,6 +113,9 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) {
109113

110114
return readers[connectCalled-1], nil
111115
},
116+
retrySettings: topic.RetrySettings{
117+
StartTimeout: topic.DefaultStartTimeout,
118+
},
112119
streamErr: errUnconnected,
113120
tracer: &trace.Topic{},
114121
}
@@ -228,9 +235,12 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) {
228235
newStream2.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).MinTimes(1)
229236

230237
reconnector := &readerReconnector{
231-
connectTimeout: value.InfiniteDuration,
232-
background: *background.NewWorker(ctx, "test-worker, "+t.Name()),
238+
background: *background.NewWorker(ctx, "test-worker, "+t.Name()),
239+
retrySettings: topic.RetrySettings{
240+
StartTimeout: value.InfiniteDuration,
241+
},
233242
tracer: &trace.Topic{},
243+
connectTimeout: value.InfiniteDuration,
234244
}
235245
reconnector.initChannelsAndClock()
236246

@@ -499,7 +509,7 @@ func TestTopicReaderReconnectorReconnectWithError(t *testing.T) {
499509
tracer: &trace.Topic{},
500510
}
501511
reconnector.initChannelsAndClock()
502-
err := reconnector.reconnect(ctx, nil, nil)
512+
err := reconnector.reconnect(ctx, errors.New("test-reconnect"), nil)
503513
require.ErrorIs(t, err, testErr)
504514
require.ErrorIs(t, reconnector.streamErr, testErr)
505515
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,11 @@ func (w *WriterReconnector) handleReconnectRetry(
431431
startOfRetries time.Time,
432432
) bool {
433433
retryDuration := w.cfg.clock.Since(startOfRetries)
434-
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
434+
if backoff, stopRetryReason := topic.RetryDecision(
435+
reconnectReason,
436+
w.retrySettings,
437+
retryDuration,
438+
); stopRetryReason == nil {
435439
delay := backoff.Delay(attempt)
436440
delayTimer := w.cfg.clock.NewTimer(delay)
437441
select {
@@ -444,7 +448,7 @@ func (w *WriterReconnector) handleReconnectRetry(
444448
// pass
445449
}
446450
} else {
447-
_ = w.close(ctx, fmt.Errorf("%w, was retried (%v)", reconnectReason, retryDuration))
451+
_ = w.close(ctx, fmt.Errorf("%w, was retried (%v)", stopRetryReason, retryDuration))
448452

449453
return true
450454
}

internal/xerrors/retryable.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,27 @@ func Retryable(err error, opts ...RetryableErrorOption) error {
123123

124124
// RetryableError returns Error if err is a retriable error, otherwise nil
125125
func RetryableError(err error) Error {
126+
var unretriableErr unretryableError
127+
if errors.As(err, &unretriableErr) {
128+
return nil
129+
}
130+
126131
var e *retryableError
127132
if errors.As(err, &e) {
128133
return e
129134
}
130135

131136
return nil
132137
}
138+
139+
func Unretryable(err error) unretryableError {
140+
return unretryableError{err}
141+
}
142+
143+
type unretryableError struct {
144+
error
145+
}
146+
147+
func (e unretryableError) Unwrap() error {
148+
return e.error
149+
}

internal/xerrors/retryable_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package xerrors
22

33
import (
4+
"errors"
45
"fmt"
56
"testing"
67

@@ -51,3 +52,22 @@ func TestRetryableCode(t *testing.T) {
5152
})
5253
}
5354
}
55+
56+
func TestRetriableError(t *testing.T) {
57+
t.Run("retryable", func(t *testing.T) {
58+
retriable := Retryable(errors.New("test"))
59+
wrapped := fmt.Errorf("wrap: %w", retriable)
60+
require.Equal(t, retriable, RetryableError(retriable))
61+
require.Equal(t, retriable, RetryableError(wrapped))
62+
})
63+
t.Run("unretryable", func(t *testing.T) {
64+
require.NoError(t, RetryableError(errors.New("test")))
65+
require.NoError(t, RetryableError(Unretryable(Retryable(errors.New("test")))))
66+
})
67+
}
68+
69+
func TestUnretryableUnwrap(t *testing.T) {
70+
test := errors.New("test")
71+
wrapped := Unretryable(test)
72+
require.ErrorIs(t, wrapped, test)
73+
}

0 commit comments

Comments
 (0)