@@ -462,8 +462,7 @@ var _ = Describe("ClusterClient", func() {
462462 Describe ("pipelining" , func () {
463463 var pipe * redis.Pipeline
464464
465- assertPipeline := func () {
466- keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
465+ assertPipeline := func (keys []string ) {
467466
468467 It ("follows redirects" , func () {
469468 if ! failover {
@@ -482,13 +481,12 @@ var _ = Describe("ClusterClient", func() {
482481 Expect (err ).NotTo (HaveOccurred ())
483482 Expect (cmds ).To (HaveLen (14 ))
484483
485- _ = client .ForEachShard (ctx , func (ctx context.Context , node * redis.Client ) error {
486- defer GinkgoRecover ()
487- Eventually (func () int64 {
488- return node .DBSize (ctx ).Val ()
489- }, 30 * time .Second ).ShouldNot (BeZero ())
490- return nil
491- })
484+ // Check that all keys are set.
485+ for _ , key := range keys {
486+ Eventually (func () string {
487+ return client .Get (ctx , key ).Val ()
488+ }, 30 * time .Second ).Should (Equal (key + "_value" ))
489+ }
492490
493491 if ! failover {
494492 for _ , key := range keys {
@@ -517,14 +515,14 @@ var _ = Describe("ClusterClient", func() {
517515 })
518516
519517 It ("works with missing keys" , func () {
520- pipe .Set (ctx , "A" , "A_value" , 0 )
521- pipe .Set (ctx , "C" , "C_value" , 0 )
518+ pipe .Set (ctx , "A{s} " , "A_value" , 0 )
519+ pipe .Set (ctx , "C{s} " , "C_value" , 0 )
522520 _ , err := pipe .Exec (ctx )
523521 Expect (err ).NotTo (HaveOccurred ())
524522
525- a := pipe .Get (ctx , "A" )
526- b := pipe .Get (ctx , "B" )
527- c := pipe .Get (ctx , "C" )
523+ a := pipe .Get (ctx , "A{s} " )
524+ b := pipe .Get (ctx , "B{s} " )
525+ c := pipe .Get (ctx , "C{s} " )
528526 cmds , err := pipe .Exec (ctx )
529527 Expect (err ).To (Equal (redis .Nil ))
530528 Expect (cmds ).To (HaveLen (3 ))
@@ -547,7 +545,8 @@ var _ = Describe("ClusterClient", func() {
547545
548546 AfterEach (func () {})
549547
550- assertPipeline ()
548+ keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
549+ assertPipeline (keys )
551550
552551 It ("doesn't fail node with context.Canceled error" , func () {
553552 ctx , cancel := context .WithCancel (context .Background ())
@@ -590,7 +589,25 @@ var _ = Describe("ClusterClient", func() {
590589
591590 AfterEach (func () {})
592591
593- assertPipeline ()
592+ // TxPipeline doesn't support cross slot commands.
593+ // Use hashtag to force all keys to the same slot.
594+ keys := []string {"A{s}" , "B{s}" , "C{s}" , "D{s}" , "E{s}" , "F{s}" , "G{s}" }
595+ assertPipeline (keys )
596+
597+ // make sure CrossSlot error is returned
598+ It ("returns CrossSlot error" , func () {
599+ pipe .Set (ctx , "A{s}" , "A_value" , 0 )
600+ pipe .Set (ctx , "B{t}" , "B_value" , 0 )
601+ Expect (hashtag .Slot ("A{s}" )).NotTo (Equal (hashtag .Slot ("B{t}" )))
602+ _ , err := pipe .Exec (ctx )
603+ Expect (err ).To (MatchError (redis .ErrCrossSlot ))
604+ })
605+
606+ // doesn't fail when no commands are queued
607+ It ("returns no error when there are no commands" , func () {
608+ _ , err := pipe .Exec (ctx )
609+ Expect (err ).NotTo (HaveOccurred ())
610+ })
594611 })
595612 })
596613
0 commit comments