@@ -31,7 +31,6 @@ import (
3131 "github.com/aws/aws-sdk-go/service/sqs"
3232 awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
3333 "github.com/cortexlabs/cortex/pkg/lib/random"
34- "github.com/cortexlabs/cortex/pkg/lib/sets/strset"
3534 "github.com/ory/dockertest/v3"
3635 dc "github.com/ory/dockertest/v3/docker"
3736 "github.com/stretchr/testify/require"
@@ -390,75 +389,73 @@ func TestSQSDequeuer_Start_HandlerError(t *testing.T) {
390389 }, 5 * time .Second , time .Second )
391390}
392391
393- func TestSQSDequeuer_MultipleWorkers (t * testing.T ) {
394- // this test seems to be non-deterministically timing out
395- // it seems to be an issue with the test, not the deqeueur
396- return
397-
398- t .Parallel ()
399-
400- awsClient := testAWSClient (t )
401- queueURL := createQueue (t , awsClient )
402-
403- numMessages := 3
404- expectedMsgs := make ([]string , numMessages )
405- for i := 0 ; i < numMessages ; i ++ {
406- message := fmt .Sprintf ("%d" , i )
407- expectedMsgs [i ] = message
408- _ , err := awsClient .SQS ().SendMessage (& sqs.SendMessageInput {
409- MessageBody : aws .String (message ),
410- MessageDeduplicationId : aws .String (message ),
411- MessageGroupId : aws .String (message ),
412- QueueUrl : aws .String (queueURL ),
413- })
414- require .NoError (t , err )
415- }
416-
417- logger := newLogger (t )
418- defer func () { _ = logger .Sync () }()
419-
420- dq , err := NewSQSDequeuer (
421- SQSDequeuerConfig {
422- Region : _localStackDefaultRegion ,
423- QueueURL : queueURL ,
424- StopIfNoMessages : true ,
425- Workers : numMessages ,
426- }, awsClient , logger ,
427- )
428- require .NoError (t , err )
429-
430- dq .waitTimeSeconds = aws .Int64 (0 )
431- dq .notFoundSleepTime = 0
432-
433- msgCh := make (chan string , numMessages )
434- handler := NewMessageHandlerFunc (
435- func (message * sqs.Message ) error {
436- msgCh <- * message .Body
437- return nil
438- },
439- )
440-
441- errCh := make (chan error )
442- go func () {
443- errCh <- dq .Start (handler , func () bool { return true })
444- }()
445-
446- receivedMessages := make ([]string , numMessages )
447- for i := 0 ; i < numMessages ; i ++ {
448- receivedMessages [i ] = <- msgCh
449- }
450- dq .Shutdown ()
451-
452- // timeout test after 30 seconds
453- time .AfterFunc (30 * time .Second , func () {
454- close (msgCh )
455- errCh <- errors .New ("test timed out" )
456- })
457-
458- require .Len (t , receivedMessages , numMessages )
459-
460- set := strset .FromSlice (receivedMessages )
461- require .True (t , set .Has (expectedMsgs ... ))
462-
463- require .NoError (t , <- errCh )
464- }
392+ // this test seems to be non-deterministically timing out
393+ // it seems to be an issue with the test, not the deqeueur
394+ // func TestSQSDequeuer_MultipleWorkers(t *testing.T) {
395+ // t.Parallel()
396+
397+ // awsClient := testAWSClient(t)
398+ // queueURL := createQueue(t, awsClient)
399+
400+ // numMessages := 3
401+ // expectedMsgs := make([]string, numMessages)
402+ // for i := 0; i < numMessages; i++ {
403+ // message := fmt.Sprintf("%d", i)
404+ // expectedMsgs[i] = message
405+ // _, err := awsClient.SQS().SendMessage(&sqs.SendMessageInput{
406+ // MessageBody: aws.String(message),
407+ // MessageDeduplicationId: aws.String(message),
408+ // MessageGroupId: aws.String(message),
409+ // QueueUrl: aws.String(queueURL),
410+ // })
411+ // require.NoError(t, err)
412+ // }
413+
414+ // logger := newLogger(t)
415+ // defer func() { _ = logger.Sync() }()
416+
417+ // dq, err := NewSQSDequeuer(
418+ // SQSDequeuerConfig{
419+ // Region: _localStackDefaultRegion,
420+ // QueueURL: queueURL,
421+ // StopIfNoMessages: true,
422+ // Workers: numMessages,
423+ // }, awsClient, logger,
424+ // )
425+ // require.NoError(t, err)
426+
427+ // dq.waitTimeSeconds = aws.Int64(0)
428+ // dq.notFoundSleepTime = 0
429+
430+ // msgCh := make(chan string, numMessages)
431+ // handler := NewMessageHandlerFunc(
432+ // func(message *sqs.Message) error {
433+ // msgCh <- *message.Body
434+ // return nil
435+ // },
436+ // )
437+
438+ // errCh := make(chan error)
439+ // go func() {
440+ // errCh <- dq.Start(handler, func() bool { return true })
441+ // }()
442+
443+ // receivedMessages := make([]string, numMessages)
444+ // for i := 0; i < numMessages; i++ {
445+ // receivedMessages[i] = <-msgCh
446+ // }
447+ // dq.Shutdown()
448+
449+ // // timeout test after 30 seconds
450+ // time.AfterFunc(30*time.Second, func() {
451+ // close(msgCh)
452+ // errCh <- errors.New("test timed out")
453+ // })
454+
455+ // require.Len(t, receivedMessages, numMessages)
456+
457+ // set := strset.FromSlice(receivedMessages)
458+ // require.True(t, set.Has(expectedMsgs...))
459+
460+ // require.NoError(t, <-errCh)
461+ // }
0 commit comments