@@ -55,11 +55,6 @@ type AsyncMessageHandlerConfig struct {
5555 TargetURL string
5656}
5757
58- type userPayload struct {
59- Body io.ReadCloser
60- ContentType string
61- }
62-
6358func NewAsyncMessageHandler (config AsyncMessageHandlerConfig , awsClient * awslib.Client , eventHandler RequestEventHandler , logger * zap.SugaredLogger ) * AsyncMessageHandler {
6459 return & AsyncMessageHandler {
6560 config : config ,
@@ -104,9 +99,21 @@ func (h *AsyncMessageHandler) handleMessage(requestID string) error {
10499 }
105100 return errors .Wrap (err , "failed to get payload" )
106101 }
107- defer h .deletePayload (requestID )
102+ defer func () {
103+ h .deletePayload (requestID )
104+ _ = payload .Close ()
105+ }()
108106
109- result , err := h .submitRequest (payload , requestID )
107+ headers , err := h .getHeaders (requestID )
108+ if err != nil {
109+ updateStatusErr := h .updateStatus (requestID , async .StatusFailed )
110+ if updateStatusErr != nil {
111+ h .log .Errorw ("failed to update status after failure to get headers" , "id" , requestID , "error" , updateStatusErr )
112+ }
113+ return errors .Wrap (err , "failed to get payload" )
114+ }
115+
116+ result , err := h .submitRequest (payload , headers , requestID )
110117 if err != nil {
111118 h .log .Errorw ("failed to submit request to user container" , "id" , requestID , "error" , err )
112119 updateStatusErr := h .updateStatus (requestID , async .StatusFailed )
@@ -138,7 +145,7 @@ func (h *AsyncMessageHandler) updateStatus(requestID string, status async.Status
138145 return h .aws .UploadStringToS3 ("" , h .config .Bucket , key )
139146}
140147
141- func (h * AsyncMessageHandler ) getPayload (requestID string ) (* userPayload , error ) {
148+ func (h * AsyncMessageHandler ) getPayload (requestID string ) (io. ReadCloser , error ) {
142149 key := async .PayloadPath (h .storagePath , requestID )
143150 output , err := h .aws .S3 ().GetObject (
144151 & s3.GetObjectInput {
@@ -149,16 +156,7 @@ func (h *AsyncMessageHandler) getPayload(requestID string) (*userPayload, error)
149156 if err != nil {
150157 return nil , errors .WithStack (err )
151158 }
152-
153- contentType := "application/octet-stream"
154- if output .ContentType != nil {
155- contentType = * output .ContentType
156- }
157-
158- return & userPayload {
159- Body : output .Body ,
160- ContentType : contentType ,
161- }, nil
159+ return output .Body , nil
162160}
163161
164162func (h * AsyncMessageHandler ) deletePayload (requestID string ) {
@@ -170,13 +168,13 @@ func (h *AsyncMessageHandler) deletePayload(requestID string) {
170168 }
171169}
172170
173- func (h * AsyncMessageHandler ) submitRequest (payload * userPayload , requestID string ) (interface {}, error ) {
174- req , err := http .NewRequest (http .MethodPost , h .config .TargetURL , payload . Body )
171+ func (h * AsyncMessageHandler ) submitRequest (payload io. Reader , headers http. Header , requestID string ) (interface {}, error ) {
172+ req , err := http .NewRequest (http .MethodPost , h .config .TargetURL , payload )
175173 if err != nil {
176174 return nil , errors .WithStack (err )
177175 }
178176
179- req .Header . Set ( "Content-Type" , payload . ContentType )
177+ req .Header = headers
180178 req .Header .Set (CortexRequestIDHeader , requestID )
181179
182180 startTime := time .Now ()
@@ -216,3 +214,14 @@ func (h *AsyncMessageHandler) uploadResult(requestID string, result interface{})
216214 key := async .ResultPath (h .storagePath , requestID )
217215 return h .aws .UploadJSONToS3 (result , h .config .Bucket , key )
218216}
217+
218+ func (h * AsyncMessageHandler ) getHeaders (requestID string ) (http.Header , error ) {
219+ key := async .HeadersPath (h .storagePath , requestID )
220+
221+ var headers http.Header
222+ if err := h .aws .ReadJSONFromS3 (& headers , h .config .Bucket , key ); err != nil {
223+ return nil , err
224+ }
225+
226+ return headers , nil
227+ }
0 commit comments