88
99import pandas as pd
1010
11+ from . import utils
12+
1113
1214class ModelType (Enum ):
1315 """A selection of machine learning modeling frameworks supported by Openlayer.
@@ -89,7 +91,7 @@ def __init__(
8991 env_name : str ,
9092 requirements_file_path : str ,
9193 python_version_file_path : str ,
92- logs_file_path : Optional [str ] = None ,
94+ logger : Optional [logging . Logger ] = None ,
9395 ):
9496 if not self ._conda_available ():
9597 raise Exception ("Conda is not available on this machine." )
@@ -98,22 +100,19 @@ def __init__(
98100 self .requirements_file_path = requirements_file_path
99101 self .python_version_file_path = python_version_file_path
100102 self ._conda_prefix = self ._get_conda_prefix ()
101- self ._logs_file_path = logs_file_path
102- self ._logs = None
103+ self .logger = logger or logging .getLogger ("validators" )
103104
104105 def __enter__ (self ):
105- self ._logs = open (self ._logs_file_path , "w" )
106106 existing_envs = self .get_existing_envs ()
107107 if self .env_name in existing_envs :
108- logging .info ("Found existing conda environment '%s'." , self .env_name )
108+ self . logger .info ("Found existing conda environment '%s'." , self .env_name )
109109 else :
110110 self .create ()
111111 self .install_requirements ()
112112 return self
113113
114114 def __exit__ (self , exc_type , exc_value , traceback ):
115115 self .deactivate ()
116- self ._logs .close ()
117116
118117 def _conda_available (self ) -> bool :
119118 """Checks if conda is available on the machine."""
@@ -131,55 +130,57 @@ def _get_conda_prefix(self) -> str:
131130
132131 def create (self ):
133132 """Creates a conda environment with the specified name and python version."""
134- logging . info ("Creating a new conda environment '%s'..." , self .env_name )
133+ self . logger . info ("Creating a new conda environment '%s'... \n " , self .env_name )
135134
136135 with open (
137136 self .python_version_file_path , "r" , encoding = "UTF-8"
138137 ) as python_version_file :
139138 python_version = python_version_file .read ().split ("." )[:2 ]
140139 python_version = "." .join (python_version )
141140
142- try :
143- subprocess .check_call (
144- [
145- "conda" ,
146- "create" ,
147- "-n" ,
148- f"{ self .env_name } " ,
149- f"python={ python_version } " ,
150- "--yes" ,
151- ],
152- stdout = self ._logs ,
153- stderr = subprocess .STDOUT ,
154- )
155- except subprocess .CalledProcessError as err :
141+ process = subprocess .Popen (
142+ [
143+ "conda" ,
144+ "create" ,
145+ "-n" ,
146+ f"{ self .env_name } " ,
147+ f"python={ python_version } " ,
148+ "--yes" ,
149+ ],
150+ stdout = subprocess .PIPE ,
151+ stderr = subprocess .STDOUT ,
152+ )
153+
154+ with process .stdout :
155+ utils .log_subprocess_output (self .logger , process .stdout )
156+ exitcode = process .wait ()
157+
158+ if exitcode != 0 :
156159 raise Exception (
157160 f"Failed to create conda environment '{ self .env_name } ' with python "
158161 f"version { python_version } ."
159- " Please check the model logs for details. \n "
160- f"- Error code returned { err .returncode } : { err .output } "
161- ) from None
162+ )
162163
163164 def delete (self ):
164165 """Deletes the conda environment with the specified name."""
165- logging .info ("Deleting conda environment '%s'..." , self .env_name )
166+ self . logger .info ("Deleting conda environment '%s'..." , self .env_name )
166167
167- try :
168- subprocess . check_call (
169- [ "conda" , "env" , "remove" , "-n" , f" { self . env_name } " , "--yes" ] ,
170- stdout = subprocess .DEVNULL ,
171- stderr = subprocess . STDOUT ,
172- )
173- except subprocess . CalledProcessError as err :
174- raise Exception (
175- f"Failed to delete conda environment ' { self . env_name } '."
176- " Please check the model logs for details. \n "
177- f"- Error code returned { err . returncode } : { err . output } "
178- ) from None
168+ process = subprocess . Popen (
169+ [ "conda" , "env" , "remove" , "-n" , f" { self . env_name } " , "--yes" ],
170+ stdout = subprocess . PIPE ,
171+ stderr = subprocess .STDOUT ,
172+ )
173+
174+ with process . stdout :
175+ utils . log_subprocess_output ( self . logger , process . stdout )
176+ exitcode = process . wait ()
177+
178+ if exitcode != 0 :
179+ raise Exception ( f"Failed to delete conda environment ' { self . env_name } '." )
179180
180181 def get_existing_envs (self ) -> Set [str ]:
181182 """Gets the names of all existing conda environments."""
182- logging .info ("Checking existing conda environments..." )
183+ self . logger .info ("Checking existing conda environments..." )
183184
184185 list_envs_command = """
185186 conda env list | awk '{print $1}'
@@ -189,20 +190,19 @@ def get_existing_envs(self) -> Set[str]:
189190 envs = subprocess .check_output (
190191 list_envs_command ,
191192 shell = True ,
192- stderr = self . _logs ,
193+ stderr = subprocess . DEVNULL ,
193194 )
194195 except subprocess .CalledProcessError as err :
195196 raise Exception (
196197 f"Failed to list conda environments."
197- " Please check the model logs for details. \n "
198198 f"- Error code returned { err .returncode } : { err .output } "
199199 ) from None
200200 envs = set (envs .decode ("UTF-8" ).split ("\n " ))
201201 return envs
202202
203203 def activate (self ):
204204 """Activates the conda environment with the specified name."""
205- logging .info ("Activating conda environment '%s'..." , self .env_name )
205+ self . logger .info ("Activating conda environment '%s'..." , self .env_name )
206206
207207 activation_command = f"""
208208 eval $(conda shell.bash hook)
@@ -212,20 +212,19 @@ def activate(self):
212212 try :
213213 subprocess .check_call (
214214 activation_command ,
215- stdout = self . _logs ,
215+ stdout = subprocess . DEVNULL ,
216216 stderr = subprocess .STDOUT ,
217217 shell = True ,
218218 )
219219 except subprocess .CalledProcessError as err :
220220 raise Exception (
221221 f"Failed to activate conda environment '{ self .env_name } '."
222- " Please check the model logs for details. \n "
223222 f"- Error code returned { err .returncode } : { err .output } "
224223 ) from None
225224
226225 def deactivate (self ):
227226 """Deactivates the conda environment with the specified name."""
228- logging .info ("Deactivating conda environment '%s'..." , self .env_name )
227+ self . logger .info ("Deactivating conda environment '%s'..." , self .env_name )
229228
230229 deactivation_command = f"""
231230 eval $(conda shell.bash hook)
@@ -236,7 +235,7 @@ def deactivate(self):
236235 subprocess .check_call (
237236 deactivation_command ,
238237 shell = True ,
239- stdout = self . _logs ,
238+ stdout = subprocess . DEVNULL ,
240239 stderr = subprocess .STDOUT ,
241240 )
242241 except subprocess .CalledProcessError as err :
@@ -248,20 +247,17 @@ def deactivate(self):
248247
249248 def install_requirements (self ):
250249 """Installs the requirements from the specified requirements file."""
251- logging .info (
250+ self . logger .info (
252251 "Installing requirements in conda environment '%s'..." , self .env_name
253252 )
254253
255- try :
256- self .run_commands (
257- ["pip" , "install" , "-r" , self .requirements_file_path ],
258- )
259- except subprocess .CalledProcessError as err :
254+ exitcode = self .run_commands (
255+ ["pip" , "install" , "-r" , self .requirements_file_path ],
256+ )
257+ if exitcode != 0 :
260258 raise Exception (
261- f"Failed to install the depencies specified in the requirements.txt file."
262- " Please check the model logs for details. \n "
263- f"- Error code returned { err .returncode } : { err .output } "
264- ) from None
259+ "Failed to install the depencies specified in the requirements.txt file."
260+ )
265261
266262 def run_commands (self , commands : List [str ]):
267263 """Runs the specified commands inside the conda environment.
@@ -277,34 +273,34 @@ def run_commands(self, commands: List[str]):
277273 conda activate { self .env_name }
278274 { " " .join (commands )}
279275 """
280- subprocess .check_call (
281- full_command , shell = True , stdout = self ._logs , stderr = subprocess .STDOUT
276+ process = subprocess .Popen (
277+ full_command ,
278+ shell = True ,
279+ stdout = subprocess .PIPE ,
280+ stderr = subprocess .STDOUT ,
282281 )
283282
283+ with process .stdout :
284+ utils .log_subprocess_output (self .logger , process .stdout )
285+ exitcode = process .wait ()
286+ return exitcode
287+
284288
285289class ModelRunner :
286290 """Wraps the model package and provides a uniform run method."""
287291
288- def __init__ (self , model_package : str , logs : Optional [str ] = None ):
292+ def __init__ (self , model_package : str , logger : Optional [logging . Logger ] = None ):
289293 self .model_package = model_package
290294
291- # Save log to the model package if logs is not specified
292- if logs is None :
293- logs_file_path = f"{ model_package } /model_run_logs.txt"
294-
295- logging .basicConfig (
296- filename = logs_file_path ,
297- format = "[%(asctime)s] %(levelname)s - %(message)s" ,
298- level = logging .INFO ,
299- datefmt = "%Y-%m-%d %H:%M:%S" ,
300- )
295+ # Use validators logger if no logger is provided
296+ self .logger = logger or logging .getLogger ("validators" )
301297
302298 # TODO: change env name to the model id
303299 self ._conda_environment = CondaEnvironment (
304300 env_name = "new-openlayer" ,
305301 requirements_file_path = f"{ model_package } /requirements.txt" ,
306302 python_version_file_path = f"{ model_package } /python_version" ,
307- logs_file_path = logs_file_path ,
303+ logger = self . logger ,
308304 )
309305
310306 def __del__ (self ):
@@ -338,29 +334,28 @@ def run(self, input_data: pd.DataFrame) -> pd.DataFrame:
338334
339335 # Run the model in the conda environment
340336 with self ._conda_environment as env :
341- logging .info ("Running %s rows through the model..." , len (input_data ))
342- try :
343- env .run_commands (
344- [
345- "python" ,
346- f"{ self .model_package } /prediction_job.py" ,
347- "--input" ,
348- f"{ temp_dir } /input_data.csv" ,
349- "--output" ,
350- f"{ temp_dir } /output_data.csv" ,
351- ]
352- )
353- except subprocess .CalledProcessError as err :
354- logging .error (
337+ self .logger .info (
338+ "Running %s rows through the model..." , len (input_data )
339+ )
340+ exitcode = env .run_commands (
341+ [
342+ "python" ,
343+ f"{ self .model_package } /prediction_job.py" ,
344+ "--input" ,
345+ f"{ temp_dir } /input_data.csv" ,
346+ "--output" ,
347+ f"{ temp_dir } /output_data.csv" ,
348+ ]
349+ )
350+ if exitcode != 0 :
351+ self .logger .error (
355352 "Failed to run the model. Check the stacktrace above for details."
356353 )
357354 raise Exception (
358355 "Failed to run the model in the conda environment."
359- " Please check the model logs for details. \n "
360- f" Error { err .returncode } : { err .output } "
361356 ) from None
362357
363- logging .info ("Successfully ran data through the model!" )
358+ self . logger .info ("Successfully ran data through the model!" )
364359 # Read the output data from the csv file
365360 output_data = pd .read_csv (f"{ temp_dir } /output_data.csv" )
366361
0 commit comments