Skip to content

Commit 9a51405

Browse files
authored
Add region field to input data
1 parent eff0902 commit 9a51405

File tree

5 files changed

+104
-57
lines changed

5 files changed

+104
-57
lines changed

docs/applications/resources/environments.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Transfer data at scale from data warehouses like S3 into the Cortex environment.
2626
data:
2727
type: csv # file type (required)
2828
path: s3a://<bucket_name>/<file_name> # path to the data; only S3 is currently supported (required)
29+
region: us-west-2 # S3 region (default: us-west-2)
2930
drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
3031
csv_config: <csv_config> # optional configuration that can be provided
3132
schema:
@@ -64,6 +65,7 @@ csv_config:
6465
data:
6566
type: parquet # file type (required)
6667
path: s3a://<bucket_name>/<file_name> # path to the data; only S3 is currently supported (required)
68+
region: us-west-2 # S3 region (default: us-west-2)
6769
drop_null: <bool> # drop any rows that contain at least 1 null value (default: false)
6870
schema:
6971
- parquet_column_name: <string> # name of the column in the parquet file (required)

pkg/lib/aws/s3.go

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,46 @@ import (
2121
"path/filepath"
2222
"strings"
2323

24-
"github.com/cortexlabs/cortex/pkg/lib/files"
25-
"github.com/cortexlabs/cortex/pkg/lib/msgpack"
26-
2724
"github.com/aws/aws-sdk-go/aws"
25+
"github.com/aws/aws-sdk-go/aws/endpoints"
26+
"github.com/aws/aws-sdk-go/aws/session"
2827
"github.com/aws/aws-sdk-go/service/s3"
2928

3029
"github.com/cortexlabs/cortex/pkg/lib/errors"
30+
"github.com/cortexlabs/cortex/pkg/lib/files"
3131
"github.com/cortexlabs/cortex/pkg/lib/json"
32+
"github.com/cortexlabs/cortex/pkg/lib/msgpack"
3233
"github.com/cortexlabs/cortex/pkg/lib/parallel"
34+
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
3335
s "github.com/cortexlabs/cortex/pkg/lib/strings"
3436
)
3537

36-
func (c *Client) S3Path(key string) string {
37-
return "s3://" + filepath.Join(c.Bucket, key)
38-
}
38+
const DefaultS3Region string = endpoints.UsWest2RegionID
3939

40-
func (c *Client) IsS3File(key string) (bool, error) {
41-
return c.IsS3FileExternal(c.Bucket, key)
42-
}
40+
var S3Regions strset.Set
4341

44-
func (c *Client) IsS3Dir(dirPath string) (bool, error) {
45-
prefix := s.EnsureSuffix(dirPath, "/")
46-
return c.IsS3Prefix(prefix)
42+
func init() {
43+
resolver := endpoints.DefaultResolver()
44+
partitions := resolver.(endpoints.EnumPartitions).Partitions()
45+
46+
S3Regions = strset.New()
47+
48+
for _, p := range partitions {
49+
if p.ID() == endpoints.AwsPartitionID || p.ID() == endpoints.AwsCnPartitionID {
50+
for id := range p.Regions() {
51+
S3Regions.Add(id)
52+
}
53+
}
54+
}
4755
}
4856

49-
func (c *Client) IsS3Prefix(prefix string) (bool, error) {
50-
return c.IsS3PrefixExternal(c.Bucket, prefix)
57+
func (c *Client) S3Path(key string) string {
58+
return "s3://" + filepath.Join(c.Bucket, key)
5159
}
5260

53-
func (c *Client) IsS3FileExternal(bucket string, key string) (bool, error) {
61+
func (c *Client) IsS3File(key string) (bool, error) {
5462
_, err := c.s3Client.HeadObject(&s3.HeadObjectInput{
55-
Bucket: aws.String(bucket),
63+
Bucket: aws.String(c.Bucket),
5664
Key: aws.String(key),
5765
})
5866

@@ -66,9 +74,14 @@ func (c *Client) IsS3FileExternal(bucket string, key string) (bool, error) {
6674
return true, nil
6775
}
6876

69-
func (c *Client) IsS3PrefixExternal(bucket string, prefix string) (bool, error) {
77+
func (c *Client) IsS3Dir(dirPath string) (bool, error) {
78+
prefix := s.EnsureSuffix(dirPath, "/")
79+
return c.IsS3Prefix(prefix)
80+
}
81+
82+
func (c *Client) IsS3Prefix(prefix string) (bool, error) {
7083
out, err := c.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
71-
Bucket: aws.String(bucket),
84+
Bucket: aws.String(c.Bucket),
7285
Prefix: aws.String(prefix),
7386
})
7487

@@ -80,14 +93,6 @@ func (c *Client) IsS3PrefixExternal(bucket string, prefix string) (bool, error)
8093
return hasPrefix, nil
8194
}
8295

83-
func (c *Client) IsS3aPrefixExternal(s3aPath string) (bool, error) {
84-
bucket, prefix, err := SplitS3aPath(s3aPath)
85-
if err != nil {
86-
return false, err
87-
}
88-
return c.IsS3PrefixExternal(bucket, prefix)
89-
}
90-
9196
func (c *Client) UploadBytesToS3(data []byte, key string) error {
9297
_, err := c.s3Client.PutObject(&s3.PutObjectInput{
9398
Body: bytes.NewReader(data),
@@ -253,3 +258,29 @@ func SplitS3aPath(s3aPath string) (string, string, error) {
253258

254259
return bucket, key, nil
255260
}
261+
262+
func IsS3PrefixExternal(bucket string, prefix string, region string) (bool, error) {
263+
sess := session.Must(session.NewSession(&aws.Config{
264+
Region: aws.String(region),
265+
}))
266+
267+
out, err := s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{
268+
Bucket: aws.String(bucket),
269+
Prefix: aws.String(prefix),
270+
})
271+
272+
if err != nil {
273+
return false, errors.Wrap(err, prefix)
274+
}
275+
276+
hasPrefix := *out.KeyCount > 0
277+
return hasPrefix, nil
278+
}
279+
280+
func IsS3aPrefixExternal(s3aPath string, region string) (bool, error) {
281+
bucket, prefix, err := SplitS3aPath(s3aPath)
282+
if err != nil {
283+
return false, err
284+
}
285+
return IsS3PrefixExternal(bucket, prefix, region)
286+
}

pkg/operator/api/userconfig/environments.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package userconfig
1818

1919
import (
20+
"github.com/cortexlabs/cortex/pkg/lib/aws"
2021
"github.com/cortexlabs/cortex/pkg/lib/configreader"
2122
cr "github.com/cortexlabs/cortex/pkg/lib/configreader"
2223
"github.com/cortexlabs/cortex/pkg/lib/errors"
@@ -120,7 +121,7 @@ var logLevelValidation = &cr.StructValidation{
120121

121122
type Data interface {
122123
GetIngestedColumns() []string
123-
GetExternalPath() string
124+
GetExternalData() ExternalData
124125
Validate() error
125126
}
126127

@@ -130,24 +131,45 @@ var dataValidation = &cr.InterfaceStructValidation{
130131
ParsedInterfaceStructTypes: map[interface{}]*cr.InterfaceStructType{
131132
CSVEnvironmentDataType: {
132133
Type: (*CSVData)(nil),
133-
StructFieldValidations: csvDataFieldValidations,
134+
StructFieldValidations: append(csvDataFieldValidations, externalDataValidation...),
134135
},
135136
ParquetEnvironmentDataType: {
136137
Type: (*ParquetData)(nil),
137-
StructFieldValidations: parquetDataFieldValidations,
138+
StructFieldValidations: append(parquetDataFieldValidations, externalDataValidation...),
138139
},
139140
},
140141
Parser: func(str string) (interface{}, error) {
141142
return EnvironmentDataTypeFromString(str), nil
142143
},
143144
}
144145

146+
type ExternalData struct {
147+
Path string `json:"path" yaml:"path"`
148+
Region string `json:"region" yaml:"region"`
149+
}
150+
151+
var externalDataValidation = []*cr.StructFieldValidation{
152+
{
153+
StructField: "Path",
154+
StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{
155+
Required: true,
156+
}),
157+
},
158+
{
159+
StructField: "Region",
160+
StringValidation: &cr.StringValidation{
161+
Default: aws.DefaultS3Region,
162+
AllowedValues: aws.S3Regions.Slice(),
163+
},
164+
},
165+
}
166+
145167
type CSVData struct {
146168
Type EnvironmentDataType `json:"type" yaml:"type"`
147-
Path string `json:"path" yaml:"path"`
148169
Schema []string `json:"schema" yaml:"schema"`
149170
DropNull bool `json:"drop_null" yaml:"drop_null"`
150171
CSVConfig *CSVConfig `json:"csv_config" yaml:"csv_config"`
172+
ExternalData
151173
}
152174

153175
// CSVConfig is SPARK_VERSION dependent
@@ -172,12 +194,6 @@ type CSVConfig struct {
172194
}
173195

174196
var csvDataFieldValidations = []*cr.StructFieldValidation{
175-
{
176-
StructField: "Path",
177-
StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{
178-
Required: true,
179-
}),
180-
},
181197
{
182198
StructField: "Schema",
183199
StringListValidation: &cr.StringListValidation{
@@ -273,18 +289,12 @@ var csvDataFieldValidations = []*cr.StructFieldValidation{
273289

274290
type ParquetData struct {
275291
Type EnvironmentDataType `json:"type" yaml:"type"`
276-
Path string `json:"path" yaml:"path"`
277292
Schema []*ParquetColumn `json:"schema" yaml:"schema"`
278293
DropNull bool `json:"drop_null" yaml:"drop_null"`
294+
ExternalData
279295
}
280296

281297
var parquetDataFieldValidations = []*cr.StructFieldValidation{
282-
{
283-
StructField: "Path",
284-
StringValidation: cr.GetS3aPathValidation(&cr.S3aPathValidation{
285-
Required: true,
286-
}),
287-
},
288298
{
289299
StructField: "Schema",
290300
StructListValidation: &cr.StructListValidation{
@@ -381,12 +391,12 @@ func (parqData *ParquetData) Validate() error {
381391
return nil
382392
}
383393

384-
func (csvData *CSVData) GetExternalPath() string {
385-
return csvData.Path
394+
func (csvData *CSVData) GetExternalData() ExternalData {
395+
return csvData.ExternalData
386396
}
387397

388-
func (parqData *ParquetData) GetExternalPath() string {
389-
return parqData.Path
398+
func (parqData *ParquetData) GetExternalData() ExternalData {
399+
return parqData.ExternalData
390400
}
391401

392402
func (csvData *CSVData) GetIngestedColumns() []string {

pkg/operator/workloads/data_job.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/cortexlabs/cortex/pkg/consts"
2727
"github.com/cortexlabs/cortex/pkg/lib/argo"
28+
"github.com/cortexlabs/cortex/pkg/lib/aws"
2829
"github.com/cortexlabs/cortex/pkg/lib/errors"
2930
"github.com/cortexlabs/cortex/pkg/lib/pointer"
3031
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
@@ -183,10 +184,13 @@ func dataWorkloadSpecs(ctx *context.Context) ([]*WorkloadSpec, error) {
183184

184185
shouldIngest := !rawFileExists
185186
if shouldIngest {
186-
externalDataPath := ctx.Environment.Data.GetExternalPath()
187-
externalDataExists, err := config.AWS.IsS3aPrefixExternal(externalDataPath)
188-
if err != nil || !externalDataExists {
189-
return nil, errors.Wrap(ErrorUserDataUnavailable(externalDataPath), ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey)
187+
externalData := ctx.Environment.Data.GetExternalData()
188+
externalDataExists, err := aws.IsS3aPrefixExternal(externalData.Path, externalData.Region)
189+
if err != nil {
190+
return nil, errors.Wrap(err, externalData.Path, ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey)
191+
}
192+
if !externalDataExists {
193+
return nil, errors.Wrap(ErrorExternalDataUnavailable(externalData.Path), ctx.App.Name, userconfig.Identify(ctx.Environment), userconfig.DataKey, userconfig.PathKey)
190194
}
191195
for _, rawColumn := range ctx.RawColumns {
192196
allComputes = append(allComputes, rawColumn.GetCompute())

pkg/operator/workloads/errors.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type ErrorKind int
2424

2525
const (
2626
ErrUnknown ErrorKind = iota
27-
ErrUserDataUnavailable
27+
ErrExternalDataUnavailable
2828
ErrMoreThanOneWorkflow
2929
ErrContextAppMismatch
3030
ErrWorkflowAppMismatch
@@ -35,8 +35,8 @@ const (
3535

3636
var errorKinds = []string{
3737
"err_unknown",
38-
"err_user_data_unavailable",
39-
"error_more_than_one_workflow",
38+
"err_external_data_unavailable",
39+
"err_more_than_one_workflow",
4040
"err_context_app_mismatch",
4141
"err_workflow_app_mismatch",
4242
"err_cortex_installation_broken",
@@ -89,10 +89,10 @@ func (e Error) Error() string {
8989
return e.message
9090
}
9191

92-
func ErrorUserDataUnavailable(s3Path string) error {
92+
func ErrorExternalDataUnavailable(s3Path string) error {
9393
return Error{
94-
Kind: ErrUserDataUnavailable,
95-
message: fmt.Sprintf("the file at %s does not exist, or your cluster does not have access to it", s3Path),
94+
Kind: ErrExternalDataUnavailable,
95+
message: fmt.Sprintf("the data at %s does not exist", s3Path),
9696
}
9797
}
9898

0 commit comments

Comments
 (0)