Skip to content

Commit eb49eb8

Browse files
authored
Merge pull request #1006 from art22m/inf-reconnection-fix
Fix topic writer infinite reconnections
2 parents 0b4d369 + 2690bff commit eb49eb8

File tree

4 files changed

+55
-6
lines changed

4 files changed

+55
-6
lines changed

internal/topic/retriable_error.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"time"
88

99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1213
)
1314

1415
const (
15-
DefaultStartTimeout = time.Minute
16+
DefaultStartTimeout = time.Minute
17+
connectionEstablishedTimeout = time.Minute
1618
)
1719

1820
type RetrySettings struct {
@@ -44,6 +46,9 @@ var (
4446

4547
func CheckResetReconnectionCounters(lastTry, now time.Time, connectionTimeout time.Duration) bool {
4648
const resetAttemptEmpiricalCoefficient = 10
49+
if connectionTimeout == value.InfiniteDuration {
50+
return now.Sub(lastTry) > connectionEstablishedTimeout
51+
}
4752
return now.Sub(lastTry) > connectionTimeout*resetAttemptEmpiricalCoefficient
4853
}
4954

@@ -87,9 +92,10 @@ func CheckRetryMode(err error, settings RetrySettings, retriesDuration time.Dura
8792
return nil, false
8893
}
8994

90-
if mode.BackoffType() == backoff.TypeFast {
95+
switch mode.BackoffType() {
96+
case backoff.TypeFast:
9197
return backoff.Fast, true
98+
default:
99+
return backoff.Slow, true
92100
}
93-
94-
return backoff.Slow, true
95101
}

internal/topic/retriable_error_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
grpcStatus "google.golang.org/grpc/status"
1313

1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1617
)
1718

@@ -232,3 +233,45 @@ func TestCheckRetryMode(t *testing.T) {
232233
})
233234
}
234235
}
236+
237+
func TestCheckResetReconnectionCounters(t *testing.T) {
238+
now := time.Now()
239+
table := []struct {
240+
name string
241+
lastTry time.Time
242+
connectionTimeout time.Duration
243+
shouldReset bool
244+
}{
245+
{
246+
name: "RecentLastTryWithInfiniteConnectionTimeout",
247+
lastTry: now.Add(-30 * time.Second),
248+
connectionTimeout: value.InfiniteDuration,
249+
shouldReset: false,
250+
},
251+
{
252+
name: "OldLastTryWithInfiniteConnectionTimeout",
253+
lastTry: now.Add(-30 * time.Minute),
254+
connectionTimeout: value.InfiniteDuration,
255+
shouldReset: true,
256+
},
257+
{
258+
name: "LastTryLessThanConnectionTimeout",
259+
lastTry: now.Add(-30 * time.Second),
260+
connectionTimeout: time.Minute,
261+
shouldReset: false,
262+
},
263+
{
264+
name: "LastTryGreaterThanConnectionTimeout",
265+
lastTry: now.Add(-time.Hour),
266+
connectionTimeout: time.Minute,
267+
shouldReset: true,
268+
},
269+
}
270+
271+
for _, test := range table {
272+
t.Run(test.name, func(t *testing.T) {
273+
shouldReset := CheckResetReconnectionCounters(test.lastTry, now, test.connectionTimeout)
274+
require.Equal(t, test.shouldReset, shouldReset)
275+
})
276+
}
277+
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
369369
streamCtx, streamCtxCancel = createStreamContext()
370370

371371
now := time.Now()
372-
if topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
372+
if startOfRetries.IsZero() || topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
373373
attempt = 0
374374
startOfRetries = w.clock.Now()
375375
} else {

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
524524
err := w.Write(ctx, newTestMessages(1))
525525
require.NoError(t, err)
526526

527-
xtest.WaitChannelClosed(t, connectionLoopStopped)
527+
xtest.WaitChannelClosedWithTimeout(t, connectionLoopStopped, 4*time.Second)
528528
})
529529
}
530530

0 commit comments

Comments
 (0)