Skip to content

Commit 3b6f1a2

Browse files
committed
Updated pr.
1 parent 2fa05bf commit 3b6f1a2

File tree

2 files changed

+117
-83
lines changed

2 files changed

+117
-83
lines changed

ads/jobs/builders/infrastructure/dsc_file_system.py

Lines changed: 110 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import ads
77
import oci
88
import copy
9+
import ipaddress
910

1011
from ads.common import utils
1112
from dataclasses import asdict, dataclass
@@ -16,7 +17,9 @@
1617
@dataclass
1718
class DSCFileSystem:
1819

19-
destination_directory_name: str = None
20+
src: str = None
21+
dsc: str = None
22+
storage_type: str = None
2023

2124
def to_dict(self) -> dict:
2225
"""Converts the object to dictionary."""
@@ -27,7 +30,7 @@ def from_dict(cls, env: dict) -> "DSCFileSystem":
2730
"""Initialize the object from a Python dictionary."""
2831
return cls(**{utils.camel_to_snake(k): v for k, v in env.items()})
2932

30-
def update_to_dsc_model(self, **kwargs) -> dict:
33+
def update_to_dsc_model(self) -> dict:
3134
return self.to_dict()
3235

3336
@classmethod
@@ -38,94 +41,119 @@ def update_from_dsc_model(cls, dsc_model: dict) -> "DSCFileSystem":
3841
@dataclass
3942
class OCIFileStorage(DSCFileSystem):
4043

41-
mount_target: str = None
4244
mount_target_id: str = None
43-
export_path: str = None
4445
export_id: str = None
4546
storage_type: str = FILE_STORAGE_TYPE
4647

4748
def __post_init__(self):
48-
if not self.destination_directory_name:
49-
raise ValueError(
50-
"Parameter `destination_directory_name` must be provided to mount file system."
51-
)
49+
if not self.src:
50+
if not self.mount_target_id:
51+
raise ValueError(
52+
"Missing required parameter. Either `src` or `mount_target_id` is required for mounting file storage system."
53+
)
5254

53-
if not self.mount_target and not self.mount_target_id:
54-
raise ValueError(
55-
"Either parameter `mount_target` or `mount_target_id` must be provided to mount file system."
56-
)
55+
if not self.export_id:
56+
raise ValueError(
57+
"Missing required parameter. Either `src` or `export_id` is required for mounting file storage system."
58+
)
5759

58-
if not self.export_path and not self.export_id:
60+
if not self.dsc:
5961
raise ValueError(
60-
"Either parameter `export_path` or `export_id` must be provided to mount file system."
62+
"Parameter `src` is required for mounting file storage system."
6163
)
6264

63-
def update_to_dsc_model(self, **kwargs) -> dict:
65+
def update_to_dsc_model(self) -> dict:
6466
"""Updates arguments to dsc model.
6567
6668
Returns
6769
-------
6870
dict:
6971
A dictionary of arguments.
7072
"""
71-
auth = ads.auth.default_signer()
72-
file_storage_client = oci.file_storage.FileStorageClient(**auth)
73-
identity_client = oci.identity.IdentityClient(**auth)
74-
7573
arguments = self.to_dict()
7674

77-
compartment_id = kwargs["compartment_id"]
75+
if "exportId" not in arguments:
76+
arguments["exportId"] = self._get_export_id(arguments)
77+
7878
if "mountTargetId" not in arguments:
79-
list_availability_domains_response = (
80-
identity_client.list_availability_domains(
81-
compartment_id=compartment_id
82-
).data
79+
arguments["mountTargetId"] = self._get_mount_target_id(arguments)
80+
81+
arguments.pop("src")
82+
arguments["destination_directory_name"] = arguments.pop("dsc")[5:]
83+
84+
return arguments
85+
86+
def _get_export_id(self, arguments: dict) -> str:
87+
file_storage_client = oci.file_storage.FileStorageClient(**ads.auth.default_signer())
88+
src_list = arguments["src"].split(":")
89+
ip = src_list[0]
90+
export_path = src_list[1]
91+
92+
resource_summary = self._get_resource(ip)
93+
94+
list_exports_response = file_storage_client.list_exports(
95+
compartment_id=resource_summary.compartment_id
96+
).data
97+
exports = [
98+
export.id
99+
for export in list_exports_response
100+
if export.path == export_path
101+
]
102+
if len(exports) == 0:
103+
raise ValueError(
104+
f"No `export_id` found under ip {ip}. Specify a valid `src`."
105+
)
106+
if len(exports) > 1:
107+
raise ValueError(
108+
f"Multiple `export_id` found under ip {ip}. Specify `export_id` of the file system instead."
83109
)
84-
mount_targets = []
85-
for availability_domain in list_availability_domains_response:
86-
mount_targets.extend(
87-
file_storage_client.list_mount_targets(
88-
compartment_id=compartment_id,
89-
availability_domain=availability_domain.name,
90-
).data
91-
)
92-
mount_targets = [
93-
mount_target.id
94-
for mount_target in mount_targets
95-
if mount_target.display_name == self.mount_target
96-
]
97-
if len(mount_targets) == 0:
98-
raise ValueError(
99-
f"No `mount_target` with value {self.mount_target} found under compartment {compartment_id}. Specify a valid one."
100-
)
101-
if len(mount_targets) > 1:
102-
raise ValueError(
103-
f"Multiple `mount_target` with value {self.mount_target} found under compartment {compartment_id}. Specify `mount_target_id` of the file system instead."
104-
)
105-
arguments["mountTargetId"] = mount_targets[0]
106-
arguments.pop("mountTarget")
107110

108-
if "exportId" not in arguments:
109-
list_exports_response = file_storage_client.list_exports(
110-
compartment_id=compartment_id
111-
).data
112-
exports = [
113-
export.id
114-
for export in list_exports_response
115-
if export.path == self.export_path
116-
]
117-
if len(exports) == 0:
118-
raise ValueError(
119-
f"No `export_path` with value {self.export_path} found under compartment {compartment_id}. Specify a valid one."
120-
)
121-
if len(exports) > 1:
122-
raise ValueError(
123-
f"Multiple `export_path` with value {self.export_path} found under compartment {compartment_id}. Specify `export_id` of the file system instead."
124-
)
125-
arguments["exportId"] = exports[0]
126-
arguments.pop("exportPath")
111+
return exports[0]
112+
113+
def _get_mount_target_id(self, arguments: dict) -> str:
114+
file_storage_client = oci.file_storage.FileStorageClient(**ads.auth.default_signer())
115+
ip = arguments["src"].split(":")[0]
116+
resource = self._get_resource(ip)
117+
118+
mount_targets = file_storage_client.list_mount_targets(
119+
compartment_id=resource.compartment_id,
120+
availability_domain=resource.availability_domain,
121+
export_set_id=file_storage_client.get_export(arguments["exportId"]).data.export_set_id
122+
).data
123+
mount_targets = [
124+
mount_target.id
125+
for mount_target in mount_targets
126+
if resource.identifier in mount_target.private_ip_ids
127+
]
128+
if len(mount_targets) == 0:
129+
raise ValueError(
130+
f"No `mount_target_id` found under ip {ip}. Specify a valid `src`."
131+
)
132+
if len(mount_targets) > 1:
133+
raise ValueError(
134+
f"Multiple `mount_target_id` found under ip {ip}. Specify `mount_target_id` of the file system instead."
135+
)
136+
return mount_targets[0]
127137

128-
return arguments
138+
def _get_resource(self, ip: str) -> oci.resource_search.models.ResourceSummary:
139+
resource_client = oci.resource_search.ResourceSearchClient(**ads.auth.default_signer())
140+
resource = resource_client.search_resources(
141+
search_details=oci.resource_search.models.FreeTextSearchDetails(
142+
text=ip,
143+
matching_context_type="NONE"
144+
),
145+
#limit=440,
146+
#page="EXAMPLE-page-Value",
147+
#tenant_id="ocid1.test.oc1..<unique_ID>EXAMPLE-tenantId-Value",
148+
#opc_request_id="4DLA5ESGCPUPH3J3NAMC<unique_ID>"
149+
).data.items
150+
151+
resource = sorted(resource, key=lambda resource_summary: resource_summary.time_created)
152+
153+
if not resource or not hasattr(resource[-1], "compartment_id") or not hasattr(resource[-1], "identifier"):
154+
raise ValueError(f"Can't find the compartment id or identifier from ip {ip}. Specify a valid `src`.")
155+
156+
return resource[-1]
129157

130158
@classmethod
131159
def update_from_dsc_model(cls, dsc_model: dict) -> DSCFileSystem:
@@ -162,3 +190,18 @@ def update_from_dsc_model(cls, dsc_model: dict) -> DSCFileSystem:
162190
).data.path
163191

164192
return super().from_dict(argument)
193+
194+
195+
class DSCFileSystemManager:
196+
storage_mount_type_dict = {FILE_STORAGE_TYPE: OCIFileStorage}
197+
198+
@classmethod
199+
def initialize(cls, arguments: dict) -> DSCFileSystem:
200+
if "src" in arguments:
201+
try:
202+
ipaddress.IPv4Network(arguments["src"].split(":")[0])
203+
return OCIFileStorage(arguments)
204+
except:
205+
pass
206+
elif "mount_target_id" in arguments or "export_id" in arguments:
207+
return OCIFileStorage(arguments)

ads/jobs/builders/infrastructure/dsc_job.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from ads.jobs.builders.runtimes.python_runtime import GitPythonRuntime
3838

3939
from ads.jobs.builders.infrastructure.dsc_file_system import (
40+
DSCFileSystemManager,
4041
OCIFileStorage,
4142
DSCFileSystem,
4243
)
@@ -1234,14 +1235,14 @@ def log_group_id(self) -> str:
12341235
return self.get_spec(self.CONST_LOG_GROUP_ID)
12351236

12361237
def with_storage_mount(
1237-
self, *storage_mount: List[Union(DSCFileSystem, dict)]
1238+
self, *storage_mount: List[dict]
12381239
) -> DataScienceJob:
12391240
"""Sets the file systems to be mounted for the data science job.
12401241
A maximum number of 5 file systems are allowed to be mounted for a single data science job.
12411242
12421243
Parameters
12431244
----------
1244-
storage_mount : List[Union(DSCFileSystem, dict)
1245+
storage_mount : List[dict]
12451246
A list of file systems to be mounted.
12461247
12471248
Returns
@@ -1251,20 +1252,11 @@ def with_storage_mount(
12511252
"""
12521253
storage_mount_list = []
12531254
for item in storage_mount:
1254-
if not isinstance(item, DSCFileSystem) and not isinstance(item, dict):
1255+
if not isinstance(item, dict):
12551256
raise ValueError(
1256-
"Parameter `storage_mount` should be a list of either DSCFileSystem instances or dictionaries."
1257+
"Parameter `storage_mount` should be a list of dictionaries."
12571258
)
1258-
if isinstance(item, dict):
1259-
storage_type = item.get("storage_type", None)
1260-
if not storage_type:
1261-
raise ValueError(
1262-
"Parameter `storage_type` must be provided for each file system to be mounted."
1263-
)
1264-
if storage_type not in self.storage_mount_type_dict:
1265-
raise ValueError(f"Storage type {storage_type} is not supprted.")
1266-
item = self.storage_mount_type_dict[storage_type](**item)
1267-
storage_mount_list.append(item)
1259+
storage_mount_list.append(DSCFileSystemManager.initialize(item))
12681260
if len(storage_mount_list) > MAXIMUM_MOUNT_COUNT:
12691261
raise ValueError(
12701262
f"A maximum number of {MAXIMUM_MOUNT_COUNT} file systems are allowed to be mounted at this time for a job."
@@ -1436,8 +1428,7 @@ def _update_job_infra(self, dsc_job: DSCJob) -> DataScienceJob:
14361428
"Storage mount hasn't been supported in the current OCI SDK installed."
14371429
)
14381430
dsc_job.job_storage_mount_configuration_details_list = [
1439-
file_system.update_to_dsc_model(compartment_id=dsc_job.compartment_id)
1440-
for file_system in self.storage_mount
1431+
file_system.update_to_dsc_model() for file_system in self.storage_mount
14411432
]
14421433
return self
14431434

0 commit comments

Comments
 (0)