@@ -3,23 +3,26 @@ package main
33import (
44 "context"
55 "encoding/json"
6+ "errors"
7+ "fmt"
68 "log"
79 "os"
10+ "os/signal"
11+ "syscall"
812 "time"
913
1014 "github.com/aws/aws-sdk-go-v2/aws"
1115 "github.com/aws/aws-sdk-go-v2/config"
1216 "github.com/aws/aws-sdk-go-v2/service/dynamodb"
1317 "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
1418 "github.com/aws/aws-sdk-go-v2/service/sqs"
15- uuid "github.com/satori/go.uuid"
1619)
1720
1821var cfg aws.Config
19- var ctx = context .Background ()
2022
2123const (
2224 visibilityTimeout = 60 * 10
25+ waitingTimeout = 20
2326)
2427
2528type MsgType struct {
@@ -35,32 +38,34 @@ func init() {
3538 log .Printf ("AWS_PROFILE: %s" , awsProfile )
3639
3740 if awsProfile != "" {
38- log .Println ("Use AWS profile" )
39- cfg , err = config .LoadDefaultConfig (ctx ,
41+ log .Printf ("Use AWS profile %s" , awsProfile )
42+ cfg , err = config .LoadDefaultConfig (context . Background () ,
4043 config .WithSharedConfigProfile (awsProfile ),
4144 )
4245 if err != nil {
43- log .Fatalf ("Error loading profile %v" , err )
46+ log .Fatalf ("error loading config %v" , err )
4447 }
4548
4649 } else {
4750 log .Println ("Use container role" )
48- cfg , err = config .LoadDefaultConfig (ctx )
51+ cfg , err = config .LoadDefaultConfig (context . Background () )
4952 if err != nil {
50- log .Fatalf ("Error loading profile %v" , err )
53+ log .Fatalf ("error loading config %v" , err )
5154 }
5255 }
5356}
5457
5558func main () {
56- log .Println ("Service started" )
59+ log .Println ("Service is started" )
60+ ctx , cancel := context .WithCancel (context .Background ())
5761
58- queueUrl := os .Getenv ("SQS_URL" )
62+ signalChan := make (chan os.Signal , 1 )
63+ signal .Notify (signalChan , os .Interrupt , syscall .SIGTERM )
5964
65+ queueUrl := os .Getenv ("SQS_URL" )
6066 log .Printf ("QUEUE_URL: %s" , queueUrl )
6167
6268 tableName := os .Getenv ("DDB_TABLE" )
63-
6469 log .Printf ("DDB_TABLE: %s" , tableName )
6570
6671 // Create S3 service client
@@ -69,27 +74,47 @@ func main() {
6974 // Create DDB service client
7075 ddbSvc := dynamodb .NewFromConfig (cfg )
7176
77+ defer func () {
78+ signal .Stop (signalChan )
79+ cancel ()
80+ }()
81+
82+ loop:
7283 for {
73- log .Printf ("waiting for new message..." )
74- _ , err := processSQS (sqsSvc , queueUrl , ddbSvc , tableName )
75- if err != nil {
76- log .Fatalf ("Error processing message: %#v" , err )
84+ select {
85+ case <- signalChan : //if get SIGTERM
86+ log .Println ("Got SIGTERM signal, cancelling the context" )
87+ cancel () //cancel context
88+
89+ default :
90+ _ , err := processSQS (ctx , sqsSvc , queueUrl , ddbSvc , tableName )
91+
92+ if err != nil {
93+ if errors .Is (err , context .Canceled ) {
94+ log .Printf ("stop processing, context is cancelled %v" , err )
95+ break loop
96+ }
97+
98+ log .Fatalf ("error processing SQS %v" , err )
99+ }
77100 }
78-
79101 }
102+ log .Println ("service is safely stopped" )
103+
80104}
81105
82- func processSQS (sqsSvc * sqs.Client , queueUrl string , ddbSvc * dynamodb.Client , tableName string ) (bool , error ) {
106+ func processSQS (ctx context. Context , sqsSvc * sqs.Client , queueUrl string , ddbSvc * dynamodb.Client , tableName string ) (bool , error ) {
83107 input := & sqs.ReceiveMessageInput {
84108 QueueUrl : & queueUrl ,
85109 MaxNumberOfMessages : 1 ,
86110 VisibilityTimeout : visibilityTimeout ,
87- WaitTimeSeconds : 20 , //Long polling 20 sec
111+ WaitTimeSeconds : waitingTimeout , // use long polling
88112 }
89113
90114 resp , err := sqsSvc .ReceiveMessage (ctx , input )
115+
91116 if err != nil {
92- log . Fatalf ( "Error receiving message %v " , err )
117+ return false , fmt . Errorf ( "error receiving message %w " , err )
93118 }
94119
95120 log .Printf ("received messages: %v" , len (resp .Messages ))
@@ -99,28 +124,30 @@ func processSQS(sqsSvc *sqs.Client, queueUrl string, ddbSvc *dynamodb.Client, ta
99124
100125 for _ , msg := range resp .Messages {
101126 var newMsg MsgType
127+ id := * msg .MessageId
102128
103129 err := json .Unmarshal ([]byte (* msg .Body ), & newMsg )
104130 if err != nil {
105- return false , err
131+ return false , fmt . Errorf ( "error unmarshalling %w" , err )
106132 }
107133
108- log .Printf ("message received: %#v" , newMsg .Message )
134+ log .Printf ("message id %s is received from SQS : %#v" , id , newMsg .Message )
109135
110- err = putToDDB (ddbSvc , tableName , newMsg .Message )
136+ err = putToDDB (ctx , ddbSvc , tableName , id , newMsg .Message )
111137 if err != nil {
112- return false , err
138+ return false , fmt . Errorf ( "error putting message to DDB %w" , err )
113139 }
114- log .Printf ("message is saved in DDB" )
140+ log .Printf ("message id %s is saved in DDB" , id )
115141
116142 _ , err = sqsSvc .DeleteMessage (ctx , & sqs.DeleteMessageInput {
117143 QueueUrl : & queueUrl ,
118144 ReceiptHandle : msg .ReceiptHandle ,
119145 })
146+
120147 if err != nil {
121- return false , err
148+ return false , fmt . Errorf ( "error deleting message from SQS %w" , err )
122149 }
123- log .Printf ("message is deleted from queue" )
150+ log .Printf ("message id %s is deleted from queue" , id )
124151
125152 }
126153 return true , nil
@@ -130,18 +157,11 @@ func processSQS(sqsSvc *sqs.Client, queueUrl string, ddbSvc *dynamodb.Client, ta
130157func GetUTCTimestampNow () string {
131158 t := time .Now ().UTC ()
132159 return t .Format ("2006-01-02T15:04:05.000Z" )
133-
134- }
135-
136- func GetUUID () string {
137- id := uuid .NewV4 ()
138- return id .String ()
139160}
140161
141- func putToDDB (ddbSvc * dynamodb.Client , tableName string , message string ) error {
162+ func putToDDB (ctx context. Context , ddbSvc * dynamodb.Client , tableName string , msgId string , message string ) error {
142163 inputMap := make (map [string ]types.AttributeValue )
143164
144- msgId := GetUUID ()
145165 utcTimeISO := GetUTCTimestampNow ()
146166
147167 inputMap ["id" ] = & types.AttributeValueMemberS {Value : msgId }
0 commit comments