Skip to content

Commit ea5e1bd

Browse files
committed
Merge branch 'develop' into feature/GPU_support_extended
2 parents 4528bc6 + 9e37e57 commit ea5e1bd

File tree

4 files changed

+2688
-9
lines changed

4 files changed

+2688
-9
lines changed

FunctionWorker/python/PublicationUtils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def _shutdown_local_queue_client(self):
103103

104104
def _get_backup_data_layer_client(self):
105105
if self._backup_data_layer_client is None:
106-
self._backup_data_layer_client = DataLayerClient(locality=0, for_mfn=True, sid=self._sandboxid, connect=self._datalayer)
106+
self._backup_data_layer_client = DataLayerClient(locality=-1, for_mfn=True, sid=self._sandboxid, connect=self._datalayer)
107107
return self._backup_data_layer_client
108108

109109
def _shutdown_backup_data_layer_client(self):

Sandbox/frontend/frontend.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -246,13 +246,36 @@ func handler(w http.ResponseWriter, r *http.Request) {
246246
http.Error(w, "Can't fetch result", http.StatusInternalServerError)
247247
} else if res == nil {
248248
if async {
249-
log.Printf("handler: Result not yet available of execution ID %s, redirecting", id)
249+
/*
250+
the client asked for the result in async mode.
251+
if the result is not available yet, then just return the execution id again
252+
and let the client handle the retry.
253+
*/
254+
log.Printf("handler: Result not yet available of execution ID %s.", id)
255+
//log.Printf("handler: Result not yet available of execution ID %s, redirecting", id)
250256
// TODO: 300 location redirect
251-
http.Redirect(w, r, r.URL.Path + "?executionId=" + id, http.StatusTemporaryRedirect)
252-
// w.Header().Set("Content-Type", "application/json")
257+
//http.Redirect(w, r, r.URL.Path + "?executionId=" + id, http.StatusTemporaryRedirect)
258+
//w.Header().Set("Content-Type", "application/json")
253259
// w.Header().Set("Retry-After", "5")
254260
// http.Redirect(w, r, r.URL.Path + "?executionId=" + id, http.StatusFound)
255-
// w.Write([]byte(id))
261+
262+
type ResultMessage struct {
263+
ExecutionId string `json:"executionId"`
264+
Result []byte `json:"result"`
265+
}
266+
var res ResultMessage
267+
res.ExecutionId = id
268+
res.Result = nil
269+
var msgb []byte
270+
msgb, err := json.Marshal(res)
271+
if err != nil {
272+
http.Error(w, "Couldn't marshal result to JSON", http.StatusInternalServerError)
273+
log.Println("handler: Couldn't marshal result to JSON: ", err)
274+
} else {
275+
w.Header().Set("Content-Type", "application/json")
276+
w.Write(msgb)
277+
}
278+
//w.Write([]byte(id))
256279
} else {
257280
log.Printf("handler: Result not yet available of execution ID %s, waiting", id)
258281
ExecutionCond.L.Lock()
@@ -305,12 +328,14 @@ func handler(w http.ResponseWriter, r *http.Request) {
305328
topic string
306329
msg MfnMessage
307330
id string
331+
is_special_message bool
308332
)
309333
actionhdr := r.Header.Get("x-mfn-action")
310334
actiondata := r.Header.Get("x-mfn-action-data")
311335
if (actionhdr != "" && actiondata != "") {
312336
// handle different cases here
313337
log.Printf("Got a special message: [%s] [%s]", actionhdr, actiondata)
338+
is_special_message = true
314339
if (actionhdr == "session-update") {
315340
log.Printf("New session update message...")
316341

@@ -410,6 +435,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
410435
log.Printf("Trigger-Event message topic: [%s], id: [%s]", topic, id)
411436
}
412437
} else {
438+
is_special_message = false
413439
// the headers are not special, so this must be a regular message to trigger a workflow
414440
// CREATE NEW MfnMessage
415441
id, err = GenerateExecutionID()
@@ -440,9 +466,18 @@ func handler(w http.ResponseWriter, r *http.Request) {
440466
if err != nil {
441467
http.Error(w, "Error submitting event to system", http.StatusInternalServerError)
442468
} else {
443-
// w.Header().Set("Content-Type", "application/json")
444-
// w.WriteHeader(http.StatusAccepted)
445-
w.Write([]byte(id))
469+
// w.Header().Set("Content-Type", "application/json")
470+
// w.WriteHeader(http.StatusAccepted)
471+
// Create entry and lock to wait on
472+
if !is_special_message {
473+
m := sync.Mutex{}
474+
c := sync.NewCond(&m)
475+
e := Execution{c,nil,0}
476+
ExecutionCond.L.Lock()
477+
ExecutionResults[id] = &e
478+
ExecutionCond.L.Unlock()
479+
}
480+
w.Write([]byte(id))
446481
}
447482
} else {
448483
// Create entry and lock to wait on

0 commit comments

Comments
 (0)