Skip to content

Commit d74f9fa

Browse files
committed
fix inf reconnections
1 parent adb5de8 commit d74f9fa

File tree

3 files changed

+43
-1
lines changed

3 files changed

+43
-1
lines changed

internal/topic/retriable_error.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1213
)
@@ -44,6 +45,9 @@ var (
4445

4546
func CheckResetReconnectionCounters(lastTry, now time.Time, connectionTimeout time.Duration) bool {
4647
const resetAttemptEmpiricalCoefficient = 10
48+
if connectionTimeout == value.InfiniteDuration {
49+
return false
50+
}
4751
return now.Sub(lastTry) > connectionTimeout*resetAttemptEmpiricalCoefficient
4852
}
4953

internal/topic/retriable_error_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
grpcStatus "google.golang.org/grpc/status"
1313

1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1617
)
1718

@@ -232,3 +233,40 @@ func TestCheckRetryMode(t *testing.T) {
232233
})
233234
}
234235
}
236+
237+
func TestCheckResetReconnectionCounters(t *testing.T) {
238+
now := time.Now()
239+
fmt.Println(now.Sub(now.Add(-61 * time.Second)))
240+
table := []struct {
241+
name string
242+
lastTry time.Time
243+
connectionTimeout time.Duration
244+
shouldReset bool
245+
}{
246+
{
247+
name: "InfiniteConnectionTimeout",
248+
lastTry: time.Time{},
249+
connectionTimeout: value.InfiniteDuration,
250+
shouldReset: false,
251+
},
252+
{
253+
name: "LastTryLessThanConnectionTimeout",
254+
lastTry: now.Add(-30 * time.Second),
255+
connectionTimeout: time.Minute,
256+
shouldReset: false,
257+
},
258+
{
259+
name: "LastTryGreaterThanConnectionTimeout",
260+
lastTry: now.Add(-time.Hour),
261+
connectionTimeout: time.Minute,
262+
shouldReset: true,
263+
},
264+
}
265+
266+
for _, test := range table {
267+
t.Run(test.name, func(t *testing.T) {
268+
shouldReset := CheckResetReconnectionCounters(test.lastTry, now, test.connectionTimeout)
269+
require.Equal(t, test.shouldReset, shouldReset)
270+
})
271+
}
272+
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
369369
streamCtx, streamCtxCancel = createStreamContext()
370370

371371
now := time.Now()
372-
if topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
372+
if startOfRetries.IsZero() || topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
373373
attempt = 0
374374
startOfRetries = w.clock.Now()
375375
} else {

0 commit comments

Comments
 (0)