diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/AwsClientFactory.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/AwsClientFactory.groovy index 04ae3958da..c272b62c50 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/AwsClientFactory.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/AwsClientFactory.groovy @@ -16,6 +16,7 @@ package nextflow.cloud.aws +import nextflow.cloud.aws.config.AwsBucketConfig import nextflow.cloud.aws.nio.util.S3AsyncClientConfiguration import nextflow.cloud.aws.nio.util.S3SyncClientConfiguration import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider @@ -209,19 +210,20 @@ class AwsClientFactory { return CloudWatchLogsClient.builder().region(getRegionObj(region)).credentialsProvider(getCredentialsProvider0()).build() } - S3Client getS3Client(S3SyncClientConfiguration s3ClientConfig, boolean global = false) { + S3Client getS3Client(S3SyncClientConfiguration s3ClientConfig, String bucketName, boolean global = false) { final SdkHttpClient.Builder httpClientBuilder = s3ClientConfig.getHttpClientBuilder() final ClientOverrideConfiguration overrideConfiguration = s3ClientConfig.getClientOverrideConfiguration() + final bucketConfig = config.getBucketConfig(bucketName) final builder = S3Client.builder() .crossRegionAccessEnabled(global) - .credentialsProvider(getS3CredentialsProvider()) + .credentialsProvider(getS3CredentialsProvider(bucketConfig)) .serviceConfiguration(S3Configuration.builder() - .pathStyleAccessEnabled(config.s3Config.pathStyleAccess) + .pathStyleAccessEnabled(bucketConfig.s3PathStyleAccess) .multiRegionEnabled(global) .build()) - if( config.s3Config.endpoint ) - builder.endpointOverride(URI.create(config.s3Config.endpoint)) + if( bucketConfig.endpoint ) + builder.endpointOverride(URI.create(bucketConfig.endpoint)) // AWS SDK v2 region must be always set, even when endpoint is overridden builder.region(getRegionObj(region)) @@ -235,14 +237,15 @@ class AwsClientFactory { return builder.build() } - S3AsyncClient getS3AsyncClient(S3AsyncClientConfiguration s3ClientConfig, boolean global = false) { + S3AsyncClient getS3AsyncClient(S3AsyncClientConfiguration s3ClientConfig, String bucketName, boolean global = false) { + final bucketConfig = config.getBucketConfig(bucketName) def builder = S3AsyncClient.crtBuilder() .crossRegionAccessEnabled(global) - .credentialsProvider(getS3CredentialsProvider()) - .forcePathStyle(config.s3Config.pathStyleAccess) + .credentialsProvider(getS3CredentialsProvider(bucketConfig)) + .forcePathStyle(bucketConfig.pathStyleAccess) .region(getRegionObj(region)) - if( config.s3Config.endpoint ) - builder.endpointOverride(URI.create(config.s3Config.endpoint)) + if( bucketConfig.endpoint ) + builder.endpointOverride(URI.create(bucketConfig.endpoint)) final retryConfiguration = s3ClientConfig.getCrtRetryConfiguration() if( retryConfiguration != null ) @@ -287,8 +290,8 @@ class AwsClientFactory { * * @return an AwsCredentialsProvider instance, falling back to anonymous if needed. 
*/ - private AwsCredentialsProvider getS3CredentialsProvider() { - if ( config.s3Config.anonymous ) + private AwsCredentialsProvider getS3CredentialsProvider(AwsBucketConfig configBucket) { + if ( configBucket?.anonymous || config.s3Config.anonymous) return AnonymousCredentialsProvider.create() def provider = getCredentialsProvider0() try { diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index cf8fb3d3be..16c1ff37f6 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -353,19 +353,24 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec List getLaunchCommand(String s3WorkDir) { // the cmd list to launch it final opts = getAwsOptions() + + // We cannot use 's3WorkDir' to extract the bucket-specific CLI arguments. Depending on the Task type (Array or Single), + // it could be an environment variable reference or a real path. In both cases, these arguments will be used to upload + // the '.command.log' file to the workingDir bucket. + final bucket = (getWorkDir() as S3Path).bucket + String args = opts.generateUploadCliArgs(bucket) + args = args ? " $args" : '' + final cmd = opts.s5cmdPath - ? s5Cmd(s3WorkDir, opts) - : s3Cmd(s3WorkDir, opts) + ? s5Cmd(s3WorkDir, opts, args) + : s3Cmd(s3WorkDir, opts, args) return ['bash','-o','pipefail','-c', cmd.toString()] } - static String s3Cmd(String workDir, AwsOptions opts) { + static String s3Cmd(String workDir, AwsOptions opts, String args) { final cli = opts.getAwsCli() final debug = opts.debug ? ' --debug' : '' - final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : '' - final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : '' - final requesterPays = opts.requesterPays ? ' --request-payer requester' : '' - final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}${requesterPays}" + final aws = "$cli s3 cp --only-show-errors${debug}${args}" /* * Enhanced signal handling for AWS Batch tasks to fix nested Nextflow execution issues. @@ -402,12 +407,8 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec return cmd } - static String s5Cmd(String workDir, AwsOptions opts) { + static String s5Cmd(String workDir, AwsOptions opts, String args) { final cli = opts.getS5cmdPath() - final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : '' - final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : '' - final requesterPays = opts.requesterPays ? ' --request-payer requester' : '' - /* * Enhanced signal handling for AWS Batch tasks using s5cmd (high-performance S3 client).
* This implementation mirrors the s3Cmd method but uses s5cmd instead of aws-cli for @@ -430,7 +431,7 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec * - Maintains the same signal-responsive background execution pattern * - Provides real-time logging while allowing proper signal handling */ - final cmd = "trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM; trap \"{ ret=\$?; $cli cp${sse}${kms}${requesterPays} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid" + final cmd = "trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM; trap \"{ ret=\$?; $cli cp${args} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid" return cmd } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategy.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategy.groovy index 116bce4755..a51a337835 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategy.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategy.groovy @@ -20,6 +20,8 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.cloud.aws.config.AwsConfig +import nextflow.cloud.aws.nio.S3Path import nextflow.cloud.aws.util.S3BashLib import nextflow.executor.SimpleFileCopyStrategy import nextflow.processor.TaskBean @@ -93,10 +95,12 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy { */ @Override String stageInputFile( Path path, String targetName ) { + def args = path instanceof S3Path ? opts.generateDownloadCliArgs(((S3Path)path).bucket) : '' + args = args ? " $args" : '' // third param should not be escaped, because it's used in the grep match rule def stage_cmd = opts.maxTransferAttempts > 1 && !opts.retryMode - ? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")" - : "downloads+=(\"nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")" + ? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}${args}\")" + : "downloads+=(\"nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}${args}\")" return stage_cmd } @@ -116,12 +120,14 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy { final escape = new ArrayList(outputFiles.size()) for( String it : patterns ) escape.add( Escape.path(it) ) + def args = targetDir instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)targetDir).bucket) : '' + args = args ? " $args" : '' return """\ uploads=() IFS=\$'\\n' for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do - uploads+=("nxf_s3_upload '\$name' s3:/${Escape.path(targetDir)}") + uploads+=("nxf_s3_upload '\$name' s3:/${Escape.path(targetDir)}${args}") done unset IFS nxf_parallel "\${uploads[@]}" @@ -133,7 +139,9 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy { */ @Override String touchFile( Path file ) { - "echo start | nxf_s3_upload - s3:/${Escape.path(file)}" + def args = file instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)file).bucket) : '' + args = args ? 
" $args" : '' + return "echo start | nxf_s3_upload - s3:/${Escape.path(file)}${args}" } /** @@ -141,7 +149,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy { */ @Override String fileStr( Path path ) { - Escape.path(path.getFileName()) + return Escape.path(path.getFileName()) } /** @@ -149,18 +157,24 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy { */ @Override String copyFile( String name, Path target ) { - "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}" + def args = target instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)target).bucket) : '' + args = args ? " $args" : '' + return "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}${args}" } - static String uploadCmd( String source, Path target ) { - "nxf_s3_upload ${Escape.path(source)} s3:/${Escape.path(target)}" + static String uploadCmd( String source, Path target, Map config) { + def args = target instanceof S3Path && config ? new AwsConfig(config).generateUploadCliArgs(((S3Path)target).bucket) : '' + args = args ? " $args" : '' + return "nxf_s3_upload ${Escape.path(source)} s3:/${Escape.path(target)}${args}" } /** * {@inheritDoc} */ String exitFile( Path path ) { - "| nxf_s3_upload - s3:/${Escape.path(path)} || true" + def args = path instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)path).bucket) : '' + args = args ? " $args" : '' + return "| nxf_s3_upload - s3:/${Escape.path(path)}${args} || true" } /** diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsOptions.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsOptions.groovy index 86ea44db9d..1c030bfc2a 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsOptions.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsOptions.groovy @@ -18,7 +18,6 @@ package nextflow.cloud.aws.batch import java.nio.file.Path -import software.amazon.awssdk.services.s3.model.ObjectCannedACL import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode import groovy.transform.ToString @@ -112,34 +111,13 @@ class AwsOptions implements CloudTransferOptions { return awsConfig.batchConfig.getDelayBetweenAttempts() } - String getStorageClass() { - return awsConfig.s3Config.getStorageClass() - } - - String getStorageEncryption() { - return awsConfig.s3Config.getStorageEncryption() - } - - String getStorageKmsKeyId() { - return awsConfig.s3Config.getStorageKmsKeyId() - } - - ObjectCannedACL getS3Acl() { - return awsConfig.s3Config.getS3Acl() - } - Boolean getDebug() { return awsConfig.s3Config.getDebug() } - Boolean getRequesterPays() { - return awsConfig.s3Config.getRequesterPays() - } - String getAwsCli() { def result = getCliPath() if( !result ) result = 'aws' - if( region ) result += " --region $region" return result } @@ -164,4 +142,12 @@ class AwsOptions implements CloudTransferOptions { return awsConfig.batchConfig.terminateUnschedulableJobs } + String generateUploadCliArgs(String bucketName){ + return awsConfig.generateUploadCliArgs(bucketName) + } + + String generateDownloadCliArgs(String bucketName){ + return awsConfig.generateDownloadCliArgs(bucketName) + } + } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsBucketConfig.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsBucketConfig.groovy new file mode 100644 index 0000000000..e593b918d3 --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsBucketConfig.groovy @@ -0,0 +1,50 @@ +/* + * Copyright 2020-2025, Seqera Labs 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.config + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.config.spec.ConfigOption +import nextflow.script.dsl.Description + +/** + * Model AWS S3 bucket config settings + * + * @author Jorge Ejarque + */ +@Slf4j +@CompileStatic +class AwsBucketConfig extends AwsS3CommonConfig { + + @ConfigOption + @Description(""" + S3 Bucket specific AWS region (e.g. `us-east-1`). + """) + final String region + + AwsBucketConfig(Map opts) { + super(opts) + this.region = opts.region as String + } + + Map toBucketConfigMap(){ + final map = super.toBucketConfigMap() + if( region ) map.put('region', region) + return map + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy index b42f151c63..924fe5bb4e 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy @@ -27,6 +27,7 @@ import nextflow.Global import nextflow.SysEnv import nextflow.config.spec.ConfigOption import nextflow.config.spec.ConfigScope +import nextflow.config.spec.PlaceholderName import nextflow.config.spec.ScopeName import nextflow.script.dsl.Description import nextflow.util.IniFile @@ -45,7 +46,7 @@ class AwsConfig implements ConfigScope { final AwsBatchConfig batch - final AwsS3Config client + final AwsS3ClientConfig client @ConfigOption @Description(""" @@ -71,6 +72,9 @@ class AwsConfig implements ConfigScope { """) final String profile + @PlaceholderName("") + final Map buckets + /* required by extension point -- do not remove */ AwsConfig() {} @@ -80,7 +84,16 @@ class AwsConfig implements ConfigScope { this.profile = getAwsProfile0(SysEnv.get(), opts) this.region = getAwsRegion(SysEnv.get(), opts) this.batch = new AwsBatchConfig((Map)opts.batch ?: Collections.emptyMap()) - this.client = new AwsS3Config((Map)opts.client ?: Collections.emptyMap()) + this.client = new AwsS3ClientConfig((Map)opts.client ?: Collections.emptyMap()) + this.buckets = parseBuckets((Map)opts.buckets ?: Collections.emptyMap()) + } + + private static Map parseBuckets(Map buckets){ + final result = new LinkedHashMap() + buckets.each { Map.Entry entry -> + result[entry.key] = new AwsBucketConfig(entry.value) + } + return result } List getCredentials() { @@ -89,7 +102,7 @@ class AwsConfig implements ConfigScope { : Collections.emptyList() } - AwsS3Config getS3Config() { client } + AwsS3ClientConfig getS3Config() { client } AwsBatchConfig getBatchConfig() { batch } @@ -105,15 +118,22 @@ class AwsConfig implements ConfigScope { * Fallback to the global region US_EAST_1 when no region is found. * * Preference: - * 1. endpoint region - * 2. config region - * 3. US_EAST_1 + * 1. bucket specific endpoint region + * 2. global s3 client endpoint region + * 3. bucket specific region + * 4. config region + * 5.
US_EAST_1 * * @returns Resolved region. **/ - String resolveS3Region() { - final epRegion = client.getEndpointRegion() - return epRegion ?: this.region ?: Region.US_EAST_1.id() + String resolveS3Region(String bucketName) { + def bucketRegion = null + def bucketEpRegion = null + if( bucketName && buckets && buckets.containsKey(bucketName) ){ + bucketRegion = buckets[bucketName].region + bucketEpRegion = buckets[bucketName].getEndpointRegion() + } + return bucketEpRegion ?: client.getEndpointRegion() ?: bucketRegion ?: this.region ?: Region.US_EAST_1.id() } static protected String getAwsProfile0(Map env, Map config) { @@ -148,6 +168,10 @@ class AwsConfig implements ConfigScope { return region.toString() } + if( env && env.AWS_REGION ) { + return env.AWS_REGION.toString() + } + if( env && env.AWS_DEFAULT_REGION ) { return env.AWS_DEFAULT_REGION.toString() } @@ -161,11 +185,17 @@ class AwsConfig implements ConfigScope { return ini.section(profile).region } - Map getS3LegacyProperties() { + Map getS3LegacyProperties(String bucketName) { final result = new LinkedHashMap(20) - // -- remaining client config options + // -- global client config options def config = client.getAwsClientConfig() + + // -- overwrite with bucket specific options + if( bucketName && buckets && buckets.containsKey(bucketName) ){ + config = config + buckets[bucketName].toLegacyConfig() + } + config = checkDefaultErrorRetry(config, SysEnv.get()) if( config ) { result.putAll(config) @@ -175,6 +205,18 @@ class AwsConfig implements ConfigScope { return result } + AwsBucketConfig getBucketConfig(String bucketName){ + // Get global bucket + def config = client.toBucketConfigMap() + + // overwrite with bucket specific options + if( bucketName && buckets && buckets.containsKey(bucketName) ){ + config = config + buckets[bucketName].toBucketConfigMap() + } + + return new AwsBucketConfig(config) + } + static protected Map checkDefaultErrorRetry(Map result, Map env) { if( result == null ) result = new HashMap(10) @@ -219,4 +261,39 @@ class AwsConfig implements ConfigScope { static AwsConfig config() { getConfig0(Global.config) } + + String generateUploadCliArgs(String bucketName){ + final config = getBucketConfig(bucketName) + final region = config.region ?: region + final cliArgs = [] + cliArgs.add(config.storageClass ? 
"--storage-class ${config.storageClass}" : "--storage-class STANDARD") + if(!batch.s5cmdPath && region) + cliArgs.add("--region ${region}") + if( config.anonymous ) + cliArgs.add("--no-sign-request") + if( config.storageEncryption ) + cliArgs.add("--sse ${config.storageEncryption}") + if( config.storageKmsKeyId ) + cliArgs.add("--sse-kms-key-id ${config.storageKmsKeyId}") + if( config.s3Acl ) + cliArgs.add("--acl ${config.s3Acl}") + if( config.requesterPays ) + cliArgs.add("--request-payer requester") + if( config.endpoint) + cliArgs.add("--endpoint-url ${config.endpoint}") + return cliArgs.join(" ") + } + + String generateDownloadCliArgs(String bucketName){ + final config = getBucketConfig(bucketName) + final region = config.region ?: region + final cliArgs = [] + if(!batch.s5cmdPath && region) + cliArgs.add("--region ${region}") + if( config.anonymous ) + cliArgs.add("--no-sign-request") + if( config.endpoint) + cliArgs.add("--endpoint-url ${config.endpoint}") + return cliArgs.join(" ") + } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3Config.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3ClientConfig.groovy similarity index 64% rename from plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3Config.groovy rename to plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3ClientConfig.groovy index bbb385a266..87562294b5 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3Config.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3ClientConfig.groovy @@ -17,33 +17,21 @@ package nextflow.cloud.aws.config -import static nextflow.cloud.aws.util.AwsHelper.* - -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.services.s3.model.ObjectCannedACL import groovy.transform.CompileStatic import groovy.util.logging.Slf4j -import nextflow.SysEnv import nextflow.config.spec.ConfigOption -import nextflow.config.spec.ConfigScope import nextflow.script.dsl.Description import nextflow.file.FileHelper import nextflow.util.Duration import nextflow.util.MemoryUnit /** - * Model AWS S3 config settings + * Model AWS S3 client config settings. It is applied to all buckets when there is no a specific bucket configuration. * * @author Paolo Di Tommaso */ @Slf4j @CompileStatic -class AwsS3Config implements ConfigScope { - - @ConfigOption - @Description(""" - Allow the access of public S3 buckets without providing AWS credentials (default: `false`). Any service that does not accept unsigned requests will return a service access error. - """) - final Boolean anonymous +class AwsS3ClientConfig extends AwsS3CommonConfig { @ConfigOption @Description(""" @@ -53,12 +41,6 @@ class AwsS3Config implements ConfigScope { final Boolean debug - @ConfigOption - @Description(""" - The AWS S3 API entry point e.g. `https://s3-us-west-1.amazonaws.com`. The endpoint must include the protocol prefix e.g. `https://`. - """) - final String endpoint - /** * Maximum number of concurrent transfers used by S3 transfer manager. By default, * it is determined automatically by `targetThroughputInGbps`. @@ -133,48 +115,12 @@ class AwsS3Config implements ConfigScope { """) final String proxyPassword - @ConfigOption - @Description(""" - Use [Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html) for S3 buckets (default: `false`). 
- """) - final Boolean requesterPays - - @ConfigOption(types=[String]) - @Description(""" - Specify predefined bucket permissions, also known as [canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl). Can be one of `Private`, `PublicRead`, `PublicReadWrite`, `AuthenticatedRead`, `LogDeliveryWrite`, `BucketOwnerRead`, `BucketOwnerFullControl`, or `AwsExecRead`. - """) - final ObjectCannedACL s3Acl - - @ConfigOption - @Description(""" - Use the path-based access model to access objects in S3-compatible storage systems (default: `false`). - """) - final Boolean s3PathStyleAccess - @ConfigOption @Description(""" The amount of time to wait (in milliseconds) for data to be transferred over an established, open connection before the connection is timed out (default: `50000`). """) final Integer socketTimeout - @ConfigOption - @Description(""" - The S3 storage class applied to stored objects, one of \\[`STANDARD`, `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`\\] (default: `STANDARD`). - """) - final String storageClass - - @ConfigOption - @Description(""" - The S3 server side encryption to be used when saving objects on S3. Can be `AES256` or `aws:kms` (default: none). - """) - final String storageEncryption - - @ConfigOption - @Description(""" - The AWS KMS key Id to be used to encrypt files stored in the target S3 bucket. - """) - final String storageKmsKeyId - @ConfigOption @Description(""" The target network throughput (in Gbps) used for S3 uploads and downloads (default: `10`). @@ -225,11 +171,10 @@ class AwsS3Config implements ConfigScope { // Maximum heap buffer size public static final long DEFAULT_MAX_DOWNLOAD_BUFFER_SIZE = 400 * _1MB; - AwsS3Config(Map opts) { - this.anonymous = opts.anonymous as Boolean + AwsS3ClientConfig(Map opts) { + super(opts) this.connectionTimeout = opts.connectionTimeout as Integer this.debug = opts.debug as Boolean - this.endpoint = opts.endpoint ?: SysEnv.get('AWS_S3_ENDPOINT') if( endpoint && FileHelper.getUrlProtocol(endpoint) !in ['http','https'] ) throw new IllegalArgumentException("S3 endpoint must begin with http:// or https:// prefix - offending value: '${endpoint}'") this.maxConcurrency = opts.maxConcurrency as Integer @@ -244,13 +189,7 @@ class AwsS3Config implements ConfigScope { this.proxyScheme = opts.proxyScheme this.proxyUsername = opts.proxyUsername this.proxyPassword = opts.proxyPassword - this.requesterPays = opts.requesterPays as Boolean - this.s3Acl = parseS3Acl(opts.s3Acl as String) - this.s3PathStyleAccess = opts.s3PathStyleAccess as Boolean this.socketTimeout = opts.socketTimeout as Integer - this.storageClass = parseStorageClass((opts.storageClass ?: opts.uploadStorageClass) as String) // 'uploadStorageClass' is kept for legacy purposes - this.storageEncryption = parseStorageEncryption(opts.storageEncryption as String) - this.storageKmsKeyId = opts.storageKmsKeyId this.targetThroughputInGbps = opts.targetThroughputInGbps as Double this.uploadChunkSize = opts.uploadChunkSize as MemoryUnit this.uploadMaxAttempts = opts.uploadMaxAttempts as Integer @@ -259,65 +198,6 @@ class AwsS3Config implements ConfigScope { checkDownloadBufferParams() } - private String parseStorageClass(String value) { - if( value in [null, 'STANDARD', 'STANDARD_IA', 'ONEZONE_IA', 'INTELLIGENT_TIERING', 'REDUCED_REDUNDANCY' ]) { - if (value == 'REDUCED_REDUNDANCY') { - log.warn "AWS S3 Storage Class `REDUCED_REDUNDANCY` is deprecated (and more expensive than `STANDARD`). 
For cost savings, look to `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`." - } - return value - } else { - log.warn "Unsupported AWS storage-class: $value" - return null - } - } - - private String parseStorageEncryption(String value) { - if( value in [null,'AES256','aws:kms'] ) - return value - // - log.warn "Unsupported AWS storage-encryption: $value" - return null - } - - // ==== getters ===== - - Boolean getPathStyleAccess() { - return s3PathStyleAccess - } - - boolean isCustomEndpoint() { - endpoint && !endpoint.endsWith(".amazonaws.com") - } - - /** - * Looks for the region defined in endpoints such as https://xxx..amazonaws.com - * @returns Region defined in the endpoint. Null if no endpoint or custom endpoint is defined, - * or when URI region subdomain doesn't match with a region (global or multi-region access point) - */ - String getEndpointRegion(){ - if( !endpoint || isCustomEndpoint() ) - return null - - try { - String host = URI.create(endpoint).getHost() - final hostDomains = host.split('\\.') - if (hostDomains.size() < 3) { - log.debug("Region subdomain doesn't exist in endpoint '${endpoint}'") - return null - } - final region = hostDomains[hostDomains.size()-3] - if (!Region.regions().contains(Region.of(region))){ - log.debug("Region '${region}' extracted from endpoint '${endpoint}' is not valid") - return null - } - return region - - } catch (Exception e){ - log.debug("Exception getting region from endpoint: '${endpoint}' - ${e.message}") - return null - } - } - Map getAwsClientConfig() { return [ connection_timeout: connectionTimeout?.toString(), diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3CommonConfig.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3CommonConfig.groovy new file mode 100644 index 0000000000..a48cbba3eb --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsS3CommonConfig.groovy @@ -0,0 +1,192 @@ +/* + * Copyright 2020-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.config + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.SysEnv +import nextflow.config.spec.ConfigOption +import nextflow.config.spec.ConfigScope +import nextflow.file.FileHelper +import nextflow.script.dsl.Description +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.model.ObjectCannedACL + +import static nextflow.cloud.aws.util.AwsHelper.parseS3Acl + +/** + * Model AWS S3 bucket config settings + * + * @author Jorge Ejarque + */ +@Slf4j +@CompileStatic +class AwsS3CommonConfig implements ConfigScope { + + @ConfigOption + @Description(""" + Allow the access of public S3 buckets without providing AWS credentials (default: `false`). Any service that does not accept unsigned requests will return a service access error. + """) + final Boolean anonymous + + @ConfigOption + @Description(""" + The AWS S3 API entry point e.g. `https://s3-us-west-1.amazonaws.com`. 
The endpoint must include the protocol prefix e.g. `https://`. + """) + final String endpoint + + @ConfigOption + @Description(""" + S3 Bucket specific AWS region (e.g. `us-east-1`). + """) + final String region + + @ConfigOption + @Description(""" + Use [Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html) for S3 buckets (default: `false`). + """) + final Boolean requesterPays + + @ConfigOption(types=[String]) + @Description(""" + Specify predefined bucket permissions, also known as [canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl). Can be one of `Private`, `PublicRead`, `PublicReadWrite`, `AuthenticatedRead`, `LogDeliveryWrite`, `BucketOwnerRead`, `BucketOwnerFullControl`, or `AwsExecRead`. + """) + final ObjectCannedACL s3Acl + + @ConfigOption + @Description(""" + Use the path-based access model to access objects in S3-compatible storage systems (default: `false`). + """) + final Boolean s3PathStyleAccess + + @ConfigOption + @Description(""" + The S3 storage class applied to stored objects, one of \\[`STANDARD`, `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`\\] (default: `STANDARD`). + """) + final String storageClass + + @ConfigOption + @Description(""" + The S3 server side encryption to be used when saving objects on S3. Can be `AES256` or `aws:kms` (default: none). + """) + final String storageEncryption + + @ConfigOption + @Description(""" + The AWS KMS key Id to be used to encrypt files stored in the target S3 bucket. + """) + final String storageKmsKeyId + + AwsS3CommonConfig(Map opts) { + this.anonymous = opts.anonymous as Boolean + this.endpoint = opts.endpoint ?: SysEnv.get('AWS_S3_ENDPOINT') + if( endpoint && FileHelper.getUrlProtocol(endpoint) !in ['http','https'] ) + throw new IllegalArgumentException("S3 endpoint must begin with http:// or https:// prefix - offending value: '${endpoint}'") + this.region = opts.region as String + this.requesterPays = opts.requesterPays as Boolean + this.s3Acl = parseS3Acl(opts.s3Acl as String) + this.s3PathStyleAccess = opts.s3PathStyleAccess as Boolean + this.storageClass = parseStorageClass((opts.storageClass ?: opts.uploadStorageClass) as String) // 'uploadStorageClass' is kept for legacy purposes + this.storageEncryption = parseStorageEncryption(opts.storageEncryption as String) + this.storageKmsKeyId = opts.storageKmsKeyId as String + } + + private String parseStorageClass(String value) { + if( value in [null, 'STANDARD', 'STANDARD_IA', 'ONEZONE_IA', 'INTELLIGENT_TIERING', 'REDUCED_REDUNDANCY' ]) { + if (value == 'REDUCED_REDUNDANCY') { + log.warn "AWS S3 Storage Class `REDUCED_REDUNDANCY` is deprecated (and more expensive than `STANDARD`). For cost savings, look to `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`." + } + return value + } else { + log.warn "Unsupported AWS storage-class: $value" + return null + } + } + + private String parseStorageEncryption(String value) { + if( value in [null,'AES256','aws:kms'] ) + return value + // + log.warn "Unsupported AWS storage-encryption: $value" + return null + } + + // ==== getters ===== + + Boolean getPathStyleAccess() { + return s3PathStyleAccess + } + + boolean isCustomEndpoint() { + endpoint && !endpoint.endsWith(".amazonaws.com") + } + + /** + * Looks for the region defined in endpoints such as https://xxx..amazonaws.com + * @returns Region defined in the endpoint. 
Null if no endpoint or custom endpoint is defined, + * or when URI region subdomain doesn't match with a region (global or multi-region access point) + */ + String getEndpointRegion(){ + if( !endpoint || isCustomEndpoint() ) + return null + + try { + String host = URI.create(endpoint).getHost() + final hostDomains = host.split('\\.') + if (hostDomains.size() < 3) { + log.debug("Region subdomain doesn't exist in endpoint '${endpoint}'") + return null + } + final region = hostDomains[hostDomains.size()-3] + if (!Region.regions().contains(Region.of(region))){ + log.debug("Region '${region}' extracted from endpoint '${endpoint}' is not valid") + return null + } + return region + + } catch (Exception e){ + log.debug("Exception getting region from endpoint: '${endpoint}' - ${e.message}") + return null + } + } + + Map toBucketConfigMap(){ + return ([ + anonymous: anonymous, + endpoint: endpoint, + requesterPays: requesterPays, + region: region, + s3PathStyleAccess: s3PathStyleAccess, + s3Acl: s3Acl?.toString(), + storageEncryption: storageEncryption, + storageKmsKeyId: storageKmsKeyId, + storageClass: storageClass + ] as Map).findAll { k, v -> v != null } + } + + Map toLegacyConfig() { + return [ + requester_pays: requesterPays?.toString(), + s3_acl: s3Acl?.toString(), + storage_encryption: storageEncryption?.toString(), + storage_kms_key_id: storageKmsKeyId?.toString(), + upload_storage_class: storageClass?.toString() + ].findAll { k, v -> v != null } + } + +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java index 09c74502cf..016d2aac02 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java @@ -84,12 +84,15 @@ public class S3Client { private boolean global; - public S3Client(AwsClientFactory factory, Properties props, boolean global) { + private String bucketName; + + public S3Client(AwsClientFactory factory, Properties props, String bucketName, boolean global) { S3SyncClientConfiguration clientConfig = S3SyncClientConfiguration.create(props); this.factory = factory; this.props = props; this.global = global; - this.client = factory.getS3Client(clientConfig, global); + this.bucketName = bucketName; + this.client = factory.getS3Client(clientConfig, bucketName, global); this.semaphore = Threads.useVirtual() ? 
new Semaphore(clientConfig.getMaxConnections()) : null; this.callerAccount = fetchCallerAccount(); } @@ -309,7 +312,7 @@ synchronized ExtendedS3TransferManager transferManager() { if( transferManager == null ) { transferPool = ThreadPoolManager.create("S3TransferManager"); var delegate = S3TransferManager.builder() - .s3Client(factory.getS3AsyncClient(S3AsyncClientConfiguration.create(props), global)) + .s3Client(factory.getS3AsyncClient(S3AsyncClientConfiguration.create(props), bucketName, global)) .executor(transferPool) .build(); transferManager = new ExtendedS3TransferManager(delegate, props); diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java index 713ee652a6..8f721a9a30 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java @@ -732,16 +732,18 @@ public void setAttribute(Path path, String attribute, Object value, protected S3FileSystem createFileSystem(URI uri, AwsConfig awsConfig) { // try to load amazon props Properties props = loadAmazonProperties(); - // add properties for legacy compatibility - props.putAll(awsConfig.getS3LegacyProperties()); final String bucketName = S3Path.bucketName(uri); + + // add properties for legacy compatibility + props.putAll(awsConfig.getS3LegacyProperties(bucketName)); + // do not use `global` flag for custom endpoint because // when enabling that flag, it overrides S3 endpoints with AWS global endpoint // see https://github.com/nextflow-io/nextflow/pull/5779 final boolean global = bucketName!=null && !awsConfig.getS3Config().isCustomEndpoint(); - final AwsClientFactory factory = new AwsClientFactory(awsConfig, awsConfig.resolveS3Region()); - final S3Client client = new S3Client(factory, props, global); + final AwsClientFactory factory = new AwsClientFactory(awsConfig, awsConfig.resolveS3Region(bucketName)); + final S3Client client = new S3Client(factory, props, bucketName, global); // set the client acl client.setCannedAcl(getProp(props, "s_3_acl", "s3_acl", "s3acl", "s3Acl")); diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/ExtendedS3TransferManager.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/ExtendedS3TransferManager.java index 3a6c37da0d..97b4dce4c0 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/ExtendedS3TransferManager.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/ExtendedS3TransferManager.java @@ -32,7 +32,7 @@ import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; -import static nextflow.cloud.aws.config.AwsS3Config.*; +import static nextflow.cloud.aws.config.AwsS3ClientConfig.*; /** * Extends the S3 Transfer Manager with semaphores to limit concurrent diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3BashLib.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3BashLib.groovy index 3f5c569ee3..a1a8173074 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3BashLib.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3BashLib.groovy @@ -21,7 +21,6 @@ import nextflow.Global import nextflow.Session import nextflow.cloud.aws.batch.AwsOptions import nextflow.executor.BashFunLib -import software.amazon.awssdk.services.s3.model.ObjectCannedACL /** * AWS S3 helper class @@ -29,15 +28,10 @@ import 
software.amazon.awssdk.services.s3.model.ObjectCannedACL @CompileStatic class S3BashLib extends BashFunLib { - private String storageClass = 'STANDARD' - private String storageEncryption = '' - private String storageKmsKeyId = '' private String debug = '' private String cli = 'aws' private String retryMode private String s5cmdPath - private String acl = '' - private String requesterPays = '' S3BashLib withCliPath(String cliPath) { if( cliPath ) @@ -56,40 +50,11 @@ class S3BashLib extends BashFunLib { return this } - S3BashLib withStorageClass(String value) { - if( value ) - this.storageClass = value - return this - } - - S3BashLib withStorageEncryption(String value) { - if( value ) - this.storageEncryption = value ? "--sse $value " : '' - return this - } - - S3BashLib withStorageKmsKeyId(String value) { - if( value ) - this.storageKmsKeyId = value ? "--sse-kms-key-id $value " : '' - return this - } - S3BashLib withS5cmdPath(String value) { this.s5cmdPath = value return this } - S3BashLib withAcl(ObjectCannedACL value) { - if( value ) - this.acl = "--acl $value " - return this - } - - S3BashLib withRequesterPays(Boolean value) { - this.requesterPays = value ? "--request-payer requester " : '' - return this - } - protected String retryEnv() { if( !retryMode ) return '' @@ -111,12 +76,15 @@ class S3BashLib extends BashFunLib { nxf_s3_upload() { local name=\$1 local s3path=\$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "\$@" ) if [[ "\$name" == - ]]; then - $cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass - "\$s3path" + $cli s3 cp --only-show-errors "\${opts[@]}" - "\$s3path" elif [[ -d "\$name" ]]; then - $cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name" + $cli s3 cp --only-show-errors --recursive "\${opts[@]}" "\$name" "\$s3path/\$name" else - $cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name" + $cli s3 cp --only-show-errors "\${opts[@]}" "\$name" "\$s3path/\$name" fi } @@ -124,11 +92,14 @@ class S3BashLib extends BashFunLib { local source=\$1 local target=\$2 local file_name=\$(basename \$1) - local is_dir=\$($cli s3 ls \$source | grep -F "PRE \${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "\$@" ) + local is_dir=\$($cli s3 ls "\${opts[@]}" \$source | grep -F "PRE \${file_name}/" -c) if [[ \$is_dir == 1 ]]; then - $cli s3 cp --only-show-errors --recursive "\$source" "\$target" + $cli s3 cp --only-show-errors${debug} --recursive "\${opts[@]}" "\$source" "\$target" else - $cli s3 cp --only-show-errors "\$source" "\$target" + $cli s3 cp --only-show-errors${debug} "\${opts[@]}" "\$source" "\$target" fi } """.stripIndent(true) @@ -147,14 +118,17 @@ class S3BashLib extends BashFunLib { nxf_s3_upload() { local name=\$1 local s3path=\$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "\$@" ) if [[ "\$name" == - ]]; then local tmp=\$(nxf_mktemp) cp /dev/stdin \$tmp/\$name - $cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass \$tmp/\$name "\$s3path" + $cli cp "\${opts[@]}" \$tmp/\$name "\$s3path" elif [[ -d "\$name" ]]; then - $cli cp 
${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name/" "\$s3path/\$name/" + $cli cp "\${opts[@]}" "\$name/" "\$s3path/\$name/" else - $cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name" + $cli cp "\${opts[@]}" "\$name" "\$s3path/\$name" fi } @@ -162,11 +136,14 @@ class S3BashLib extends BashFunLib { local source=\$1 local target=\$2 local file_name=\$(basename \$1) - local is_dir=\$($cli ls \$source | grep -F "DIR \${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "\$@" ) + local is_dir=\$($cli ls \${opts[@]} \$source | grep -F "DIR \${file_name}/" -c) if [[ \$is_dir == 1 ]]; then - $cli cp "\$source/*" "\$target" + $cli cp \${opts[@]} "\$source/*" "\$target" else - $cli cp "\$source" "\$target" + $cli cp \${opts[@]} "\$source" "\$target" fi } """.stripIndent() @@ -186,14 +163,9 @@ class S3BashLib extends BashFunLib { .withDelayBetweenAttempts(opts.delayBetweenAttempts ) .withMaxTransferAttempts( opts.maxTransferAttempts ) .withCliPath( opts.awsCli ) - .withStorageClass(opts.storageClass ) - .withStorageEncryption( opts.storageEncryption ) - .withStorageKmsKeyId( opts.storageKmsKeyId ) .withRetryMode( opts.retryMode ) .withDebug( opts.debug ) .withS5cmdPath( opts.s5cmdPath ) - .withAcl( opts.s3Acl ) - .withRequesterPays( opts.requesterPays ) } static String script(AwsOptions opts) { diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3PathFactory.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3PathFactory.groovy index 328f44c0ed..c642aeb777 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3PathFactory.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3PathFactory.groovy @@ -61,7 +61,7 @@ class S3PathFactory extends FileSystemPathFactory { @Override protected String getUploadCmd(String source, Path target) { return target instanceof S3Path - ? AwsBatchFileCopyStrategy.uploadCmd(source,target) + ? 
AwsBatchFileCopyStrategy.uploadCmd(source,target,config()) : null } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategyTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategyTest.groovy index 0e54b42fb8..e5bf1e0efc 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategyTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategyTest.groovy @@ -17,7 +17,11 @@ package nextflow.cloud.aws.batch import java.nio.file.Paths +import nextflow.cloud.aws.config.AwsConfig +import nextflow.cloud.aws.util.S3PathFactory import nextflow.processor.TaskBean +import nextflow.util.Escape +import software.amazon.awssdk.services.s3.model.ObjectCannedACL import spock.lang.Specification import test.TestHelper @@ -124,8 +128,6 @@ class AwsBatchFileCopyStrategyTest extends Specification { def script = copy.getBeforeStartScript() then: 1 * opts.getAwsCli() >> 'aws' - 1 * opts.getStorageClass() >> null - 1 * opts.getStorageEncryption() >> null script == '''\ # bash helper functions @@ -189,12 +191,15 @@ class AwsBatchFileCopyStrategyTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -202,105 +207,17 @@ class AwsBatchFileCopyStrategyTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) - - when: - script = copy.getBeforeStartScript() - then: - 1 * opts.getAwsCli() >> '/foo/aws' - 1 * opts.getStorageClass() >> 'STANDARD_IA' - 1 * opts.getStorageEncryption() >> 'AES256' - - script == '''\ - # bash helper functions - nxf_cp_retry() { - local max_attempts=1 - local timeout=10 - local attempt=0 - local exitCode=0 - while (( \$attempt < \$max_attempts )) - do - if "\$@" - then - return 0 - else - exitCode=\$? 
- fi - if [[ \$exitCode == 0 ]] - then - break - fi - nxf_sleep \$timeout - attempt=\$(( attempt + 1 )) - timeout=\$(( timeout * 2 )) - done - } - - nxf_parallel() { - IFS=$'\\n' - local cmd=("$@") - local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c) - local max=$(if (( cpus>4 )); then echo 4; else echo $cpus; fi) - local i=0 - local pid=() - ( - set +u - while ((i<${#cmd[@]})); do - local copy=() - for x in "${pid[@]}"; do - # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code - # see https://github.com/nextflow-io/nextflow/pull/4050 - [[ -e /proc/$x ]] && copy+=($x) || wait $x - done - pid=("${copy[@]}") - - if ((${#pid[@]}>=$max)); then - nxf_sleep 0.2 - else - eval "${cmd[$i]}" & - pid+=($!) - ((i+=1)) - fi - done - for p in "${pid[@]}"; do - wait $p - done - ) - unset IFS - } - - # aws helper - nxf_s3_upload() { - local name=$1 - local s3path=$2 - if [[ "$name" == - ]]; then - /foo/aws s3 cp --only-show-errors --sse AES256 --storage-class STANDARD_IA - "$s3path" - elif [[ -d "$name" ]]; then - /foo/aws s3 cp --only-show-errors --recursive --sse AES256 --storage-class STANDARD_IA "$name" "$s3path/$name" - else - /foo/aws s3 cp --only-show-errors --sse AES256 --storage-class STANDARD_IA "$name" "$s3path/$name" - fi - } - - nxf_s3_download() { - local source=$1 - local target=$2 - local file_name=$(basename $1) - local is_dir=$(/foo/aws s3 ls $source | grep -F "PRE ${file_name}/" -c) - if [[ $is_dir == 1 ]]; then - /foo/aws s3 cp --only-show-errors --recursive "$source" "$target" - else - /foo/aws s3 cp --only-show-errors "$source" "$target" - fi - } - '''.stripIndent(true) } def 'should return env variables' () { @@ -379,5 +296,44 @@ class AwsBatchFileCopyStrategyTest extends Specification { script == "downloads+=(\"nxf_s3_download s3:/$file bar.txt\")" as String } + + def 'should return staging with arguments'() { + given: + def config = [ + client: [ endpoint: 'https://custom.endpoint.com', requesterPays: true, s3Acl: ObjectCannedACL.PRIVATE.toString(), storageEncryption: 'aws:kms', storageKmsKeyId: 'my-kms-key'], + buckets: [ + 'bucket-1': [region: "eu-south-1", anonymous: true, storageClass: 'STANDARD_IA'], + ] + ] + def cfg = new AwsConfig(config) + def opts = new AwsOptions(awsConfig: cfg) + + def bean = Mock(TaskBean) + def copy = new AwsBatchFileCopyStrategy(bean, opts) + + // Test download with region and anonymous + when: + def file = S3PathFactory.create("s3:///bucket-1/bar.txt") + def script = copy.stageInputFile( file, 'bar.txt') + then: + script == "downloads+=(\"nxf_s3_download s3:/${Escape.path(file)} bar.txt --region eu-south-1 --no-sign-request --endpoint-url https://custom.endpoint.com\")" as String + + // Test upload with storage class, region, anonymous, encryption, kms key, acl, and requester pays + when: + def target = S3PathFactory.create("s3:///bucket-1/bar") + script = copy.getUnstageOutputFilesScript(['file.txt'], target) + then: + script.trim() == ''' + uploads=() + IFS=$'\\n' + for name in $(eval "ls -1d file.txt" | sort | uniq); do + uploads+=("nxf_s3_upload '$name' s3://bucket-1/bar --storage-class STANDARD_IA --region eu-south-1 --no-sign-request --sse aws:kms --sse-kms-key-id my-kms-key --acl private --request-payer requester --endpoint-url https://custom.endpoint.com") + done + unset IFS + nxf_parallel "${uploads[@]}" + ''' + .stripIndent().trim() + } + } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchScriptLauncherTest.groovy 
b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchScriptLauncherTest.groovy index a792b4355c..d8e2bc8264 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchScriptLauncherTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchScriptLauncherTest.groovy @@ -123,12 +123,15 @@ class AwsBatchScriptLauncherTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - /conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + /conda/bin/aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - /conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + /conda/bin/aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - /conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + /conda/bin/aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -136,11 +139,14 @@ class AwsBatchScriptLauncherTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(/conda/bin/aws --region eu-west-1 s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(/conda/bin/aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - /conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --recursive "$source" "$target" + /conda/bin/aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - /conda/bin/aws --region eu-west-1 s3 cp --only-show-errors "$source" "$target" + /conda/bin/aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } @@ -302,12 +308,15 @@ class AwsBatchScriptLauncherTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -315,11 +324,14 @@ class AwsBatchScriptLauncherTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } @@ -474,12 +486,15 @@ class AwsBatchScriptLauncherTest extends Specification { nxf_s3_upload() { local 
name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -487,11 +502,14 @@ class AwsBatchScriptLauncherTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } @@ -590,12 +608,15 @@ class AwsBatchScriptLauncherTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -603,11 +624,14 @@ class AwsBatchScriptLauncherTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy index f865f2e386..2824622541 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy @@ -187,7 +187,10 @@ class AwsBatchTaskHandlerTest extends Specification { task.getConfig() >> new TaskConfig(memory: '2GB', cpus: 4, accelerator: 2) task.getWorkDirStr() >> 's3://my-bucket/work/dir' and: - def executor = Spy(AwsBatchExecutor) { getAwsOptions()>> new AwsOptions() } + def executor = Spy(AwsBatchExecutor) { + getAwsOptions()>> new AwsOptions() + 
getWorkDir()>>S3PathFactory.create('s3:///my-bucket/work/dir') + } and: def handler = Spy(new AwsBatchTaskHandler(executor: executor)) @@ -217,6 +220,7 @@ class AwsBatchTaskHandlerTest extends Specification { and: def executor = Spy(AwsBatchExecutor) { getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'])) } + getWorkDir() >> S3PathFactory.create('s3:///my-bucket/work/dir') } and: def handler = Spy(new AwsBatchTaskHandler(executor: executor)) @@ -918,7 +922,9 @@ class AwsBatchTaskHandlerTest extends Specification { def 'should render submit command' () { given: - def executor = Spy(AwsBatchExecutor) + def executor = Spy(AwsBatchExecutor){ + getWorkDir() >> S3PathFactory.create('s3:///work') + } and: def handler = Spy(new AwsBatchTaskHandler(executor: executor)) { fusionEnabled() >> false @@ -935,19 +941,19 @@ class AwsBatchTaskHandlerTest extends Specification { when: result = handler.getSubmitCommand() then: - executor.getAwsOptions() >> Mock(AwsOptions) { - getAwsCli() >> 'aws'; - getDebug() >> true - getStorageEncryption() >> 'aws:kms' - getStorageKmsKeyId() >> 'kms-key-123' - } + executor.getAwsOptions() >> new AwsOptions(awsConfig: new AwsConfig( + batch: [cliPath: '/bin/aws'], + client: [debug: true, storageEncryption: 'aws:kms', storageKmsKeyId: 'kms-key-123'] + )) then: - result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug s3://work/.command.run - | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid' + result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; /bin/aws s3 cp --only-show-errors --debug --storage-class STANDARD --sse aws:kms --sse-kms-key-id kms-key-123 .command.log s3://work/.command.log||true; exit $ret; }" EXIT; /bin/aws s3 cp --only-show-errors --debug --storage-class STANDARD --sse aws:kms --sse-kms-key-id kms-key-123 s3://work/.command.run - | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid' } def 'should render submit command with s5cmd' () { given: - def executor = Spy(AwsBatchExecutor) + def executor = Spy(AwsBatchExecutor){ + getWorkDir() >> S3PathFactory.create('s3:///work') + } and: def handler = Spy(new AwsBatchTaskHandler(executor: executor)) { fusionEnabled() >> false @@ -964,13 +970,12 @@ class AwsBatchTaskHandlerTest extends Specification { when: result = handler.getSubmitCommand() then: - executor.getAwsOptions() >> Mock(AwsOptions) { - getS5cmdPath() >> 's5cmd --debug' - getStorageEncryption() >> 'aws:kms' - getStorageKmsKeyId() >> 'kms-key-123' - } + executor.getAwsOptions() >> new AwsOptions(awsConfig: new AwsConfig( + batch: [platformType: 'fargate', cliPath: 's5cmd --debug'], + client: [storageEncryption: 'aws:kms', storageKmsKeyId: 'kms-key-123'] + )) then: - result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; s5cmd --debug cp --sse aws:kms --sse-kms-key-id kms-key-123 .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd --debug cat s3://work/.command.run | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid' + result.join(' ') == 'bash -o pipefail -c trap "[[ -n \\$pid ]] && kill -TERM \\$pid" TERM; trap "{ ret=$?; s5cmd --debug cp --storage-class STANDARD --sse aws:kms --sse-kms-key-id kms-key-123 .command.log 
s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd --debug cat s3://work/.command.run | bash > >(tee .command.log) 2>&1 & pid=$!; wait $pid' } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsOptionsTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsOptionsTest.groovy index 06b0e645aa..9535f08570 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsOptionsTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsOptionsTest.groovy @@ -45,11 +45,6 @@ class AwsOptionsTest extends Specification { opts = new AwsOptions(awsConfig: new AwsConfig(batch: [cliPath: '/foo/bin/aws'])) then: opts.awsCli == '/foo/bin/aws' - - when: - opts = new AwsOptions(awsConfig: new AwsConfig(region: 'eu-west-1', batch: [cliPath: '/foo/bin/aws'])) - then: - opts.awsCli == '/foo/bin/aws --region eu-west-1' } def 'should get max connection' () { @@ -101,8 +96,6 @@ class AwsOptionsTest extends Specification { opts.maxParallelTransfers == 5 opts.maxTransferAttempts == 3 opts.delayBetweenAttempts.seconds == 9 - opts.storageClass == 'STANDARD' - opts.storageEncryption == 'AES256' opts.region == 'aws-west-2' opts.jobRole == 'aws::foo::bar' opts.volumes == ['/foo','/this:/that'] @@ -114,86 +107,17 @@ class AwsOptionsTest extends Specification { } - def 'should set aws kms key' () { - when: - def sess1 = Mock(Session) { - getConfig() >> [aws: [ client: [ storageKmsKeyId: 'my-kms-key']]] - } - and: - def opts = new AwsOptions(sess1) - then: - opts.storageKmsKeyId == 'my-kms-key' - opts.storageEncryption == null - - when: - def sess2 = Mock(Session) { - getConfig() >> [aws: [ client: [ storageKmsKeyId: 'my-kms-key', storageEncryption: 'aws:kms']]] - } - and: - def opts2 = new AwsOptions(sess2) - then: - opts2.storageKmsKeyId == 'my-kms-key' - opts2.storageEncryption == 'aws:kms' // <-- allow explicit `storageEncryption` - - } - - - - @Unroll - def 'should return aws options'() { - given: - def cfg = [ - aws: [client: [ - uploadStorageClass: awsStorClass, - storageEncryption : awsStorEncrypt], - batch: [ cliPath: awscliPath ]] - ] - def session = new Session(cfg) - - when: - def opts = new AwsOptions(session) - then: - opts.cliPath == awscliPath - opts.storageClass == awsStorClass - opts.storageEncryption == awsStorEncrypt - - where: - awscliPath | awsStorClass | awsStorEncrypt - null | null | null - '/foo/bin/aws' | 'STANDARD' | 'AES256' - - } - def 'should validate aws options' () { when: def opts = new AwsOptions(awsConfig: new AwsConfig([:])) then: opts.getCliPath() == null - opts.getStorageClass() == null - opts.getStorageEncryption() == null when: - opts = new AwsOptions(awsConfig: new AwsConfig(batch: [cliPath: '/foo/bin/aws'], client: [storageClass: 'STANDARD', storageEncryption: 'AES256'])) + opts = new AwsOptions(awsConfig: new AwsConfig(batch: [cliPath: '/foo/bin/aws'])) then: opts.getCliPath() == '/foo/bin/aws' - opts.getStorageClass() == 'STANDARD' - opts.getStorageEncryption() == 'AES256' - - when: - opts = new AwsOptions(awsConfig: new AwsConfig(client:[storageClass: 'foo'])) - then: - opts.getStorageClass() == null - - when: - opts = new AwsOptions(awsConfig: new AwsConfig(client:[storageEncryption: 'abr'])) - then: - opts.getStorageEncryption() == null - - when: - opts = new AwsOptions(awsConfig: new AwsConfig(client:[storageKmsKeyId: 'arn:aws:kms:eu-west-1:1234567890:key/e97ecf28-951e-4700-bf22-1bd416ec519f'])) - then: - opts.getStorageKmsKeyId() == 'arn:aws:kms:eu-west-1:1234567890:key/e97ecf28-951e-4700-bf22-1bd416ec519f' when: new 
AwsOptions(awsConfig: new AwsConfig(batch: [cliPath: 'bin/aws'])) @@ -242,24 +166,5 @@ class AwsOptionsTest extends Specification { [aws:[batch:[platformType: 'fargate', cliPath: 's5cmd --foo']]] | null | 's5cmd --foo' [aws:[batch:[platformType: 'fargate', cliPath: '/some/path/s5cmd --foo']]] | null | '/some/path/s5cmd --foo' } - - def 'should parse s3 acl' ( ) { - when: - def opts = new AwsOptions(new Session(aws:[client:[s3Acl: 'PublicRead']])) - then: - opts.getS3Acl() == ObjectCannedACL.PUBLIC_READ - - - when: - opts = new AwsOptions(new Session(aws:[client:[s3Acl: 'public-read']])) - then: - opts.getS3Acl() == ObjectCannedACL.PUBLIC_READ - - - when: - opts = new AwsOptions(new Session(aws:[client:[s3Acl: 'unknown']])) - then: - thrown(IllegalArgumentException) - } } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsConfigTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsConfigTest.groovy index bcfad6c708..52d01c7e31 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsConfigTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsConfigTest.groovy @@ -18,6 +18,7 @@ package nextflow.cloud.aws.config import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.model.ObjectCannedACL import java.nio.file.Files @@ -131,4 +132,42 @@ class AwsConfigTest extends Specification { [region: "eu-south-1", client: [endpoint: "https://s3.eu-west-1.amazonaws.com"]] | Region.EU_WEST_1.id() [region: "eu-south-1", client: [endpoint: "https://bucket.s3-global.amazonaws.com"]] | Region.EU_SOUTH_1.id() } + + def 'should resolve specific bucket config' () { + given: + def config = [ + client: [endpoint: "http://custom.endpoint.com", requesterPays: true, s3Acl: ObjectCannedACL.PRIVATE.toString()], + buckets: [ + 'bucket-1': [endpoint: "http://custom2.endpoint2.com", region: "eu-south-1", anonymous: true, requesterPays: false] + ] + ] + def awsConfig = new AwsConfig(config) + + when: + def bucketConfig = awsConfig.getBucketConfig('bucket-1') + then: + bucketConfig.endpoint == "http://custom2.endpoint2.com" + bucketConfig.requesterPays == false + bucketConfig.region == "eu-south-1" + bucketConfig.anonymous == true + bucketConfig.s3Acl == ObjectCannedACL.PRIVATE + + when: + bucketConfig = awsConfig.getBucketConfig('no-exist') + then: + bucketConfig.endpoint == "http://custom.endpoint.com" + bucketConfig.requesterPays == true + bucketConfig.region == null + bucketConfig.anonymous == null + bucketConfig.s3Acl == ObjectCannedACL.PRIVATE + + when: + bucketConfig = awsConfig.getBucketConfig(null) + then: + bucketConfig.endpoint == "http://custom.endpoint.com" + bucketConfig.requesterPays == true + bucketConfig.region == null + bucketConfig.anonymous == null + bucketConfig.s3Acl == ObjectCannedACL.PRIVATE + } } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsS3ConfigTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsS3ConfigTest.groovy index d70d84de14..4a21a56a77 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsS3ConfigTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/config/AwsS3ConfigTest.groovy @@ -31,7 +31,7 @@ class AwsS3ConfigTest extends Specification { def 'should get default config' () { when: - def client = new AwsS3Config([:]) + def client = new AwsS3ClientConfig([:]) then: !client.storageClass !client.storageKmsKeyId @@ -56,7 +56,7 @@ class AwsS3ConfigTest extends Specification { ] when: - def client = new AwsS3Config(OPTS) + def 
client = new AwsS3ClientConfig(OPTS) then: client.debug client.storageClass == 'STANDARD' @@ -74,7 +74,7 @@ class AwsS3ConfigTest extends Specification { ] when: - def client = new AwsS3Config(OPTS) + def client = new AwsS3ClientConfig(OPTS) then: client.storageClass == 'STANDARD_IA' } @@ -85,7 +85,7 @@ class AwsS3ConfigTest extends Specification { SysEnv.push(ENV) when: - def config = new AwsS3Config(CONFIG) + def config = new AwsS3ClientConfig(CONFIG) then: config.endpoint == EXPECTED @@ -103,7 +103,7 @@ class AwsS3ConfigTest extends Specification { @Unroll def 'should fail with invalid endpoint protocol' () { when: - new AwsS3Config(CONFIG) + new AwsS3ClientConfig(CONFIG) then: def e = thrown(IllegalArgumentException) e.message == EXPECTED @@ -160,7 +160,7 @@ class AwsS3ConfigTest extends Specification { @Unroll def 'should check is custom endpoint' () { given: - def config = new AwsS3Config(CONFIG) + def config = new AwsS3ClientConfig(CONFIG) expect: config.isCustomEndpoint() == EXPECTED @@ -178,7 +178,7 @@ class AwsS3ConfigTest extends Specification { @Unroll def 'should fail with invalid maxDownloadHeapMemory and minimumPartSize are incorrect' () { when: - new AwsS3Config(CONFIG) + new AwsS3ClientConfig(CONFIG) then: def e = thrown(IllegalArgumentException) e.message == EXPECTED @@ -193,7 +193,7 @@ class AwsS3ConfigTest extends Specification { @Unroll def 'should get region from endpoint' () { expect: - new AwsS3Config(CONFIG).getEndpointRegion() == REGION + new AwsS3ClientConfig(CONFIG).getEndpointRegion() == REGION where: CONFIG | REGION diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/AwsS3NioTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/AwsS3NioTest.groovy index ff4f247820..272fafb0b4 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/AwsS3NioTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/AwsS3NioTest.groovy @@ -78,6 +78,11 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec { Global.session = Mock(Session) { getConfig()>>cfg } } + def cleanup() { + Global.config = null + Global.session = null + } + def 'should create a blob' () { given: def bucket = createBucket() diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/S3OutputStreamTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/S3OutputStreamTest.groovy index da39d171da..2947797d4a 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/S3OutputStreamTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/nio/S3OutputStreamTest.groovy @@ -58,6 +58,11 @@ class S3OutputStreamTest extends Specification implements AwsS3BaseSpec { Global.session = Mock(Session) { getConfig() >> cfg } } + def cleanup() { + Global.config = null + Global.session = null + } + @IgnoreIf({System.getenv('NXF_SMOKE')}) @Requires({System.getenv('AWS_S3FS_ACCESS_KEY') && System.getenv('AWS_S3FS_SECRET_KEY')}) def 'should ensure multipart is used'() { diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/util/S3BashLibTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/util/S3BashLibTest.groovy index d05d4df994..19bec03814 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/util/S3BashLibTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/util/S3BashLibTest.groovy @@ -12,213 +12,10 @@ import spock.lang.Specification */ class S3BashLibTest extends Specification { - // -- legacy - - def 'should get uploader script' () { - - given: - def opts = Mock(AwsOptions) - - when: - def script = S3BashLib.script(opts) - 
then: - 1 * opts.getAwsCli() >> 'aws' - 1 * opts.getStorageClass() >> null - 1 * opts.getStorageEncryption() >> null - - script == '''\ - # bash helper functions - nxf_cp_retry() { - local max_attempts=1 - local timeout=10 - local attempt=0 - local exitCode=0 - while (( \$attempt < \$max_attempts )) - do - if "\$@" - then - return 0 - else - exitCode=\$? - fi - if [[ \$exitCode == 0 ]] - then - break - fi - nxf_sleep \$timeout - attempt=\$(( attempt + 1 )) - timeout=\$(( timeout * 2 )) - done - } - - nxf_parallel() { - IFS=$'\\n\' - local cmd=("$@") - local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c) - local max=$(if (( cpus>4 )); then echo 4; else echo $cpus; fi) - local i=0 - local pid=() - ( - set +u - while ((i<${#cmd[@]})); do - local copy=() - for x in "${pid[@]}"; do - # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code - # see https://github.com/nextflow-io/nextflow/pull/4050 - [[ -e /proc/$x ]] && copy+=($x) || wait $x - done - pid=("${copy[@]}") - - if ((${#pid[@]}>=$max)); then - nxf_sleep 0.2 - else - eval "${cmd[$i]}" & - pid+=($!) - ((i+=1)) - fi - done - for p in "${pid[@]}"; do - wait $p - done - ) - unset IFS - } - - # aws helper - nxf_s3_upload() { - local name=$1 - local s3path=$2 - if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" - elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" - else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" - fi - } - - nxf_s3_download() { - local source=$1 - local target=$2 - local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) - if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" - else - aws s3 cp --only-show-errors "$source" "$target" - fi - } - ''' - .stripIndent(true) - } - - def 'should set storage class and encryption' () { - - given: - def opts = Mock(AwsOptions) - - when: - def script = S3BashLib.script(opts) - then: - opts.getStorageClass() >> 'S-CLAZZ' - opts.getStorageEncryption() >> 'S-ENCRYPT' - opts.getAwsCli() >> '/foo/bin/aws' - opts.getMaxParallelTransfers() >> 33 - - script == '''\ - # bash helper functions - nxf_cp_retry() { - local max_attempts=1 - local timeout=10 - local attempt=0 - local exitCode=0 - while (( \$attempt < \$max_attempts )) - do - if "\$@" - then - return 0 - else - exitCode=\$? - fi - if [[ \$exitCode == 0 ]] - then - break - fi - nxf_sleep \$timeout - attempt=\$(( attempt + 1 )) - timeout=\$(( timeout * 2 )) - done - } - - nxf_parallel() { - IFS=$'\\n\' - local cmd=("$@") - local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c) - local max=$(if (( cpus>33 )); then echo 33; else echo $cpus; fi) - local i=0 - local pid=() - ( - set +u - while ((i<${#cmd[@]})); do - local copy=() - for x in "${pid[@]}"; do - # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code - # see https://github.com/nextflow-io/nextflow/pull/4050 - [[ -e /proc/$x ]] && copy+=($x) || wait $x - done - pid=("${copy[@]}") - - if ((${#pid[@]}>=$max)); then - nxf_sleep 0.2 - else - eval "${cmd[$i]}" & - pid+=($!) 
- ((i+=1)) - fi - done - for p in "${pid[@]}"; do - wait $p - done - ) - unset IFS - } - - # aws helper - nxf_s3_upload() { - local name=$1 - local s3path=$2 - if [[ "$name" == - ]]; then - /foo/bin/aws s3 cp --only-show-errors --sse S-ENCRYPT --storage-class S-CLAZZ - "$s3path" - elif [[ -d "$name" ]]; then - /foo/bin/aws s3 cp --only-show-errors --recursive --sse S-ENCRYPT --storage-class S-CLAZZ "$name" "$s3path/$name" - else - /foo/bin/aws s3 cp --only-show-errors --sse S-ENCRYPT --storage-class S-CLAZZ "$name" "$s3path/$name" - fi - } - - nxf_s3_download() { - local source=$1 - local target=$2 - local file_name=$(basename $1) - local is_dir=$(/foo/bin/aws s3 ls $source | grep -F "PRE ${file_name}/" -c) - if [[ $is_dir == 1 ]]; then - /foo/bin/aws s3 cp --only-show-errors --recursive "$source" "$target" - else - /foo/bin/aws s3 cp --only-show-errors "$source" "$target" - fi - } - ''' - .stripIndent(true) - - } - - // -- new test - - def 'should create base script' () { + def 'should create base script with default retry mode' () { given: Global.session = Mock(Session) { - getConfig() >> [:] - } + getConfig() >> [:] } expect: S3BashLib.script() == ''' @@ -229,12 +26,15 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -242,11 +42,14 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) @@ -267,12 +70,15 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -280,11 +86,14 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + 
shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) @@ -302,12 +111,15 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -315,17 +127,20 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) } - def 'should create base script with custom settings' () { + def 'should create base script with custom cli path' () { given: Global.session = Mock(Session) { getConfig() >> [aws:[batch: [cliPath: '/some/bin/aws', retryMode: 'legacy', maxTransferAttempts: 99]]] @@ -340,12 +155,15 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - /some/bin/aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + /some/bin/aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - /some/bin/aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + /some/bin/aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - /some/bin/aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + /some/bin/aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -353,17 +171,20 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(/some/bin/aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(/some/bin/aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - /some/bin/aws s3 cp --only-show-errors --recursive "$source" "$target" + /some/bin/aws s3 cp 
--only-show-errors --recursive "${opts[@]}" "$source" "$target" else - /some/bin/aws s3 cp --only-show-errors "$source" "$target" + /some/bin/aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) } - def 'should create base script with custom options' () { + def 'should create script with custom options including core functions' () { given: def opts = Mock(AwsOptions) { getMaxParallelTransfers() >> 5 @@ -434,12 +255,15 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -447,17 +271,19 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) } - def 'should create base script with options' () { given: def opts = Mock(AwsOptions) @@ -525,109 +351,15 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" - elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" - else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" - fi - } - - nxf_s3_download() { - local source=$1 - local target=$2 - local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) - if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" - else - aws s3 cp --only-show-errors "$source" "$target" - fi - } - '''.stripIndent(true) - } - - def 'should create with storage encrypt' () { - given: - def sess1 = Mock(Session) { - getConfig() >> [aws: [ client: [ storageKmsKeyId: 'my-kms-key', storageEncryption: 'aws:kms']]] - } - and: - def opts = new AwsOptions(sess1) - - expect: - S3BashLib.script(opts) == '''\ - # bash helper functions - nxf_cp_retry() { - local max_attempts=5 - local timeout=10 - local attempt=0 - local exitCode=0 - while (( $attempt < $max_attempts )) - do - if "$@" - then - return 0 - else - exitCode=$? 
- fi - if [[ $exitCode == 0 ]] - then - break - fi - nxf_sleep $timeout - attempt=$(( attempt + 1 )) - timeout=$(( timeout * 2 )) - done - } - - nxf_parallel() { - IFS=$'\\n' - local cmd=("$@") - local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c) - local max=$(if (( cpus>4 )); then echo 4; else echo $cpus; fi) - local i=0 - local pid=() - ( - set +u - while ((i<${#cmd[@]})); do - local copy=() - for x in "${pid[@]}"; do - # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code - # see https://github.com/nextflow-io/nextflow/pull/4050 - [[ -e /proc/$x ]] && copy+=($x) || wait $x - done - pid=("${copy[@]}") - - if ((${#pid[@]}>=$max)); then - nxf_sleep 0.2 - else - eval "${cmd[$i]}" & - pid+=($!) - ((i+=1)) - fi - done - for p in "${pid[@]}"; do - wait $p - done - ) - unset IFS - } - - # aws cli retry config - export AWS_RETRY_MODE=standard - export AWS_MAX_ATTEMPTS=5 - # aws helper - nxf_s3_upload() { - local name=$1 - local s3path=$2 - if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id my-kms-key --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --sse aws:kms --sse-kms-key-id my-kms-key --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id my-kms-key --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -635,109 +367,14 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" - fi - } - '''.stripIndent(true) - } - - - def 'should create with s3 acl' () { - given: - def sess1 = Mock(Session) { - getConfig() >> [aws: [ client: [ s3Acl: 'PublicRead']]] - } - and: - def opts = new AwsOptions(sess1) - - expect: - S3BashLib.script(opts) == '''\ - # bash helper functions - nxf_cp_retry() { - local max_attempts=5 - local timeout=10 - local attempt=0 - local exitCode=0 - while (( $attempt < $max_attempts )) - do - if "$@" - then - return 0 - else - exitCode=$? - fi - if [[ $exitCode == 0 ]] - then - break - fi - nxf_sleep $timeout - attempt=$(( attempt + 1 )) - timeout=$(( timeout * 2 )) - done - } - - nxf_parallel() { - IFS=$'\\n' - local cmd=("$@") - local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c) - local max=$(if (( cpus>4 )); then echo 4; else echo $cpus; fi) - local i=0 - local pid=() - ( - set +u - while ((i<${#cmd[@]})); do - local copy=() - for x in "${pid[@]}"; do - # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code - # see https://github.com/nextflow-io/nextflow/pull/4050 - [[ -e /proc/$x ]] && copy+=($x) || wait $x - done - pid=("${copy[@]}") - - if ((${#pid[@]}>=$max)); then - nxf_sleep 0.2 - else - eval "${cmd[$i]}" & - pid+=($!) 
- ((i+=1)) - fi - done - for p in "${pid[@]}"; do - wait $p - done - ) - unset IFS - } - - # aws cli retry config - export AWS_RETRY_MODE=standard - export AWS_MAX_ATTEMPTS=5 - # aws helper - nxf_s3_upload() { - local name=$1 - local s3path=$2 - if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --acl public-read --storage-class STANDARD - "$s3path" - elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --acl public-read --storage-class STANDARD "$name" "$s3path/$name" - else - aws s3 cp --only-show-errors --acl public-read --storage-class STANDARD "$name" "$s3path/$name" - fi - } - - nxf_s3_download() { - local source=$1 - local target=$2 - local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) - if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" - else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } '''.stripIndent(true) @@ -755,51 +392,17 @@ class S3BashLibTest extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then local tmp=$(nxf_mktemp) cp /dev/stdin $tmp/$name - s5cmd cp --storage-class STANDARD $tmp/$name "$s3path" - elif [[ -d "$name" ]]; then - s5cmd cp --storage-class STANDARD "$name/" "$s3path/$name/" - else - s5cmd cp --storage-class STANDARD "$name" "$s3path/$name" - fi - } - - nxf_s3_download() { - local source=$1 - local target=$2 - local file_name=$(basename $1) - local is_dir=$(s5cmd ls $source | grep -F "DIR ${file_name}/" -c) - if [[ $is_dir == 1 ]]; then - s5cmd cp "$source/*" "$target" - else - s5cmd cp "$source" "$target" - fi - } - '''.stripIndent(true) - } - - def 'should create s5cmd script with acl' () { - given: - Global.session = Mock(Session) { - getConfig() >> [aws:[batch:[platformType: 'fargate', cliPath: 's5cmd'], client:[ s3Acl: 'PublicRead']]] - } - - expect: - S3BashLib.script() == ''' - # aws helper for s5cmd - nxf_s3_upload() { - local name=$1 - local s3path=$2 - if [[ "$name" == - ]]; then - local tmp=$(nxf_mktemp) - cp /dev/stdin $tmp/$name - s5cmd cp --acl public-read --storage-class STANDARD $tmp/$name "$s3path" + s5cmd cp "${opts[@]}" $tmp/$name "$s3path" elif [[ -d "$name" ]]; then - s5cmd cp --acl public-read --storage-class STANDARD "$name/" "$s3path/$name/" + s5cmd cp "${opts[@]}" "$name/" "$s3path/$name/" else - s5cmd cp --acl public-read --storage-class STANDARD "$name" "$s3path/$name" + s5cmd cp "${opts[@]}" "$name" "$s3path/$name" fi } @@ -807,15 +410,17 @@ class S3BashLibTest extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(s5cmd ls $source | grep -F "DIR ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(s5cmd ls ${opts[@]} $source | grep -F "DIR ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - s5cmd cp "$source/*" "$target" + s5cmd cp ${opts[@]} "$source/*" "$target" else - s5cmd cp "$source" "$target" + s5cmd cp ${opts[@]} "$source" "$target" fi } '''.stripIndent(true) } - -} +} \ No newline at end of file diff --git a/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy b/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy index 0b8c26633c..f3884bb85a 100644 --- 
a/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy @@ -7,6 +7,8 @@ package nextflow.executor +import nextflow.file.FileHelper + import java.nio.file.Path import nextflow.Session @@ -121,6 +123,7 @@ class AwsBatchExecutorTest extends Specification { getS5cmdPath() >> { S5CMD ? 's5cmd' : null } getAwsCli() >> { 'aws' } } + getWorkDir() >> FileHelper.asPath(TASK_DIR) } expect: executor.getArrayLaunchCommand(TASK_DIR) == EXPECTED diff --git a/plugins/nf-amazon/src/test/nextflow/executor/BashWrapperBuilderWithS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/executor/BashWrapperBuilderWithS3Test.groovy index 4f90e22aa2..d80b12ea67 100644 --- a/plugins/nf-amazon/src/test/nextflow/executor/BashWrapperBuilderWithS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/executor/BashWrapperBuilderWithS3Test.groovy @@ -34,6 +34,7 @@ class BashWrapperBuilderWithS3Test extends Specification { def 'should include s3 helpers' () { given: Global.session = Mock(Session) { getConfig() >> [:] } + Global.config = [:] and: def folder = Paths.get('/work/dir') def target = S3PathFactory.parse('s3://some/buck et') // <-- path with blank @@ -71,12 +72,15 @@ class BashWrapperBuilderWithS3Test extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -84,11 +88,14 @@ class BashWrapperBuilderWithS3Test extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi } @@ -98,6 +105,7 @@ class BashWrapperBuilderWithS3Test extends Specification { def 'should include s3 helpers and bash lib' () { given: Global.session = Mock(Session) { getConfig() >> [:] } + //Global.config = [:] and: def folder = Paths.get('/work/dir') def target = S3PathFactory.parse('s3://some/bucket') @@ -191,12 +199,15 @@ class BashWrapperBuilderWithS3Test extends Specification { nxf_s3_upload() { local name=$1 local s3path=$2 + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) if [[ "$name" == - ]]; then - aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path" + aws s3 cp --only-show-errors "${opts[@]}" - "$s3path" elif [[ -d "$name" ]]; then - aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors --recursive 
"${opts[@]}" "$name" "$s3path/$name" else - aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name" + aws s3 cp --only-show-errors "${opts[@]}" "$name" "$s3path/$name" fi } @@ -204,11 +215,14 @@ class BashWrapperBuilderWithS3Test extends Specification { local source=$1 local target=$2 local file_name=$(basename $1) - local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c) + shift 2 + # Collect remaining args in an array to preserve quoting & handle empty safely + local opts=( "$@" ) + local is_dir=$(aws s3 ls "${opts[@]}" $source | grep -F "PRE ${file_name}/" -c) if [[ $is_dir == 1 ]]; then - aws s3 cp --only-show-errors --recursive "$source" "$target" + aws s3 cp --only-show-errors --recursive "${opts[@]}" "$source" "$target" else - aws s3 cp --only-show-errors "$source" "$target" + aws s3 cp --only-show-errors "${opts[@]}" "$source" "$target" fi }