Skip to content

Commit 67789e3

Browse files
authored
Fix the race condition in ExecuteAsync && correctly implement fast-path in Execute() (#13)
* Fix the race condition in ExecAsync: - Never do more than one HTTP fetch in ExecAsync - If it's a fast-path, multipart response, use the data that's already in the response, instead of fetching it again from the cloud. NOTE: - Exec is now correct, though not as performant as it could be, - b/c it currently always loops until relationalai.com reports the Transcation is COMPLETED - missing the fast-path optimization * Add fast-path optimization to Execute(). We added a GotCompleteResult boolean to indicate whether or not a TransactionAsyncResult is a fast-path response or not. We think all the SDKs should follow this model. Co-Authored-By: pete.vilter@gmail.com * Fix type error in our ExecuteAsync code * Add Execute() example file
1 parent e5f70e8 commit 67789e3

File tree

3 files changed

+114
-49
lines changed

3 files changed

+114
-49
lines changed

examples/execute/main.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2022 RelationalAI, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"io/ioutil"
19+
"log"
20+
"os"
21+
22+
"github.com/jessevdk/go-flags"
23+
"github.com/relationalai/rai-sdk-go/rai"
24+
)
25+
26+
type Options struct {
27+
Database string `short:"d" long:"database" required:"true" description:"database name"`
28+
Engine string `short:"e" long:"engine" required:"true" descriptio:"engine name"`
29+
Code string `short:"c" long:"code" description:"rel source code"`
30+
File string `short:"f" long:"file" description:"rel source file"`
31+
Readonly bool `long:"readonly" description:"readonly query (default: false)"`
32+
Profile string `long:"profile" default:"default" description:"config profile"`
33+
}
34+
35+
func getCode(opts *Options) (string, error) {
36+
if opts.Code != "" {
37+
return opts.Code, nil
38+
}
39+
bytes, err := ioutil.ReadFile(opts.File)
40+
if err != nil {
41+
return "", nil
42+
}
43+
return string(bytes), nil
44+
}
45+
46+
func run(opts *Options) error {
47+
client, err := rai.NewClientFromConfig(opts.Profile)
48+
if err != nil {
49+
return err
50+
}
51+
source, err := getCode(opts)
52+
if err != nil {
53+
return err
54+
}
55+
rsp, err := client.Execute(opts.Database, opts.Engine, source, nil, opts.Readonly)
56+
if err != nil {
57+
return err
58+
}
59+
rai.Print(rsp, 4)
60+
return nil
61+
}
62+
63+
func main() {
64+
var opts Options
65+
if _, err := flags.ParseArgs(&opts, os.Args); err != nil {
66+
os.Exit(1)
67+
}
68+
if err := run(&opts); err != nil {
69+
log.Fatal(err)
70+
}
71+
}

rai/client.go

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -271,26 +271,41 @@ func unmarshal(rsp *http.Response, result interface{}) error {
271271
return nil
272272
}
273273

274+
dstValues := reflect.ValueOf(result).Elem()
275+
274276
switch data.(type) {
275277

276-
case []byte:
277-
err := json.Unmarshal(data.([]byte), &result)
278-
if err != nil {
278+
case []byte: // Got back a JSON response
279+
280+
if dstValues.Type() == reflect.TypeOf(TransactionAsyncResult{}) {
281+
// But the user asked for the whole TransactionResult struct,
282+
// so we need to parse the JSON TransactionResponse, and fill it in
283+
// the TransactionResult.
284+
var txnResult TransactionAsyncResult
285+
err := json.Unmarshal(data.([]byte), &txnResult.Transaction)
286+
if err != nil {
287+
return err
288+
}
289+
txnResult.GotCompleteResult = false
290+
291+
// Set it into result
292+
srcValues := reflect.ValueOf(txnResult)
293+
294+
dstValues.Set(srcValues)
279295
return err
280296
}
281297

282-
case []TransactionAsyncFile:
283-
dstValues := reflect.ValueOf(result).Elem()
284-
if dstValues.Type() == reflect.TypeOf(TransactionAsyncResponse{}) {
285-
rsp, err := readTransactionAsyncFiles(data.([]TransactionAsyncFile))
286-
srcValues := reflect.ValueOf(rsp.Transaction)
287-
dstValues.Set(srcValues)
298+
// If they asked for just a regular JSON object, unmarshal it.
299+
err := json.Unmarshal(data.([]byte), &result)
300+
if err != nil {
288301
return err
289302
}
290303

304+
case []TransactionAsyncFile: // Multipart response
291305
if dstValues.Type() == reflect.TypeOf(TransactionAsyncResult{}) {
292306
rsp, err := readTransactionAsyncFiles(data.([]TransactionAsyncFile))
293-
srcValues := reflect.ValueOf(rsp)
307+
rsp.GotCompleteResult = true
308+
srcValues := reflect.ValueOf(*rsp)
294309
dstValues.Set(srcValues)
295310
return err
296311
}
@@ -489,7 +504,7 @@ func readTransactionAsyncFiles(files []TransactionAsyncFile) (*TransactionAsyncR
489504
return nil, err
490505
}
491506

492-
return &TransactionAsyncResult{txn, results, metadata, problems}, nil
507+
return &TransactionAsyncResult{true, txn, results, metadata, problems}, nil
493508
}
494509

495510
type HTTPError struct {
@@ -1194,40 +1209,10 @@ func (c *Client) ExecuteAsync(
11941209
Source: source,
11951210
Readonly: readonly,
11961211
}
1197-
var txn TransactionAsyncResponse
1212+
txnResult := &TransactionAsyncResult{}
11981213
data := tx.payload(inputs)
1199-
err := c.Post(PathTransactions, tx.QueryArgs(), data, &txn)
1200-
if txn.State == "CREATED" {
1201-
return &TransactionAsyncResult{
1202-
Transaction: txn,
1203-
Results: make([]ArrowRelation, 0),
1204-
Metadata: make([]TransactionAsyncMetadataResponse, 0),
1205-
Problems: make([]interface{}, 0),
1206-
}, err
1207-
}
1208-
1209-
results, err := c.GetTransactionResults(txn.ID)
1210-
if err != nil {
1211-
return nil, err
1212-
}
1213-
1214-
metadata, err := c.GetTransactionMetadata(txn.ID)
1215-
if err != nil {
1216-
return nil, err
1217-
}
1218-
1219-
problems, err := c.GetTransactionProblems(txn.ID)
1220-
if err != nil {
1221-
return nil, err
1222-
}
1223-
1224-
return &TransactionAsyncResult{
1225-
Transaction: txn,
1226-
Results: results,
1227-
Metadata: metadata,
1228-
Problems: problems,
1229-
}, nil
1230-
1214+
err := c.Post(PathTransactions, tx.QueryArgs(), data, txnResult)
1215+
return txnResult, err
12311216
}
12321217

12331218
func (c *Client) Execute(
@@ -1240,6 +1225,13 @@ func (c *Client) Execute(
12401225
return nil, err
12411226
}
12421227

1228+
// Fast-path optimization
1229+
if rsp.GotCompleteResult {
1230+
return rsp, err
1231+
}
1232+
1233+
// Slow-path
1234+
12431235
id := rsp.Transaction.ID
12441236
count := 0
12451237
for {
@@ -1264,7 +1256,7 @@ func (c *Client) Execute(
12641256
metadata, _ := c.GetTransactionMetadata(id)
12651257
problems, _ := c.GetTransactionProblems(id)
12661258

1267-
return &TransactionAsyncResult{txn.Transaction, results, metadata, problems}, nil
1259+
return &TransactionAsyncResult{true, txn.Transaction, results, metadata, problems}, nil
12681260
}
12691261

12701262
func (c *Client) GetTransactions() (*TransactionAsyncMultipleResponses, error) {

rai/models.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,10 @@ type Source struct {
301301
}
302302

303303
type TransactionAsyncResult struct {
304-
Transaction TransactionAsyncResponse
305-
Results []ArrowRelation
306-
Metadata []TransactionAsyncMetadataResponse
307-
Problems []interface{}
304+
// If !GotCompleteResult, keep polling until Transaction reaches terminal State.
305+
GotCompleteResult bool
306+
Transaction TransactionAsyncResponse
307+
Results []ArrowRelation
308+
Metadata []TransactionAsyncMetadataResponse
309+
Problems []interface{}
308310
}

0 commit comments

Comments
 (0)