Skip to content

Commit 1d5c88c

Browse files
authored
Fix cortex deploy panic when object and prefix present on bucket (#1830)
1 parent 83e6b39 commit 1d5c88c

File tree

5 files changed: 56 additions (+56) and 32 deletions (-32).

pkg/cortex/serve/cortex_internal/lib/storage/gcs.py

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -57,15 +57,30 @@ def is_valid_gcs_path(path: str) -> bool:
5757
def _does_blob_exist(self, prefix: str) -> bool:
5858
return isinstance(self.gcs.get_blob(blob_name=prefix), storage.blob.Blob)
5959

60+
def _gcs_matching_blobs_generator(self, max_results=None, prefix="", include_dir_objects=False):
61+
for blob in self.gcs.list_blobs(max_results, prefix=prefix):
62+
if include_dir_objects or not blob.name.endswith("/"):
63+
yield blob
64+
6065
def _is_gcs_dir(self, dir_path: str) -> bool:
6166
prefix = util.ensure_suffix(dir_path, "/")
62-
return len(list(self.gcs.list_blobs(max_results=2, prefix=prefix))) > 1
67+
matching_blobs = list(
68+
self._gcs_matching_blobs_generator(
69+
max_results=2, prefix=prefix, include_dir_objects=True
70+
)
71+
)
72+
73+
return len(matching_blobs) > 0
6374

64-
def search(self, prefix: str = "") -> Tuple[List[str], List[datetime.datetime]]:
75+
def search(
76+
self, prefix: str = "", include_dir_objects=False
77+
) -> Tuple[List[str], List[datetime.datetime]]:
6578
paths = []
6679
timestamps = []
6780

68-
for blob in self.gcs.list_blobs(prefix=prefix):
81+
for blob in self._gcs_matching_blobs_generator(
82+
prefix=prefix, include_dir_objects=include_dir_objects
83+
):
6984
paths.append(blob.name)
7085
timestamps.append(blob.updated)
7186

@@ -112,12 +127,14 @@ def download_dir(self, prefix: str, local_dir: str):
112127
def download_dir_contents(self, prefix: str, local_dir: str):
113128
util.mkdir_p(local_dir)
114129
prefix = util.ensure_suffix(prefix, "/")
115-
for blob in self.gcs.list_blobs(prefix=prefix):
116-
if blob.name.endswith("/"):
117-
continue
130+
for blob in self._gcs_matching_blobs_generator(prefix=prefix, include_dir_objects=True):
118131
relative_path = util.trim_prefix(blob.name, prefix)
119132
local_dest_path = os.path.join(local_dir, relative_path)
120-
self.download_file(blob.name, local_dest_path)
133+
134+
if not local_dest_path.endswith("/"):
135+
self.download_file(blob.name, local_dest_path)
136+
else:
137+
util.mkdir_p(os.path.dirname(local_dest_path))
121138

122139
def download(self, prefix: str, local_dir: str):
123140
"""

pkg/cortex/serve/cortex_internal/lib/storage/s3.py

Lines changed: 19 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,7 @@ def _is_s3_dir(self, dir_path):
8585
prefix = util.ensure_suffix(dir_path, "/")
8686
return self._is_s3_prefix(prefix)
8787

88-
def _get_matching_s3_objects_generator(self, prefix="", suffix=""):
88+
def _get_matching_s3_objects_generator(self, prefix="", suffix="", include_dir_objects=False):
8989
kwargs = {"Bucket": self.bucket, "Prefix": prefix}
9090

9191
while True:
@@ -97,16 +97,20 @@ def _get_matching_s3_objects_generator(self, prefix="", suffix=""):
9797

9898
for obj in contents:
9999
key = obj["Key"]
100-
if key.startswith(prefix) and key.endswith(suffix):
100+
if (
101+
key.startswith(prefix)
102+
and key.endswith(suffix)
103+
and (include_dir_objects or not key.endswith("/"))
104+
):
101105
yield obj
102106

103107
try:
104108
kwargs["ContinuationToken"] = resp["NextContinuationToken"]
105109
except KeyError:
106110
break
107111

108-
def _get_matching_s3_keys_generator(self, prefix="", suffix=""):
109-
for obj in self._get_matching_s3_objects_generator(prefix, suffix):
112+
def _get_matching_s3_keys_generator(self, prefix="", suffix="", include_dir_objects=False):
113+
for obj in self._get_matching_s3_objects_generator(prefix, suffix, include_dir_objects):
110114
yield obj["Key"], obj["LastModified"]
111115

112116
def put_object(self, body, key):
@@ -146,18 +150,15 @@ def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None):
146150

147151
return byte_array.strip()
148152

149-
def search(self, prefix="", suffix="") -> Tuple[List[str], List[datetime.datetime]]:
153+
def search(
154+
self, prefix="", suffix="", include_dir_objects=False
155+
) -> Tuple[List[str], List[datetime.datetime]]:
150156
paths = []
151157
timestamps = []
152158

153-
timestamp_map = {}
154-
for key, ts in self._get_matching_s3_keys_generator(prefix, suffix):
155-
timestamp_map[key] = ts
156-
157-
filtered_keys = util.remove_non_empty_directory_paths(list(timestamp_map.keys()))
158-
for key in filtered_keys:
159+
for key, ts in self._get_matching_s3_keys_generator(prefix, suffix, include_dir_objects):
159160
paths.append(key)
160-
timestamps.append(timestamp_map[key])
161+
timestamps.append(ts)
161162

162163
return paths, timestamps
163164

@@ -217,12 +218,14 @@ def download_dir(self, prefix, local_dir):
217218
def download_dir_contents(self, prefix, local_dir):
218219
util.mkdir_p(local_dir)
219220
prefix = util.ensure_suffix(prefix, "/")
220-
for key, _ in self._get_matching_s3_keys_generator(prefix):
221-
if key.endswith("/"):
222-
continue
221+
for key, _ in self._get_matching_s3_keys_generator(prefix, include_dir_objects=True):
223222
rel_path = util.trim_prefix(key, prefix)
224223
local_dest_path = os.path.join(local_dir, rel_path)
225-
self.download_file(key, local_dest_path)
224+
225+
if not local_dest_path.endswith("/"):
226+
self.download_file(key, local_dest_path)
227+
else:
228+
util.mkdir_p(os.path.dirname(local_dest_path))
226229

227230
def download_and_unzip(self, key, local_dir):
228231
util.mkdir_p(local_dir)

pkg/lib/gcp/gcs.go

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -255,12 +255,12 @@ func ConvertGCSObjectsToKeys(gcsObjects ...*storage.ObjectAttrs) []string {
255255
return paths
256256
}
257257

258-
func (c *Client) ListGCSDir(bucket string, gcsDir string, maxResults *int64) ([]*storage.ObjectAttrs, error) {
258+
func (c *Client) ListGCSDir(bucket string, gcsDir string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) {
259259
gcsDir = s.EnsureSuffix(gcsDir, "/")
260-
return c.ListGCSPrefix(bucket, gcsDir, maxResults)
260+
return c.ListGCSPrefix(bucket, gcsDir, includeDirObjects, maxResults)
261261
}
262262

263-
func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64) ([]*storage.ObjectAttrs, error) {
263+
func (c *Client) ListGCSPrefix(bucket string, prefix string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) {
264264
gcsClient, err := c.GCS()
265265
if err != nil {
266266
return nil, err
@@ -282,7 +282,11 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64)
282282
if attrs == nil {
283283
continue
284284
}
285-
gcsObjects = append(gcsObjects, attrs)
285+
286+
if includeDirObjects || !strings.HasSuffix(attrs.Name, "/") {
287+
gcsObjects = append(gcsObjects, attrs)
288+
}
289+
286290
if maxResults != nil && int64(len(gcsObjects)) >= *maxResults {
287291
break
288292
}
@@ -291,12 +295,12 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64)
291295
return gcsObjects, nil
292296
}
293297

294-
func (c *Client) ListGCSPathDir(gcsDirPath string, maxResults *int64) ([]*storage.ObjectAttrs, error) {
298+
func (c *Client) ListGCSPathDir(gcsDirPath string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) {
295299
bucket, gcsDir, err := SplitGCSPath(gcsDirPath)
296300
if err != nil {
297301
return nil, err
298302
}
299-
return c.ListGCSDir(bucket, gcsDir, maxResults)
303+
return c.ListGCSDir(bucket, gcsDir, includeDirObjects, maxResults)
300304
}
301305

302306
// This behaves like you'd expect `ls` to behave on a local file system

pkg/operator/config/wrappers.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -159,7 +159,7 @@ func ListBucketPrefix(prefix string, maxResults *int64) ([]*storage.ObjectAttrs,
159159
}
160160
return nil, s3Objects, nil
161161
case types.GCPProviderType:
162-
gcsObjects, err := GCP.ListGCSPrefix(GCPCoreConfig.Bucket, prefix, maxResults)
162+
gcsObjects, err := GCP.ListGCSPrefix(GCPCoreConfig.Bucket, prefix, false, maxResults)
163163
if err != nil {
164164
return nil, nil, err
165165
}

pkg/types/spec/utils.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -153,7 +153,7 @@ func validateDirModels(
153153
return nil, err
154154
}
155155

156-
gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil)
156+
gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, false, nil)
157157
if err != nil {
158158
return nil, err
159159
}
@@ -287,7 +287,7 @@ func validateModels(
287287
}
288288
modelPrefix = s.EnsureSuffix(modelPrefix, "/")
289289

290-
gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil)
290+
gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, false, nil)
291291
if err != nil {
292292
return nil, errors.Wrap(err, modelNameWrapStr)
293293
}

0 commit comments

Comments
 (0)