1111import tempfile
1212from typing import List , Union
1313
14+ import cloudpickle
1415import fsspec
1516import oracledb
1617import pandas as pd
@@ -126,7 +127,26 @@ def load_data(data_spec, storage_options=None, **kwargs):
126127 return data
127128
128129
def _safe_write(fn, **kwargs):
    """Call ``fn(**kwargs)`` and log, rather than raise, any failure.

    Args:
        fn: Callable that performs the I/O operation.
        **kwargs: Keyword arguments forwarded unchanged to ``fn``.

    Returns:
        Whatever ``fn`` returns, or ``None`` if ``fn`` raised an exception.
    """
    try:
        # Hand the callee's result back so wrappers around functions that
        # return a value actually receive it.
        return fn(**kwargs)
    except Exception as exc:
        # The wrappers in this module name their target differently
        # (filename / remote_filename / path / filepath); report whichever
        # one was passed instead of falling back to "UNKNOWN".
        target = next(
            (
                kwargs[key]
                for key in ("filename", "remote_filename", "path", "filepath")
                if key in kwargs
            ),
            "UNKNOWN",
        )
        logger.warning(f"Failed to write file {target}: {exc}")
        return None
135+
136+
def write_data(data, filename, format, storage_options=None, index=False, **kwargs):
    """Write ``data`` to ``filename`` by delegating to ``_write_data``.

    Every argument, including any extra keyword arguments, is forwarded
    unchanged. The call goes through ``_safe_write``, which is responsible
    for error handling; its return value is returned as-is.
    """
    forwarded = {
        "data": data,
        "filename": filename,
        "format": format,
        "storage_options": storage_options,
        "index": index,
        **kwargs,
    }
    return _safe_write(fn=_write_data, **forwarded)
147+
148+
149+ def _write_data (data , filename , format , storage_options = None , index = False , ** kwargs ):
130150 disable_print ()
131151 if not format :
132152 _ , format = os .path .splitext (filename )
@@ -143,11 +163,24 @@ def write_data(data, filename, format, storage_options=None, index=False, **kwar
143163
144164
def write_json(json_dict, filename, storage_options=None):
    """Write ``json_dict`` to ``filename`` by delegating to ``_write_json``.

    The call goes through ``_safe_write``, which is responsible for error
    handling; its return value is returned as-is.
    """
    call_args = dict(
        json_dict=json_dict,
        filename=filename,
        storage_options=storage_options,
    )
    return _safe_write(_write_json, **call_args)
172+
173+
def _write_json(json_dict, filename, storage_options=None):
    """Serialize ``json_dict`` to JSON text and write it to ``filename``.

    Args:
        json_dict: Object to serialize with ``json.dumps``.
        filename: Destination path or URL, opened through ``fsspec``.
        storage_options: Optional mapping of keyword arguments passed to
            ``fsspec.open``. ``None`` means no extra options.
    """
    # The parameter defaults to None and ``**None`` raises TypeError, so
    # substitute an empty mapping when no options were supplied.
    with fsspec.open(filename, mode="w", **(storage_options or {})) as f:
        f.write(json.dumps(json_dict))
148177
149178
def write_simple_json(data, path):
    """Write ``data`` to ``path`` by delegating to ``_write_simple_json``.

    The call goes through ``_safe_write``, which is responsible for error
    handling; its return value is returned as-is.
    """
    outcome = _safe_write(_write_simple_json, data=data, path=path)
    return outcome
181+
182+
183+ def _write_simple_json (data , path ):
151184 if ObjectStorageDetails .is_oci_path (path ):
152185 storage_options = default_signer ()
153186 else :
@@ -156,6 +189,60 @@ def write_simple_json(data, path):
156189 json .dump (data , f , indent = 4 )
157190
158191
def write_file(local_filename, remote_filename, storage_options, **kwargs):
    """Copy ``local_filename`` to ``remote_filename`` via ``_write_file``.

    Every argument, including any extra keyword arguments, is forwarded
    unchanged. The call goes through ``_safe_write``, which is responsible
    for error handling; its return value is returned as-is.
    """
    forwarded = {
        "local_filename": local_filename,
        "remote_filename": remote_filename,
        "storage_options": storage_options,
        **kwargs,
    }
    return _safe_write(fn=_write_file, **forwarded)
200+
201+
def _write_file(local_filename, remote_filename, storage_options, **kwargs):
    """Copy the content of a local file to ``remote_filename``.

    Args:
        local_filename: Path of the local file to read.
        remote_filename: Destination path or URL, opened through ``fsspec``.
        storage_options: Mapping of keyword arguments passed to
            ``fsspec.open``. ``None`` is treated as no extra options.
        **kwargs: Accepted for call compatibility; not used here.
    """
    # NOTE(review): both ends are opened in text mode, so this only suits
    # text files -- confirm no caller passes binary content.
    with open(local_filename) as src, fsspec.open(
        remote_filename,
        "w",
        # Other writers in this module allow storage_options=None; accept it
        # here too instead of failing on ``**None``.
        **(storage_options or {}),
    ) as dst:
        dst.write(src.read())
210+
211+
def load_pkl(filepath):
    """Load and return the pickled object stored at ``filepath``.

    Args:
        filepath: Path or URL of the pickle file.

    Returns:
        The object returned by ``_load_pkl``, or ``None`` if loading failed
        (the failure is logged as a warning rather than raised).
    """
    # Handle the failure here so the loaded object reaches the caller and
    # the log message names the path that was being read.
    try:
        return _load_pkl(filepath=filepath)
    except Exception as exc:
        logger.warning(f"Failed to load file {filepath}: {exc}")
        return None
214+
215+
def _load_pkl(filepath):
    """Unpickle and return the object stored at ``filepath``.

    Args:
        filepath: Path or URL of the pickle file, opened through ``fsspec``.
            When ``ObjectStorageDetails.is_oci_path`` reports an OCI path,
            the result of ``default_signer()`` is used as the storage
            options; otherwise no options are passed.

    Returns:
        The object produced by ``cloudpickle.load``.
    """
    storage_options = {}
    if ObjectStorageDetails.is_oci_path(filepath):
        storage_options = default_signer()

    # SECURITY: unpickling can execute arbitrary code. Only load files that
    # come from a trusted source.
    with fsspec.open(filepath, "rb", **storage_options) as f:
        return cloudpickle.load(f)
224+
225+
def write_pkl(obj, filename, output_dir, storage_options):
    """Pickle ``obj`` to ``output_dir``/``filename`` via ``_write_pkl``.

    All arguments are forwarded unchanged. The call goes through
    ``_safe_write``, which is responsible for error handling; its return
    value is returned as-is.
    """
    call_args = dict(
        obj=obj,
        filename=filename,
        output_dir=output_dir,
        storage_options=storage_options,
    )
    return _safe_write(_write_pkl, **call_args)
234+
235+
def _write_pkl(obj, filename, output_dir, storage_options):
    """Serialize ``obj`` with cloudpickle into ``output_dir``/``filename``.

    Args:
        obj: Object to pickle.
        filename: Name of the pickle file.
        output_dir: Directory (or URL prefix) that ``filename`` is joined to.
        storage_options: Mapping of keyword arguments passed to
            ``fsspec.open``. ``None`` is treated as no extra options.
    """
    pkl_path = os.path.join(output_dir, filename)
    # Other writers in this module allow storage_options=None; accept it
    # here too instead of failing on ``**None``.
    with fsspec.open(pkl_path, "wb", **(storage_options or {})) as f:
        cloudpickle.dump(obj, f)
244+
245+
159246def merge_category_columns (data , target_category_columns ):
160247 result = data .apply (
161248 lambda x : "__" .join ([str (x [col ]) for col in target_category_columns ]), axis = 1
@@ -290,4 +377,8 @@ def disable_print():
290377
291378# Restore
def enable_print():
    """Restore ``sys.stdout`` to the interpreter's original stream.

    The stream currently installed as ``sys.stdout`` is closed first so a
    replacement stream is not leaked. If ``sys.stdout`` is already the
    original stream, nothing is closed.
    """
    current = sys.stdout
    # Never close the real stdout: calling this while stdout is already the
    # original stream would otherwise leave sys.stdout as a closed file and
    # break every later print.
    if current is not sys.__stdout__:
        try:
            current.close()
        except Exception:
            # Best-effort cleanup: a failure to close the replacement stream
            # must not prevent stdout from being restored.
            pass
    sys.stdout = sys.__stdout__
0 commit comments