@@ -209,7 +209,7 @@ class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
209209 bucket = traits .Generic (mandatory = False ,
210210 desc = 'Boto3 S3 bucket for manual override of bucket' )
211211 # Set this if user wishes to have local copy of files as well
212- local_dir = traits .Str (desc = 'Copy files locally as well as to S3 bucket' )
212+ local_copy = traits .Str (desc = 'Copy files locally as well as to S3 bucket' )
213213
214214 # Set call-able inputs attributes
215215 def __setattr__ (self , key , value ):
@@ -392,6 +392,10 @@ def _check_s3_base_dir(self):
392392 s3_str = 's3://'
393393 base_directory = self .inputs .base_directory
394394
395+ if not isdefined (base_directory ):
396+ s3_flag = False
397+ return s3_flag
398+
395399 # Explicitly lower-case the "s3"
396400 if base_directory .lower ().startswith (s3_str ):
397401 base_dir_sp = base_directory .split ('/' )
@@ -616,7 +620,7 @@ def _upload_to_s3(self, src, dst):
616620 else :
617621 iflogger .info ('Overwriting previous S3 file...' )
618622
619- except ClientError as exc :
623+ except ClientError :
620624 iflogger .info ('New file to S3' )
621625
622626 # Copy file up to S3 (either encrypted or not)
@@ -653,18 +657,21 @@ def _list_outputs(self):
653657 # Check if base directory reflects S3 bucket upload
654658 try :
655659 s3_flag = self ._check_s3_base_dir ()
656- s3dir = self .inputs .base_directory
657- if isdefined (self .inputs .container ):
658- s3dir = os .path .join (s3dir , self .inputs .container )
660+ if s3_flag :
661+ s3dir = self .inputs .base_directory
662+ if isdefined (self .inputs .container ):
663+ s3dir = os .path .join (s3dir , self .inputs .container )
664+ else :
665+ s3dir = '<N/A>'
659666 # If encountering an exception during bucket access, set output
660667 # base directory to a local folder
661668 except Exception as exc :
669+ s3dir = '<N/A>'
670+ s3_flag = False
662671 if not isdefined (self .inputs .local_copy ):
663672 local_out_exception = os .path .join (os .path .expanduser ('~' ),
664673 's3_datasink_' + self .bucket .name )
665674 outdir = local_out_exception
666- else :
667- outdir = self .inputs .local_copy
668675 # Log local copying directory
669676 iflogger .info ('Access to S3 failed! Storing outputs locally at: ' \
670677 '%s\n Error: %s' % (outdir , exc ))
@@ -673,8 +680,8 @@ def _list_outputs(self):
673680 if isdefined (self .inputs .container ):
674681 outdir = os .path .join (outdir , self .inputs .container )
675682
676- # If doing a localy output
677- if not outdir . lower (). startswith ( 's3://' ) :
683+ # If sinking to local folder
684+ if outdir != s3dir :
678685 outdir = os .path .abspath (outdir )
679686 # Create the directory if it doesn't exist
680687 if not os .path .exists (outdir ):
@@ -714,18 +721,19 @@ def _list_outputs(self):
714721 if not os .path .isfile (src ):
715722 src = os .path .join (src , '' )
716723 dst = self ._get_dst (src )
724+ if s3_flag :
725+ s3dst = os .path .join (s3tempoutdir , dst )
726+ s3dst = self ._substitute (s3dst )
717727 dst = os .path .join (tempoutdir , dst )
718- s3dst = os .path .join (s3tempoutdir , dst )
719728 dst = self ._substitute (dst )
720729 path , _ = os .path .split (dst )
721730
722731 # If we're uploading to S3
723732 if s3_flag :
724- dst = dst .replace (outdir , self .inputs .base_directory )
725- self ._upload_to_s3 (src , dst )
726- out_files .append (dst )
733+ self ._upload_to_s3 (src , s3dst )
734+ out_files .append (s3dst )
727735 # Otherwise, copy locally src -> dst
728- else :
736+ if not s3_flag or isdefined ( self . inputs . local_copy ) :
729737 # Create output directory if it doesnt exist
730738 if not os .path .exists (path ):
731739 try :
@@ -787,6 +795,8 @@ class S3DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
787795 _outputs = traits .Dict (traits .Str , value = {}, usedefault = True )
788796 remove_dest_dir = traits .Bool (False , usedefault = True ,
789797 desc = 'remove dest directory when copying dirs' )
798+ # Set this if user wishes to have local copy of files as well
799+ local_copy = traits .Str (desc = 'Copy files locally as well as to S3 bucket' )
790800
791801 def __setattr__ (self , key , value ):
792802 if key not in self .copyable_trait_names ():
0 commit comments