@@ -11,7 +11,6 @@ import (
1111 "github.com/go-redis/redis/v8/internal/pool"
1212 "github.com/go-redis/redis/v8/internal/proto"
1313 "go.opentelemetry.io/otel/attribute"
14- "go.opentelemetry.io/otel/trace"
1514)
1615
1716// Nil reply returned by Redis when key does not exist.
@@ -214,10 +213,7 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
214213 return cn , nil
215214 }
216215
217- err = internal .WithSpan (ctx , "redis.init_conn" , func (ctx context.Context , span trace.Span ) error {
218- return c .initConn (ctx , cn )
219- })
220- if err != nil {
216+ if err := c .initConn (ctx , cn ); err != nil {
221217 c .connPool .Remove (ctx , cn , err )
222218 if err := errors .Unwrap (err ); err != nil {
223219 return nil , err
@@ -241,6 +237,9 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
241237 return nil
242238 }
243239
240+ ctx , span := internal .StartSpan (ctx , "redis.init_conn" )
241+ defer span .End ()
242+
244243 connPool := pool .NewSingleConnPool (c .connPool , cn )
245244 conn := newConn (ctx , c .opt , connPool )
246245
@@ -288,91 +287,95 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
288287func (c * baseClient ) withConn (
289288 ctx context.Context , fn func (context.Context , * pool.Conn ) error ,
290289) error {
291- return internal .WithSpan (ctx , "redis.with_conn" , func (ctx context.Context , span trace.Span ) error {
292- cn , err := c .getConn (ctx )
293- if err != nil {
294- return err
295- }
290+ ctx , span := internal .StartSpan (ctx , "redis.with_conn" )
291+ defer span .End ()
296292
297- if span .IsRecording () {
298- if remoteAddr := cn .RemoteAddr (); remoteAddr != nil {
299- span .SetAttributes (attribute .String ("net.peer.ip" , remoteAddr .String ()))
300- }
293+ cn , err := c .getConn (ctx )
294+ if err != nil {
295+ return err
296+ }
297+
298+ if span .IsRecording () {
299+ if remoteAddr := cn .RemoteAddr (); remoteAddr != nil {
300+ span .SetAttributes (attribute .String ("net.peer.ip" , remoteAddr .String ()))
301301 }
302+ }
302303
303- defer func () {
304- c .releaseConn (ctx , cn , err )
305- }()
304+ defer func () {
305+ c .releaseConn (ctx , cn , err )
306+ }()
306307
307- done := ctx .Done ()
308- if done == nil {
309- err = fn (ctx , cn )
310- return err
311- }
308+ done := ctx .Done ()
309+ if done == nil {
310+ err = fn (ctx , cn )
311+ return err
312+ }
312313
313- errc := make (chan error , 1 )
314- go func () { errc <- fn (ctx , cn ) }()
314+ errc := make (chan error , 1 )
315+ go func () { errc <- fn (ctx , cn ) }()
315316
316- select {
317- case <- done :
318- _ = cn .Close ()
319- // Wait for the goroutine to finish and send something.
320- <- errc
317+ select {
318+ case <- done :
319+ _ = cn .Close ()
320+ // Wait for the goroutine to finish and send something.
321+ <- errc
321322
322- err = ctx .Err ()
323- return err
324- case err = <- errc :
325- return err
326- }
327- })
323+ err = ctx .Err ()
324+ return err
325+ case err = <- errc :
326+ return err
327+ }
328328}
329329
330330func (c * baseClient ) process (ctx context.Context , cmd Cmder ) error {
331331 var lastErr error
332332 for attempt := 0 ; attempt <= c .opt .MaxRetries ; attempt ++ {
333333 attempt := attempt
334334
335- var retry bool
336- err := internal .WithSpan (ctx , "redis.process" , func (ctx context.Context , span trace.Span ) error {
337- if attempt > 0 {
338- if err := internal .Sleep (ctx , c .retryBackoff (attempt )); err != nil {
339- return err
340- }
341- }
342-
343- retryTimeout := uint32 (1 )
344- err := c .withConn (ctx , func (ctx context.Context , cn * pool.Conn ) error {
345- err := cn .WithWriter (ctx , c .opt .WriteTimeout , func (wr * proto.Writer ) error {
346- return writeCmd (wr , cmd )
347- })
348- if err != nil {
349- return err
350- }
351-
352- err = cn .WithReader (ctx , c .cmdTimeout (cmd ), cmd .readReply )
353- if err != nil {
354- if cmd .readTimeout () == nil {
355- atomic .StoreUint32 (& retryTimeout , 1 )
356- }
357- return err
358- }
359-
360- return nil
361- })
362- if err == nil {
363- return nil
364- }
365- retry = shouldRetry (err , atomic .LoadUint32 (& retryTimeout ) == 1 )
366- return err
367- })
335+ retry , err := c ._process (ctx , cmd , attempt )
368336 if err == nil || ! retry {
369337 return err
370338 }
339+
371340 lastErr = err
372341 }
373342 return lastErr
374343}
375344
345+ func (c * baseClient ) _process (ctx context.Context , cmd Cmder , attempt int ) (bool , error ) {
346+ if attempt > 0 {
347+ if err := internal .Sleep (ctx , c .retryBackoff (attempt )); err != nil {
348+ return false , err
349+ }
350+ }
351+
352+ retryTimeout := uint32 (1 )
353+ err := c .withConn (ctx , func (ctx context.Context , cn * pool.Conn ) error {
354+ err := cn .WithWriter (ctx , c .opt .WriteTimeout , func (wr * proto.Writer ) error {
355+ return writeCmd (wr , cmd )
356+ })
357+ if err != nil {
358+ return err
359+ }
360+
361+ err = cn .WithReader (ctx , c .cmdTimeout (cmd ), cmd .readReply )
362+ if err != nil {
363+ if cmd .readTimeout () == nil {
364+ atomic .StoreUint32 (& retryTimeout , 1 )
365+ }
366+ return err
367+ }
368+
369+ return nil
370+ })
371+ if err == nil {
372+ return false , nil
373+ }
374+
375+ retry := shouldRetry (err , atomic .LoadUint32 (& retryTimeout ) == 1 )
376+ return retry , err
377+ }
378+
376379func (c * baseClient ) retryBackoff (attempt int ) time.Duration {
377380 return internal .RetryBackoff (attempt , c .opt .MinRetryBackoff , c .opt .MaxRetryBackoff )
378381}
0 commit comments