diff --git a/src/main/java/com/upplication/s3fs/AmazonS3Client.java b/src/main/java/com/upplication/s3fs/AmazonS3Client.java new file mode 100644 index 00000000..47d74a2a --- /dev/null +++ b/src/main/java/com/upplication/s3fs/AmazonS3Client.java @@ -0,0 +1,356 @@ +/* + * Copyright 2020, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * The MIT License (MIT) + * + * Copyright (c) 2014 Javier Arnáiz @arnaix + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.upplication.s3fs; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.Bucket; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.CopyObjectResult; +import com.amazonaws.services.s3.model.CopyPartRequest; +import com.amazonaws.services.s3.model.CopyPartResult; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3Object; +import com.upplication.s3fs.util.S3MultipartOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client Amazon S3 + * @see com.amazonaws.services.s3.AmazonS3Client + */ +public class AmazonS3Client { + + private static final Logger log = LoggerFactory.getLogger(AmazonS3Client.class); + + private AmazonS3 client; + + private CannedAccessControlList cannedAcl; + + public AmazonS3Client(AmazonS3 client){ + this.client = client; + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#listBuckets() + */ + public List listBuckets() { + return client.listBuckets(); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#listObjects(ListObjectsRequest) + */ + public ObjectListing listObjects(ListObjectsRequest request) { + return client.listObjects(request); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#getObject(String, String) + */ + public S3Object getObject(String bucketName, String key) { + return client.getObject(bucketName, key); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#putObject(String, String, File) + */ + public PutObjectResult putObject(String bucket, String key, File file) { + PutObjectRequest req = new PutObjectRequest(bucket, key, file); + if( cannedAcl != null ) { + log.trace("Setting canned ACL={}; bucket={}; key={}", cannedAcl, bucket, key); + req.withCannedAcl(cannedAcl); + } + return client.putObject(req); + } + + /** + * + */ + public PutObjectResult putObject(PutObjectRequest request) { + if( cannedAcl != null ) { + log.trace("Setting canned ACL={}; bucket={}; key={}", cannedAcl, request.getBucketName(), request.getKey()); + request.withCannedAcl(cannedAcl); + } + return client.putObject(request); + } + + /** + * @see com.amazonaws.services.s3.AmazonS3Client#putObject(String, String, java.io.InputStream, ObjectMetadata) + */ + public PutObjectResult putObject(String bucket, String keyName, InputStream inputStream, ObjectMetadata metadata) { + 
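+        // Note: when uploading from a raw InputStream the SDK cannot determine the
+        // object size on its own, so the content length is expected to be set on the
+        // supplied ObjectMetadata by the caller; otherwise the client may buffer the
+        // whole stream in memory before sending it.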
PutObjectRequest req = new PutObjectRequest(bucket, keyName, inputStream, metadata); + if( cannedAcl != null ) { + log.trace("Setting canned ACL={}; bucket={} and stream", cannedAcl, bucket); + req.withCannedAcl(cannedAcl); + } + return client.putObject(req); + } + + /** + * @see com.amazonaws.services.s3.AmazonS3Client#deleteObject(String, String) + */ + public void deleteObject(String bucket, String key) { + client.deleteObject(bucket, key); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#copyObject(String, String, String, String) + */ + public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName, String destinationKey) { + CopyObjectRequest req = new CopyObjectRequest(sourceBucketName, sourceKey, destinationBucketName, destinationKey); + if( cannedAcl != null ) { + log.trace("Setting canned ACL={}; sourceBucketName={}; sourceKey={}; destinationBucketName={}; destinationKey={}", cannedAcl, sourceBucketName, sourceKey, destinationBucketName, destinationKey); + req.withCannedAccessControlList(cannedAcl); + } + return client.copyObject(req); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#copyObject(CopyObjectRequest) + */ + public CopyObjectResult copyObject(CopyObjectRequest req) { + if( cannedAcl != null ) { + log.trace("Setting canned ACL={}; req={}", cannedAcl, req); + req.withCannedAccessControlList(cannedAcl); + } + return client.copyObject(req); + } + + /** + * @see com.amazonaws.services.s3.AmazonS3Client#getBucketAcl(String) + */ + public AccessControlList getBucketAcl(String bucket) { + return client.getBucketAcl(bucket); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#getS3AccountOwner() + */ + public Owner getS3AccountOwner() { + return client.getS3AccountOwner(); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#setEndpoint(String) + */ + public void setEndpoint(String endpoint) { + client.setEndpoint(endpoint); + } + + public void setCannedAcl(String acl) { + if( acl==null ) + return; + this.cannedAcl = CannedAccessControlList.valueOf(acl); + log.debug("Setting S3 canned ACL={} [{}]", this.cannedAcl, acl); + } + + public CannedAccessControlList getCannedAcl() { + return cannedAcl; + } + + public AmazonS3 getClient() { + return client; + } + + public void setRegion(String regionName) { + Region region = RegionUtils.getRegion(regionName); + if( region == null ) + throw new IllegalArgumentException("Not a valid S3 region name: " + regionName); + client.setRegion(region); + } + + + /** + * @see com.amazonaws.services.s3.AmazonS3Client#getObjectAcl(String, String) + */ + public AccessControlList getObjectAcl(String bucketName, String key) { + return client.getObjectAcl(bucketName, key); + } + /** + * @see com.amazonaws.services.s3.AmazonS3Client#getObjectMetadata(String, String) + */ + public ObjectMetadata getObjectMetadata(String bucketName, String key) { + return client.getObjectMetadata(bucketName, key); + } + + /** + * @see com.amazonaws.services.s3.AmazonS3Client#listNextBatchOfObjects(com.amazonaws.services.s3.model.ObjectListing) + */ + public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) { + return client.listNextBatchOfObjects(objectListing); + } + + public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts ) { + + final String sourceBucketName = s3Source.getBucket(); + final String sourceObjectKey = s3Source.getKey(); + final String targetBucketName = s3Target.getBucket(); + final String targetObjectKey = 
s3Target.getKey(); + + + // Step 2: Initialize + InitiateMultipartUploadRequest initiateRequest = + new InitiateMultipartUploadRequest(targetBucketName, targetObjectKey); + if( cannedAcl!=null ) { + log.debug("Setting canned ACL={}; initiateMultipartUpload targetBucketName={}, targetObjectKey={}", cannedAcl, targetBucketName, targetObjectKey); + initiateRequest.withCannedACL(cannedAcl); + } + + InitiateMultipartUploadResult initResult = client.initiateMultipartUpload(initiateRequest); + + // Step 3: Save upload Id. + String uploadId = initResult.getUploadId(); + + // Get object size. + if( objectSize == null ) { + GetObjectMetadataRequest metadataRequest = new GetObjectMetadataRequest(sourceBucketName, sourceObjectKey); + ObjectMetadata metadataResult = client.getObjectMetadata(metadataRequest); + objectSize = metadataResult.getContentLength(); // in bytes + } + + final int partSize = opts.getChunkSize(objectSize); + ExecutorService executor = S3OutputStream.getOrCreateExecutor(opts.getMaxThreads()); + List> copyPartRequests = new ArrayList<>(); + + // Step 4. create copy part requests + long bytePosition = 0; + for (int i = 1; bytePosition < objectSize; i++) + { + long lastPosition = bytePosition + partSize -1 >= objectSize ? objectSize - 1 : bytePosition + partSize - 1; + + CopyPartRequest copyRequest = new CopyPartRequest() + .withDestinationBucketName(targetBucketName) + .withDestinationKey(targetObjectKey) + .withSourceBucketName(sourceBucketName) + .withSourceKey(sourceObjectKey) + .withUploadId(uploadId) + .withFirstByte(bytePosition) + .withLastByte(lastPosition) + .withPartNumber(i); + + copyPartRequests.add( copyPart(client, copyRequest, opts) ); + bytePosition += partSize; + } + + log.trace("Starting multipart copy from: {} to {} -- uploadId={}; objectSize={}; chunkSize={}; numOfChunks={}", s3Source, s3Target, uploadId, objectSize, partSize, copyPartRequests.size() ); + + List etags = new ArrayList<>(); + List> responses; + try { + // Step 5. Start parallel parts copy + responses = executor.invokeAll(copyPartRequests); + + // Step 6. Fetch all results + for (Future response : responses) { + CopyPartResult result = response.get(); + etags.add(new PartETag(result.getPartNumber(), result.getETag())); + } + } + catch( Exception e ) { + throw new IllegalStateException("Multipart copy reported an unexpected error -- uploadId=" + uploadId, e); + } + + // Step 7. 
Complete copy operation + CompleteMultipartUploadRequest completeRequest = new + CompleteMultipartUploadRequest( + targetBucketName, + targetObjectKey, + initResult.getUploadId(), + etags); + + log.trace("Completing multipart copy uploadId={}", uploadId); + client.completeMultipartUpload(completeRequest); + } + + static Callable copyPart( final AmazonS3 client, final CopyPartRequest request, final S3MultipartOptions opts ) { + return new Callable() { + @Override + public CopyPartResult call() throws Exception { + return copyPart0(client,request,opts); + } + }; + } + + + static CopyPartResult copyPart0(AmazonS3 client, CopyPartRequest request, S3MultipartOptions opts) throws IOException, InterruptedException { + + final String objectId = request.getUploadId(); + final int partNumber = request.getPartNumber(); + final long len = request.getLastByte() - request.getFirstByte(); + + int attempt=0; + CopyPartResult result=null; + while( result == null ) { + attempt++; + try { + log.trace("Copying multipart {} with length {} attempt {} for {} ", partNumber, len, attempt, objectId); + result = client.copyPart(request); + } + catch (AmazonClientException e) { + if( attempt >= opts.getMaxAttempts() ) + throw new IOException("Failed to upload multipart data to Amazon S3", e); + + log.debug("Failed to upload part {} attempt {} for {} -- Caused by: {}", partNumber, attempt, objectId, e.getMessage()); + Thread.sleep(opts.getRetrySleepWithAttempt(attempt)); + } + } + + return result; + } + +} diff --git a/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java b/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java index 77aca376..b815cb02 100644 --- a/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java +++ b/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java @@ -1,80 +1,156 @@ -package com.upplication.s3fs; +/* + * Copyright 2020, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.internal.Constants; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.Bucket; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.upplication.s3fs.attribute.S3BasicFileAttributeView; -import com.upplication.s3fs.attribute.S3BasicFileAttributes; -import com.upplication.s3fs.attribute.S3PosixFileAttributeView; -import com.upplication.s3fs.attribute.S3PosixFileAttributes; -import com.upplication.s3fs.util.AttributesUtils; -import com.upplication.s3fs.util.Cache; -import com.upplication.s3fs.util.S3Utils; +/* + * The MIT License (MIT) + * + * Copyright (c) 2014 Javier Arnáiz @arnaix + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.upplication.s3fs; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; -import java.nio.channels.FileChannel; +import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; -import java.nio.file.*; -import java.nio.file.attribute.*; +import java.nio.file.AccessDeniedException; +import java.nio.file.AccessMode; +import java.nio.file.CopyOption; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.DirectoryStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.FileStore; +import java.nio.file.FileSystem; +import java.nio.file.FileSystemAlreadyExistsException; +import java.nio.file.FileSystemNotFoundException; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.NoSuchFileException; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.FileAttributeView; +import java.nio.file.attribute.FileTime; import java.nio.file.spi.FileSystemProvider; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; +import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.Grant; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.SSEAlgorithm; +import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.upplication.s3fs.util.CommonUtils; +import com.upplication.s3fs.util.IOUtils; +import com.upplication.s3fs.util.S3MultipartOptions; +import com.upplication.s3fs.util.S3ObjectSummaryLookup; +import com.upplication.s3fs.util.S3UploadRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.google.common.collect.Sets.difference; -import static com.upplication.s3fs.AmazonS3Factory.*; import static java.lang.String.format; /** * Spec: - *
<p>
+ * * URI: s3://[endpoint]/{bucket}/{key} If endpoint is missing, it's assumed to * be the default S3 endpoint (s3.amazonaws.com) - *
</p>
- * <p>
+ * * FileSystem roots: /{bucket}/ - *
</p>
- * <p>
+ * * Treatment of S3 objects: - If a key ends in "/" it's considered a directory * *and* a regular file. Otherwise, it's just a regular file. - It is legal for * a key "xyz" and "xyz/" to exist at the same time. The latter is treated as a * directory. - If a file "a/b/c" exists but there's no "a" or "a/b/", these are * considered "implicit" directories. They can be listed, traversed and deleted. - *
</p>
- * <p>
+ * * Deviations from FileSystem provider API: - Deleting a file or directory * always succeeds, regardless of whether the file/directory existed before the * operation was issued i.e. Files.delete() and Files.deleteIfExists() are * equivalent. - *
</p>
- * <p>
+ * + * * Future versions of this provider might allow for a strict mode that mimics * the semantics of the FileSystem provider API on a best effort basis, at an * increased processing cost. - *
</p>
+ * + * */ public class S3FileSystemProvider extends FileSystemProvider { - public static final String CHARSET_KEY = "s3fs_charset"; - public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory"; + private static Logger log = LoggerFactory.getLogger(S3FileSystemProvider.class); + + public static final String ACCESS_KEY = "access_key"; + public static final String SECRET_KEY = "secret_key"; + + public static final String SESSION_TOKEN = "session_token"; - private static final ConcurrentMap fileSystems = new ConcurrentHashMap<>(); - private static final List PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY, REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN, - PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT, SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT, - USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS); + final AtomicReference fileSystem = new AtomicReference<>(); - private S3Utils s3Utils = new S3Utils(); - private Cache cache = new Cache(); + private final S3ObjectSummaryLookup s3ObjectSummaryLookup = new S3ObjectSummaryLookup(); + + private Properties props; @Override public String getScheme() { @@ -82,227 +158,89 @@ public String getScheme() { } @Override - public FileSystem newFileSystem(URI uri, Map env) { - validateUri(uri); - // get properties for the env or properties or system - Properties props = getProperties(uri, env); - validateProperties(props); - // try to get the filesystem by the key - String key = getFileSystemKey(uri, props); - if (fileSystems.containsKey(key)) { - throw new FileSystemAlreadyExistsException("File system " + uri.getScheme() + ':' + key + " already exists"); - } - // create the filesystem with the final properties, store and return - S3FileSystem fileSystem = createFileSystem(uri, props); - fileSystems.put(fileSystem.getKey(), fileSystem); - return fileSystem; - } - - private void validateProperties(Properties props) { - Preconditions.checkArgument( - (props.getProperty(ACCESS_KEY) == null && props.getProperty(SECRET_KEY) == null) - || (props.getProperty(ACCESS_KEY) != null && props.getProperty(SECRET_KEY) != null), "%s and %s should both be provided or should both be omitted", - ACCESS_KEY, SECRET_KEY); - } - - private Properties getProperties(URI uri, Map env) { - Properties props = loadAmazonProperties(); - addEnvProperties(props, env); - // and access key and secret key can be override - String userInfo = uri.getUserInfo(); - if (userInfo != null) { - String[] keys = userInfo.split(":"); - props.setProperty(ACCESS_KEY, keys[0]); - if (keys.length > 1) { - props.setProperty(SECRET_KEY, keys[1]); - } - } - return props; - } - - private String getFileSystemKey(URI uri) { - return getFileSystemKey(uri, getProperties(uri, null)); - } - - /** - * get the file system key represented by: the access key @ endpoint. 
- * Example: access-key@s3.amazonaws.com - * If uri host is empty then s3.amazonaws.com are used as host - * - * @param uri URI with the endpoint - * @param props with the access key property - * @return String - */ - protected String getFileSystemKey(URI uri, Properties props) { - // we don`t use uri.getUserInfo and uri.getHost because secret key and access key have special chars - // and dont return the correct strings - String uriString = uri.toString().replace("s3://", ""); - String authority = null; - int authoritySeparator = uriString.indexOf("@"); - - if (authoritySeparator > 0) { - authority = uriString.substring(0, authoritySeparator); - } - - if (authority != null) { - String host = uriString.substring(uriString.indexOf("@") + 1, uriString.length()); - int lastPath = host.indexOf("/"); - if (lastPath > -1) { - host = host.substring(0, lastPath); - } - if (host.length() == 0) { - host = Constants.S3_HOSTNAME; - } - return authority + "@" + host; - } else { - String accessKey = (String) props.get(ACCESS_KEY); - return (accessKey != null ? accessKey + "@" : "") + - (uri.getHost() != null ? uri.getHost() : Constants.S3_HOSTNAME); - } - } - - protected void validateUri(URI uri) { + public FileSystem newFileSystem(URI uri, Map env) + throws IOException { Preconditions.checkNotNull(uri, "uri is null"); - Preconditions.checkArgument(uri.getScheme().equals(getScheme()), "uri scheme must be 's3': '%s'", uri); - } - - protected void addEnvProperties(Properties props, Map env) { - if (env == null) - env = new HashMap<>(); - for (String key : PROPS_TO_OVERLOAD) { - // but can be overloaded by envs vars - overloadProperty(props, env, key); + Preconditions.checkArgument(uri.getScheme().equals("s3"), + "uri scheme must be 's3': '%s'", uri); + // first try to load amazon props + props = loadAmazonProperties(); + Object accessKey = props.getProperty(ACCESS_KEY); + Object secretKey = props.getProperty(SECRET_KEY); + Object sessionToken = props.getProperty(SESSION_TOKEN); + // but can overload by envs vars + if (env.get(ACCESS_KEY) != null){ + accessKey = env.get(ACCESS_KEY); } - - for (String key : env.keySet()) { - Object value = env.get(key); - if (!PROPS_TO_OVERLOAD.contains(key)) { - props.put(key, value); - } + if (env.get(SECRET_KEY) != null){ + secretKey = env.get(SECRET_KEY); } - } - - /** - * try to override the properties props with: - *
<ol>
- * <li>the map or if not setted:</li>
- * <li>the system property or if not setted:</li>
- * <li>the system vars</li>
- * </ol>
- * - * @param props Properties to override - * @param env Map the first option - * @param key String the key - */ - private void overloadProperty(Properties props, Map env, String key) { - boolean overloaded = overloadPropertiesWithEnv(props, env, key); - - if (!overloaded) { - overloaded = overloadPropertiesWithSystemProps(props, key); + if (env.get(SESSION_TOKEN) != null){ + sessionToken = env.get(SESSION_TOKEN); } - if (!overloaded) { - overloadPropertiesWithSystemEnv(props, key); - } - } + // allows the env variables to override the ones in the property file + props.putAll(env); - /** - * @return true if the key are overloaded by the map parameter - */ - protected boolean overloadPropertiesWithEnv(Properties props, Map env, String key) { - if (env.get(key) != null && env.get(key) instanceof String) { - props.setProperty(key, (String) env.get(key)); - return true; - } - return false; - } + Preconditions.checkArgument((accessKey == null && secretKey == null) + || (accessKey != null && secretKey != null), + "%s and %s (and optionally %s) should be provided or should be omitted", + ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); - /** - * @return true if the key are overloaded by a system property - */ - public boolean overloadPropertiesWithSystemProps(Properties props, String key) { - if (System.getProperty(key) != null) { - props.setProperty(key, System.getProperty(key)); - return true; - } - return false; - } + S3FileSystem result = createFileSystem(uri, accessKey, secretKey, sessionToken); - /** - * The system envs have preference over the properties files. - * So we overload it - * @param props Properties - * @param key String - * @return true if the key are overloaded by a system property - */ - public boolean overloadPropertiesWithSystemEnv(Properties props, String key) { - if (systemGetEnv(key) != null) { - props.setProperty(key, systemGetEnv(key)); - return true; + // if this instance already has a S3FileSystem, throw exception + // otherwise set + if (!fileSystem.compareAndSet(null, result)) { + throw new FileSystemAlreadyExistsException( + "S3 filesystem already exists. Use getFileSystem() instead"); } - return false; - } - - /** - * Get the system env with the key param - * @param key String - * @return String or null - */ - public String systemGetEnv(String key) { - return System.getenv(key); - } - /** - * Get existing filesystem based on a combination of URI and env settings. Create new filesystem otherwise. - * - * @param uri URI of existing, or to be created filesystem. - * @param env environment settings. - * @return new or existing filesystem. - */ - public FileSystem getFileSystem(URI uri, Map env) { - validateUri(uri); - Properties props = getProperties(uri, env); - String key = this.getFileSystemKey(uri, props); // s3fs_access_key is part of the key here. - if (fileSystems.containsKey(key)) - return fileSystems.get(key); - return newFileSystem(uri, env); + return result; } @Override - public S3FileSystem getFileSystem(URI uri) { - validateUri(uri); - String key = this.getFileSystemKey(uri); - if (fileSystems.containsKey(key)) { - return fileSystems.get(key); - } else { - throw new FileSystemNotFoundException("S3 filesystem not yet created. Use newFileSystem() instead"); + public FileSystem getFileSystem(URI uri) { + FileSystem fileSystem = this.fileSystem.get(); + + if (fileSystem == null) { + throw new FileSystemNotFoundException( + String.format("S3 filesystem not yet created. 
Use newFileSystem() instead")); } - } - private S3Path toS3Path(Path path) { - Preconditions.checkArgument(path instanceof S3Path, "path must be an instance of %s", S3Path.class.getName()); - return (S3Path) path; + return fileSystem; } /** * Deviation from spec: throws FileSystemNotFoundException if FileSystem * hasn't yet been initialized. Call newFileSystem() first. * Need credentials. Maybe set credentials after? how? - * TODO: we can create a new one if the credentials are present by: - * s3://access-key:secret-key@endpoint.com/ */ @Override public Path getPath(URI uri) { - FileSystem fileSystem = getFileSystem(uri); + Preconditions.checkArgument(uri.getScheme().equals(getScheme()), + "URI scheme must be %s", getScheme()); + + if (uri.getHost() != null && !uri.getHost().isEmpty() && + !uri.getHost().equals(fileSystem.get().getEndpoint())) { + throw new IllegalArgumentException(format( + "only empty URI host or URI host that matching the current fileSystem: %s", + fileSystem.get().getEndpoint())); // TODO + } /** * TODO: set as a list. one s3FileSystem by region */ - return fileSystem.getPath(uri.getPath()); + return getFileSystem(uri).getPath(uri.getPath()); } @Override - public DirectoryStream newDirectoryStream(Path dir, DirectoryStream.Filter filter) throws IOException { - final S3Path s3Path = toS3Path(dir); + public DirectoryStream newDirectoryStream(Path dir, + DirectoryStream.Filter filter) throws IOException { + + Preconditions.checkArgument(dir instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + final S3Path s3Path = (S3Path) dir; + return new DirectoryStream() { @Override public void close() throws IOException { @@ -311,45 +249,200 @@ public void close() throws IOException { @Override public Iterator iterator() { - return new S3Iterator(s3Path); + return new S3Iterator(s3Path.getFileSystem(), s3Path.getBucket(), s3Path.getKey() + "/"); } }; } @Override - public InputStream newInputStream(Path path, OpenOption... options) throws IOException { - S3Path s3Path = toS3Path(path); - String key = s3Path.getKey(); + public InputStream newInputStream(Path path, OpenOption... 
options) + throws IOException { + Preconditions.checkArgument(options.length == 0, + "OpenOptions not yet supported: %s", + ImmutableList.copyOf(options)); // TODO - Preconditions.checkArgument(options.length == 0, "OpenOptions not yet supported: %s", ImmutableList.copyOf(options)); // TODO - Preconditions.checkArgument(!key.equals(""), "cannot create InputStream for root directory: %s", path); + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path) path; + Preconditions.checkArgument(!s3Path.getKey().equals(""), + "cannot create InputStream for root directory: %s", s3Path); + + InputStream result; try { - S3Object object = s3Path.getFileSystem().getClient().getObject(s3Path.getFileStore().name(), key); - InputStream res = object.getObjectContent(); + result = s3Path.getFileSystem().getClient() + .getObject(s3Path.getBucket(), s3Path.getKey()) + .getObjectContent(); - if (res == null) + if (result == null) throw new IOException(String.format("The specified path is a directory: %s", path)); - - return res; - } catch (AmazonS3Exception e) { + } + catch (AmazonS3Exception e) { if (e.getStatusCode() == 404) throw new NoSuchFileException(path.toString()); // otherwise throws a generic IO exception - throw new IOException(String.format("Cannot access file: %s", path), e); + throw new IOException(String.format("Cannot access file: %s", path),e); } + + return result; } @Override - public SeekableByteChannel newByteChannel(Path path, Set options, FileAttribute... attrs) throws IOException { - S3Path s3Path = toS3Path(path); - return new S3SeekableByteChannel(s3Path, options); + public OutputStream newOutputStream(final Path path, final OpenOption... options) throws IOException { + Preconditions.checkArgument(path instanceof S3Path, "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path)path; + + // validate options + if (options.length > 0) { + Set opts = new LinkedHashSet<>(Arrays.asList(options)); + + // cannot handle APPEND here -> use newByteChannel() implementation + if (opts.contains(StandardOpenOption.APPEND)) { + return super.newOutputStream(path, options); + } + + if (opts.contains(StandardOpenOption.READ)) { + throw new IllegalArgumentException("READ not allowed"); + } + + boolean create = opts.remove(StandardOpenOption.CREATE); + boolean createNew = opts.remove(StandardOpenOption.CREATE_NEW); + boolean truncateExisting = opts.remove(StandardOpenOption.TRUNCATE_EXISTING); + + // remove irrelevant/ignored options + opts.remove(StandardOpenOption.WRITE); + opts.remove(StandardOpenOption.SPARSE); + + if (!opts.isEmpty()) { + throw new UnsupportedOperationException(opts.iterator().next() + " not supported"); + } + + if (!(create && truncateExisting)) { + if (exists(s3Path)) { + if (createNew || !truncateExisting) { + throw new FileAlreadyExistsException(path.toString()); + } + } else { + if (!createNew && !create) { + throw new NoSuchFileException(path.toString()); + } + } + } + } + + return createUploaderOutputStream(s3Path); + } + + private S3OutputStream createUploaderOutputStream( S3Path fileToUpload ) { + AmazonS3Client s3 = fileToUpload.getFileSystem().getClient(); + + S3UploadRequest req = props != null ? 
new S3UploadRequest(props) : new S3UploadRequest(); + req.setObjectId(fileToUpload.toS3ObjectId()); + + S3OutputStream stream = new S3OutputStream(s3.getClient(), req); + stream.setCannedAcl(s3.getCannedAcl()); + return stream; } @Override - public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { - S3Path s3Path = toS3Path(path); - return new S3FileChannel(s3Path, options); + public SeekableByteChannel newByteChannel(Path path, + Set options, FileAttribute... attrs) + throws IOException { + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + final S3Path s3Path = (S3Path) path; + // we resolve to a file inside the temp folder with the s3path name + final Path tempFile = createTempDir().resolve(path.getFileName().toString()); + + try { + InputStream is = s3Path.getFileSystem().getClient() + .getObject(s3Path.getBucket(), s3Path.getKey()) + .getObjectContent(); + + if (is == null) + throw new IOException(String.format("The specified path is a directory: %s", path)); + + Files.write(tempFile, IOUtils.toByteArray(is)); + } + catch (AmazonS3Exception e) { + if (e.getStatusCode() != 404) + throw new IOException(String.format("Cannot access file: %s", path),e); + } + + // and we can use the File SeekableByteChannel implementation + final SeekableByteChannel seekable = Files .newByteChannel(tempFile, options); + + return new SeekableByteChannel() { + @Override + public boolean isOpen() { + return seekable.isOpen(); + } + + @Override + public void close() throws IOException { + + if (!seekable.isOpen()) { + return; + } + seekable.close(); + // upload the content where the seekable ends (close) + if (Files.exists(tempFile)) { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(Files.size(tempFile)); + + // FIXME: #20 ServiceLoader cant load com.upplication.s3fs.util.FileTypeDetector when this library is used inside a ear :( + metadata.setContentType(Files.probeContentType(tempFile)); + + try (InputStream stream = Files.newInputStream(tempFile)) { + /* + FIXME: if the stream is {@link InputStream#markSupported()} i can reuse the same stream + and evict the close and open methods of probeContentType. By this way: + metadata.setContentType(new Tika().detect(stream, tempFile.getFileName().toString())); + */ + upload(s3Path, s3Path.getKey(), stream, metadata); + } + } + else { + // delete: check option delete_on_close + s3Path.getFileSystem(). + getClient().deleteObject(s3Path.getBucket(), s3Path.getKey()); + } + // and delete the temp dir + Files.deleteIfExists(tempFile); + Files.deleteIfExists(tempFile.getParent()); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return seekable.write(src); + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + return seekable.truncate(size); + } + + @Override + public long size() throws IOException { + return seekable.size(); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return seekable.read(dst); + } + + @Override + public SeekableByteChannel position(long newPosition) + throws IOException { + return seekable.position(newPosition); + } + + @Override + public long position() throws IOException { + return seekable.position(); + } + }; } /** @@ -358,76 +451,123 @@ public FileChannel newFileChannel(Path path, Set options, * created or it already existed. */ @Override - public void createDirectory(Path dir, FileAttribute... 
attrs) throws IOException { - S3Path s3Path = toS3Path(dir); - Preconditions.checkArgument(attrs.length == 0, "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO - if (exists(s3Path)) - throw new FileAlreadyExistsException(format("target already exists: %s", s3Path)); - // create bucket if necesary - Bucket bucket = s3Path.getFileStore().getBucket(); - String bucketName = s3Path.getFileStore().name(); - if (bucket == null) { - s3Path.getFileSystem().getClient().createBucket(bucketName); - } - // create the object as directory + public void createDirectory(Path dir, FileAttribute... attrs) + throws IOException { + + // FIXME: throw exception if the same key already exists at amazon s3 + + S3Path s3Path = (S3Path) dir; + + Preconditions.checkArgument(attrs.length == 0, + "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO + ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentLength(0); - String directoryKey = s3Path.getKey().endsWith("/") ? s3Path.getKey() : s3Path.getKey() + "/"; - s3Path.getFileSystem().getClient().putObject(bucketName, directoryKey, new ByteArrayInputStream(new byte[0]), metadata); + + String keyName = s3Path.getKey() + + (s3Path.getKey().endsWith("/") ? "" : "/"); + + upload(s3Path, keyName, new ByteArrayInputStream(new byte[0]), metadata); } @Override public void delete(Path path) throws IOException { - S3Path s3Path = toS3Path(path); - if (Files.notExists(s3Path)) - throw new NoSuchFileException("the path: " + this + " not exists"); - if (Files.isDirectory(s3Path) && Files.newDirectoryStream(s3Path).iterator().hasNext()) - throw new DirectoryNotEmptyException("the path: " + this + " is a directory and is not empty"); - - String key = s3Path.getKey(); - String bucketName = s3Path.getFileStore().name(); - s3Path.getFileSystem().getClient().deleteObject(bucketName, key); + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + + S3Path s3Path = (S3Path) path; + + if (Files.notExists(path)){ + throw new NoSuchFileException("the path: " + path + " not exists"); + } + + if (Files.isDirectory(path)){ + try (DirectoryStream stream = Files.newDirectoryStream(path)){ + if (stream.iterator().hasNext()){ + throw new DirectoryNotEmptyException("the path: " + path + " is a directory and is not empty"); + } + } + } + // we delete the two objects (sometimes exists the key '/' and sometimes not) - s3Path.getFileSystem().getClient().deleteObject(bucketName, key + "/"); + s3Path.getFileSystem().getClient() + .deleteObject(s3Path.getBucket(), s3Path.getKey()); + s3Path.getFileSystem().getClient() + .deleteObject(s3Path.getBucket(), s3Path.getKey() + "/"); } @Override - public void copy(Path source, Path target, CopyOption... options) throws IOException { - if (isSameFile(source, target)) + public void copy(Path source, Path target, CopyOption... 
options) + throws IOException { + Preconditions.checkArgument(source instanceof S3Path, + "source must be an instance of %s", S3Path.class.getName()); + Preconditions.checkArgument(target instanceof S3Path, + "target must be an instance of %s", S3Path.class.getName()); + + if (isSameFile(source, target)) { return; + } - S3Path s3Source = toS3Path(source); - S3Path s3Target = toS3Path(target); - // TODO: implements support for copying directories - - Preconditions.checkArgument(!Files.isDirectory(source), "copying directories is not yet supported: %s", source); - Preconditions.checkArgument(!Files.isDirectory(target), "copying directories is not yet supported: %s", target); - + S3Path s3Source = (S3Path) source; + S3Path s3Target = (S3Path) target; + /* + * Preconditions.checkArgument(!s3Source.isDirectory(), + * "copying directories is not yet supported: %s", source); // TODO + * Preconditions.checkArgument(!s3Target.isDirectory(), + * "copying directories is not yet supported: %s", target); // TODO + */ ImmutableSet actualOptions = ImmutableSet.copyOf(options); - verifySupportedOptions(EnumSet.of(StandardCopyOption.REPLACE_EXISTING), actualOptions); + verifySupportedOptions(EnumSet.of(StandardCopyOption.REPLACE_EXISTING), + actualOptions); - if (exists(s3Target) && !actualOptions.contains(StandardCopyOption.REPLACE_EXISTING)) { - throw new FileAlreadyExistsException(format("target already exists: %s", target)); + if (!actualOptions.contains(StandardCopyOption.REPLACE_EXISTING)) { + if (exists(s3Target)) { + throw new FileAlreadyExistsException(format( + "target already exists: %s", target)); + } } - String bucketNameOrigin = s3Source.getFileStore().name(); - String keySource = s3Source.getKey(); - String bucketNameTarget = s3Target.getFileStore().name(); - String keyTarget = s3Target.getKey(); - s3Source.getFileSystem() - .getClient().copyObject( - bucketNameOrigin, - keySource, - bucketNameTarget, - keyTarget); + AmazonS3Client client = s3Source.getFileSystem() .getClient(); + + final ObjectMetadata sourceObjMetadata = s3Source.getFileSystem().getClient().getObjectMetadata(s3Source.getBucket(), s3Source.getKey()); + final S3MultipartOptions opts = props != null ? 
new S3MultipartOptions<>(props) : new S3MultipartOptions(); + final int chunkSize = opts.getChunkSize(); + final long length = sourceObjMetadata.getContentLength(); + + if( length <= chunkSize ) { + + CopyObjectRequest copyObjRequest = new CopyObjectRequest(s3Source.getBucket(), s3Source.getKey(), s3Target.getBucket(), s3Target.getKey()); + + String storageEncryption = props.getProperty("storage_encryption"); + if( CommonUtils.isAES256Enabled(storageEncryption) ) { + ObjectMetadata targetObjectMetadata = new ObjectMetadata(); + targetObjectMetadata.setSSEAlgorithm(SSEAlgorithm.AES256.getAlgorithm()); + copyObjRequest.setNewObjectMetadata(targetObjectMetadata); + } else if ( CommonUtils.isKMSEnabled(storageEncryption) ) { + ObjectMetadata targetObjectMetadata = new ObjectMetadata(); + targetObjectMetadata.setSSEAlgorithm(SSEAlgorithm.KMS.getAlgorithm()); + if(CommonUtils.isValidString(props.getProperty("storage_encryption_key"))) { + copyObjRequest.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(props.getProperty("storage_encryption_key"))); + } + copyObjRequest.setNewObjectMetadata(targetObjectMetadata); + } else if (sourceObjMetadata.getSSEAlgorithm()!= null) { + ObjectMetadata targetObjectMetadata = new ObjectMetadata(); + targetObjectMetadata.setSSEAlgorithm(sourceObjMetadata.getSSEAlgorithm()); + copyObjRequest.setNewObjectMetadata(targetObjectMetadata); + } + + client.copyObject(copyObjRequest); + } + else { + client.multipartCopyObject(s3Source, s3Target, length, opts); + } } + @Override - public void move(Path source, Path target, CopyOption... options) throws IOException { - if (options != null && Arrays.asList(options).contains(StandardCopyOption.ATOMIC_MOVE)) - throw new AtomicMoveNotSupportedException(source.toString(), target.toString(), "Atomic not supported"); - copy(source, target, options); - delete(source); + public void move(Path source, Path target, CopyOption... options) + throws IOException { + throw new UnsupportedOperationException(); } @Override @@ -447,191 +587,358 @@ public FileStore getFileStore(Path path) throws IOException { @Override public void checkAccess(Path path, AccessMode... 
modes) throws IOException { - S3Path s3Path = toS3Path(path); - Preconditions.checkArgument(s3Path.isAbsolute(), "path must be absolute: %s", s3Path); - if (modes.length == 0) { - if (exists(s3Path)) - return; - throw new NoSuchFileException(toString()); + S3Path s3Path = (S3Path) path; + Preconditions.checkArgument(s3Path.isAbsolute(), + "path must be absolute: %s", s3Path); + + AmazonS3Client client = s3Path.getFileSystem().getClient(); + + // get ACL and check if the file exists as a side-effect + AccessControlList acl = getAccessControl(s3Path); + + for (AccessMode accessMode : modes) { + switch (accessMode) { + case EXECUTE: + throw new AccessDeniedException(s3Path.toString(), null, + "file is not executable"); + case READ: + if (!hasPermissions(acl, client.getS3AccountOwner(), + EnumSet.of(Permission.FullControl, Permission.Read))) { + throw new AccessDeniedException(s3Path.toString(), null, + "file is not readable"); + } + break; + case WRITE: + if (!hasPermissions(acl, client.getS3AccountOwner(), + EnumSet.of(Permission.FullControl, Permission.Write))) { + throw new AccessDeniedException(s3Path.toString(), null, + format("bucket '%s' is not writable", + s3Path.getBucket())); + } + break; + } } - - String key = s3Utils.getS3ObjectSummary(s3Path).getKey(); - S3AccessControlList accessControlList = - new S3AccessControlList(s3Path.getFileStore().name(), key, s3Path.getFileSystem().getClient().getObjectAcl(s3Path.getFileStore().name(), key), s3Path.getFileStore().getOwner()); - - accessControlList.checkAccess(modes); } + /** + * check if the param acl has the same owner than the parameter owner and + * have almost one of the permission set in the parameter permissions + * @param acl + * @param owner + * @param permissions almost one + * @return + */ + private boolean hasPermissions(AccessControlList acl, Owner owner, + EnumSet permissions) { + boolean result = false; + for (Grant grant : acl.getGrants()) { + if (grant.getGrantee().getIdentifier().equals(owner.getId()) + && permissions.contains(grant.getPermission())) { + result = true; + break; + } + } + return result; + } @Override - public V getFileAttributeView(Path path, Class type, LinkOption... options) { - S3Path s3Path = toS3Path(path); - if (type == BasicFileAttributeView.class) { - return (V) new S3BasicFileAttributeView(s3Path); - } else if (type == PosixFileAttributeView.class) { - return (V) new S3PosixFileAttributeView(s3Path); - } else if (type == null) { - throw new NullPointerException("Type is mandatory"); - } else { - return null; - } + public V getFileAttributeView(Path path, + Class type, LinkOption... options) { + throw new UnsupportedOperationException(); } @Override - public A readAttributes(Path path, Class type, LinkOption... options) throws IOException { - S3Path s3Path = toS3Path(path); + public A readAttributes(Path path, + Class type, LinkOption... 
options) throws IOException { + Preconditions.checkArgument(path instanceof S3Path, + "path must be an instance of %s", S3Path.class.getName()); + S3Path s3Path = (S3Path) path; + if (type == BasicFileAttributes.class) { - if (cache.isInTime(s3Path.getFileSystem().getCache(), s3Path.getFileAttributes())) { - A result = type.cast(s3Path.getFileAttributes()); - s3Path.setFileAttributes(null); - return result; - } else { - S3BasicFileAttributes attrs = s3Utils.getS3FileAttributes(s3Path); - s3Path.setFileAttributes(attrs); - return type.cast(attrs); + + S3ObjectSummary objectSummary = s3ObjectSummaryLookup.lookup(s3Path); + + // parse the data to BasicFileAttributes. + FileTime lastModifiedTime = null; + if( objectSummary.getLastModified() != null ) { + lastModifiedTime = FileTime.from(objectSummary.getLastModified().getTime(), TimeUnit.MILLISECONDS); + } + + long size = objectSummary.getSize(); + boolean directory = false; + boolean regularFile = false; + String key = objectSummary.getKey(); + // check if is a directory and exists the key of this directory at amazon s3 + if (objectSummary.getKey().equals(s3Path.getKey() + "/") && objectSummary.getKey().endsWith("/")) { + directory = true; + } + // is a directory but not exists at amazon s3 + else if ((!objectSummary.getKey().equals(s3Path.getKey()) || "".equals(s3Path.getKey())) && objectSummary.getKey().startsWith(s3Path.getKey())){ + directory = true; + // no metadata, we fake one + size = 0; + // delete extra part + key = s3Path.getKey() + "/"; } - } else if (type == PosixFileAttributes.class) { - if (s3Path.getFileAttributes() instanceof PosixFileAttributes && - cache.isInTime(s3Path.getFileSystem().getCache(), s3Path.getFileAttributes())) { - A result = type.cast(s3Path.getFileAttributes()); - s3Path.setFileAttributes(null); - return result; + // is a file: + else { + regularFile = true; } - S3PosixFileAttributes attrs = s3Utils.getS3PosixFileAttributes(s3Path); - s3Path.setFileAttributes(attrs); - return type.cast(attrs); + return type.cast(new S3FileAttributes(key, lastModifiedTime, size, directory, regularFile)); } - throw new UnsupportedOperationException(format("only %s or %s supported", BasicFileAttributes.class, PosixFileAttributes.class)); + // not support attribute class + throw new UnsupportedOperationException(format("only %s supported", BasicFileAttributes.class)); + } + + @Override + public Map readAttributes(Path path, String attributes, + LinkOption... options) throws IOException { + throw new UnsupportedOperationException(); } @Override - public Map readAttributes(Path path, String attributes, LinkOption... options) throws IOException { - if (attributes == null) { - throw new IllegalArgumentException("Attributes null"); + public void setAttribute(Path path, String attribute, Object value, + LinkOption... 
options) throws IOException { + throw new UnsupportedOperationException(); + } + + protected ClientConfiguration createClientConfig(Properties props) { + ClientConfiguration config = new ClientConfiguration(); + + if( props == null ) + return config; + + if( props.containsKey("connection_timeout") ) { + log.trace("AWS client config - connection_timeout: {}", props.getProperty("connection_timeout")); + config.setConnectionTimeout(Integer.parseInt(props.getProperty("connection_timeout"))); } - if (attributes.contains(":") && !attributes.contains("basic:") && !attributes.contains("posix:")) { - throw new UnsupportedOperationException(format("attributes %s are not supported, only basic / posix are supported", attributes)); + if( props.containsKey("max_connections")) { + log.trace("AWS client config - max_connections: {}", props.getProperty("max_connections")); + config.setMaxConnections(Integer.parseInt(props.getProperty("max_connections"))); } - if (attributes.equals("*") || attributes.equals("basic:*")) { - BasicFileAttributes attr = readAttributes(path, BasicFileAttributes.class, options); - return AttributesUtils.fileAttributeToMap(attr); - } else if (attributes.equals("posix:*")) { - PosixFileAttributes attr = readAttributes(path, PosixFileAttributes.class, options); - return AttributesUtils.fileAttributeToMap(attr); - } else { - String[] filters = new String[]{attributes}; - if (attributes.contains(",")) { - filters = attributes.split(","); - } - Class filter = BasicFileAttributes.class; - if (attributes.startsWith("posix:")) { - filter = PosixFileAttributes.class; - } - return AttributesUtils.fileAttributeToMap(readAttributes(path, filter, options), filters); + if( props.containsKey("max_error_retry")) { + log.trace("AWS client config - max_error_retry: {}", props.getProperty("max_error_retry")); + config.setMaxErrorRetry(Integer.parseInt(props.getProperty("max_error_retry"))); } - } - @Override - public void setAttribute(Path path, String attribute, Object value, LinkOption... 
options) throws IOException { - throw new UnsupportedOperationException(); + if( props.containsKey("protocol")) { + log.trace("AWS client config - protocol: {}", props.getProperty("protocol")); + config.setProtocol(Protocol.valueOf(props.getProperty("protocol").toUpperCase())); + } + + if( props.containsKey("proxy_domain")) { + log.trace("AWS client config - proxy_domain: {}", props.getProperty("proxy_domain")); + config.setProxyDomain(props.getProperty("proxy_domain")); + } + + if( props.containsKey("proxy_host")) { + log.trace("AWS client config - proxy_host: {}", props.getProperty("proxy_host")); + config.setProxyHost(props.getProperty("proxy_host")); + } + + if( props.containsKey("proxy_port")) { + log.trace("AWS client config - proxy_port: {}", props.getProperty("proxy_port")); + config.setProxyPort(Integer.parseInt(props.getProperty("proxy_port"))); + } + + if( props.containsKey("proxy_username")) { + log.trace("AWS client config - proxy_username: {}", props.getProperty("proxy_username")); + config.setProxyUsername(props.getProperty("proxy_username")); + } + + if( props.containsKey("proxy_password")) { + log.trace("AWS client config - proxy_password: {}", props.getProperty("proxy_password")); + config.setProxyPassword(props.getProperty("proxy_password")); + } + + if ( props.containsKey("proxy_workstation")) { + log.trace("AWS client config - proxy_workstation: {}", props.getProperty("proxy_workstation")); + config.setProxyWorkstation(props.getProperty("proxy_workstation")); + } + + if ( props.containsKey("signer_override")) { + log.debug("AWS client config - signerOverride: {}", props.getProperty("signer_override")); + config.setSignerOverride(props.getProperty("signer_override")); + } + + if( props.containsKey("socket_send_buffer_size_hints") || props.containsKey("socket_recv_buffer_size_hints") ) { + log.trace("AWS client config - socket_send_buffer_size_hints: {}, socket_recv_buffer_size_hints: {}", props.getProperty("socket_send_buffer_size_hints","0"), props.getProperty("socket_recv_buffer_size_hints", "0")); + int send = Integer.parseInt(props.getProperty("socket_send_buffer_size_hints","0")); + int recv = Integer.parseInt(props.getProperty("socket_recv_buffer_size_hints", "0")); + config.setSocketBufferSizeHints(send,recv); + } + + if( props.containsKey("socket_timeout")) { + log.trace("AWS client config - socket_timeout: {}", props.getProperty("socket_timeout")); + config.setSocketTimeout(Integer.parseInt(props.getProperty("socket_timeout"))); + } + + if( props.containsKey("user_agent")) { + log.trace("AWS client config - user_agent: {}", props.getProperty("user_agent")); + config.setUserAgent(props.getProperty("user_agent")); + } + + return config; } // ~~ + /** + * Create the fileSystem + * @param uri URI + * @param accessKey Object maybe null for anonymous authentication + * @param secretKey Object maybe null for anonymous authentication + * @return S3FileSystem never null + */ + + protected S3FileSystem createFileSystem(URI uri, Object accessKey, Object secretKey) { + return createFileSystem0(uri, accessKey, secretKey, null); + } /** * Create the fileSystem - * - * @param uri URI - * @param props Properties + * @param uri URI + * @param accessKey Object maybe null for anonymous authentication + * @param secretKey Object maybe null for anonymous authentication + * @param sessionToken Object maybe null for anonymous authentication * @return S3FileSystem never null */ - public S3FileSystem createFileSystem(URI uri, Properties props) { - return new S3FileSystem(this, 
getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost()); + protected S3FileSystem createFileSystem(URI uri, Object accessKey, Object secretKey, Object sessionToken) { + return createFileSystem0(uri, accessKey, secretKey, sessionToken); } - protected AmazonS3 getAmazonS3(URI uri, Properties props) { - return getAmazonS3Factory(props).getAmazonS3(uri, props); + protected S3FileSystem createFileSystem0(URI uri, Object accessKey, Object secretKey, Object sessionToken) { + AmazonS3Client client; + ClientConfiguration config = createClientConfig(props); + + if (accessKey == null && secretKey == null) { + client = new AmazonS3Client(new com.amazonaws.services.s3.AmazonS3Client(config)); + } else { + + AWSCredentials credentials = (sessionToken == null + ? new BasicAWSCredentials(accessKey.toString(), secretKey.toString()) + : new BasicSessionCredentials(accessKey.toString(), secretKey.toString(), sessionToken.toString()) ); + client = new AmazonS3Client(new com.amazonaws.services.s3.AmazonS3Client(credentials,config)); + } + + // note: path style access is going to be deprecated + // https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/ + boolean usePathStyle = "true".equals(props.getProperty("s_3_path_style_access")) || "true".equals(props.getProperty("s3_path_style_access")); + if (usePathStyle) { + S3ClientOptions options = S3ClientOptions.builder() + .setPathStyleAccess(usePathStyle) + .build(); + client.getClient().setS3ClientOptions(options); + } + + // set the client acl + client.setCannedAcl(getProp(props, "s_3_acl", "s3_acl", "s3Acl")); + + if (uri.getHost() != null) { + client.setEndpoint(uri.getHost()); + } + else if( props.getProperty("endpoint") != null ){ + client.setEndpoint(props.getProperty("endpoint")); + } + + return new S3FileSystem(this, client, uri.getHost()); } - protected AmazonS3Factory getAmazonS3Factory(Properties props) { - if (props.containsKey(AMAZON_S3_FACTORY_CLASS)) { - String amazonS3FactoryClass = props.getProperty(AMAZON_S3_FACTORY_CLASS); - try { - return (AmazonS3Factory) Class.forName(amazonS3FactoryClass).newInstance(); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) { - throw new S3FileSystemConfigurationException("Configuration problem, couldn't instantiate AmazonS3Factory (" + amazonS3FactoryClass + "): ", e); + protected String getProp(Properties props, String... keys) { + for( String k : keys ) { + if( props.containsKey(k) ) { + return props.getProperty(k); } } - return new AmazonS3ClientFactory(); + return null; } - + /** * find /amazon.properties in the classpath - * * @return Properties amazon.properties */ - public Properties loadAmazonProperties() { + protected Properties loadAmazonProperties() { Properties props = new Properties(); // http://www.javaworld.com/javaworld/javaqa/2003-06/01-qa-0606-load.html // http://www.javaworld.com/javaqa/2003-08/01-qa-0808-property.html - try (InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream("amazon.properties")) { - if (in != null) + try(InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream("amazon.properties")){ + if (in != null){ props.load(in); - } catch (IOException e) { - // If amazon.properties can't be loaded that's ok. 
- } + } + + } catch (IOException e) { /* amazon.properties is optional */ } + return props; } - + // ~~~ - private void verifySupportedOptions(Set allowedOptions, Set actualOptions) { - Sets.SetView unsupported = difference(actualOptions, allowedOptions); - Preconditions.checkArgument(unsupported.isEmpty(), "the following options are not supported: %s", unsupported); + private void verifySupportedOptions(Set allowedOptions, + Set actualOptions) { + Sets.SetView unsupported = difference(actualOptions, + allowedOptions); + Preconditions.checkArgument(unsupported.isEmpty(), + "the following options are not supported: %s", unsupported); } - /** * check that the paths exists or not - * * @param path S3Path * @return true if exists */ - boolean exists(S3Path path) { - S3Path s3Path = toS3Path(path); + private boolean exists(S3Path path) { try { - s3Utils.getS3ObjectSummary(s3Path); + s3ObjectSummaryLookup.lookup(path); return true; - } catch (NoSuchFileException e) { + } + catch(NoSuchFileException e) { return false; } } - public void close(S3FileSystem fileSystem) { - if (fileSystem.getKey() != null && fileSystems.containsKey(fileSystem.getKey())) - fileSystems.remove(fileSystem.getKey()); - } - - public boolean isOpen(S3FileSystem s3FileSystem) { - return fileSystems.containsKey(s3FileSystem.getKey()); - } - /** - * only 4 testing + * Get the Access Control List; if the path does not exist + * (because the path is a directory and its key is not created at Amazon S3) + * then return the ACL of the first child. + * + * @param path {@link S3Path} + * @return AccessControlList + * @throws NoSuchFileException if neither the path nor any child is found */ - - protected static ConcurrentMap getFilesystems() { - return fileSystems; + private AccessControlList getAccessControl(S3Path path) throws NoSuchFileException{ + S3ObjectSummary obj = s3ObjectSummaryLookup.lookup(path); + // check first for file: + return path.getFileSystem().getClient().getObjectAcl(obj.getBucketName(), obj.getKey()); } - public Cache getCache() { - return cache; - } + /** + * Create a temporary directory used to back streams + * @return Path temporary folder + * @throws IOException + */ + protected Path createTempDir() throws IOException { + return Files.createTempDirectory("temp-s3-"); + } + + private void upload(S3Path s3Path, String keyName, InputStream stream, ObjectMetadata metadata) { + PutObjectRequest req = new PutObjectRequest(s3Path.getBucket(), keyName, + stream, + metadata); + + String storageEncryption = props.getProperty("storage_encryption"); + if( CommonUtils.isAES256Enabled(storageEncryption) ) { + metadata.setSSEAlgorithm(SSEAlgorithm.AES256.getAlgorithm()); + } else if( CommonUtils.isKMSEnabled(storageEncryption) ) { + metadata.setSSEAlgorithm(SSEAlgorithm.KMS.getAlgorithm()); + String encryptionKey = props.getProperty("storage_encryption_key"); + if(CommonUtils.isValidString(encryptionKey)) { + req.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(encryptionKey)); + } + } + req.setMetadata(metadata); - public void setCache(Cache cache) { - this.cache = cache; + s3Path.getFileSystem().getClient().putObject(req); } -} \ No newline at end of file +} diff --git a/src/main/java/com/upplication/s3fs/S3OutputStream.java b/src/main/java/com/upplication/s3fs/S3OutputStream.java new file mode 100644 index 00000000..7f669b45 --- /dev/null +++ b/src/main/java/com/upplication/s3fs/S3OutputStream.java @@ -0,0 +1,651 @@ +/* + * Copyright 2020, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.upplication.s3fs; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectId; +import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; +import com.amazonaws.services.s3.model.StorageClass; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.util.Base64; +import com.upplication.s3fs.util.ByteBufferInputStream; +import com.upplication.s3fs.util.CommonUtils; +import com.upplication.s3fs.util.S3UploadRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; + +/** + * Parallel S3 multipart uploader. Based on the following code request + * See https://github.com/Upplication/Amazon-S3-FileSystem-NIO2/pulls + * + * @author Paolo Di Tommaso + * @author Tom Wieczorek + */ + +public final class S3OutputStream extends OutputStream { + + /** + * Hack a LinkedBlockingQueue to make the offer method blocking + * + * http://stackoverflow.com/a/4522411/395921 + * + * @param + */ + static class LimitedQueue extends LinkedBlockingQueue + { + public LimitedQueue(int maxSize) + { + super(maxSize); + } + + @Override + public boolean offer(E e) + { + // turn offer() and add() into a blocking calls (unless interrupted) + try { + put(e); + return true; + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + } + return false; + } + } + + private static final Logger log = LoggerFactory.getLogger(S3OutputStream.class); + + + /** + * Amazon S3 API implementation to use. + */ + private final AmazonS3 s3; + + /** + * ID of the S3 object to store data into. + */ + private final S3ObjectId objectId; + + /** + * Amazon S3 storage class to apply to the newly created S3 object, if any. 
+ */ + private final StorageClass storageClass; + + /** + * Amazon S3 server-side encryption key (KMS key ID) to apply to the newly created S3 object, if any. + */ + private final String storageEncryptionKey; + + /** + * Metadata that will be attached to the stored S3 object. + */ + private final ObjectMetadata metadata; + + /** + * Indicates if the stream has been closed. + */ + private volatile boolean closed; + + /** + * Indicates if the upload has been aborted + */ + private volatile boolean aborted; + + /** + * If a multipart upload is in progress, holds the ID for it, {@code null} otherwise. + */ + private volatile String uploadId; + + /** + * If a multipart upload is in progress, holds the ETags of the uploaded parts, {@code null} otherwise. + */ + private Queue<PartETag> partETags; + + /** + * Holds upload request metadata + */ + private final S3UploadRequest request; + + /** + * Instead of allocating a new buffer for each chunk, buffers are recycled: an instance is put + * back into this queue when its upload has completed + */ + final private Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>(); + + /** + * The executor service (thread pool) which manages the upload in background + */ + private ExecutorService executor; + + /** + * The current working buffer + */ + private ByteBuffer buf; + + private MessageDigest md5; + + /** + * Phaser object to synchronize stream termination + */ + private Phaser phaser; + + /** + * Count the number of uploaded chunks + */ + private int partsCount; + + private int chunkSize; + + private CannedAccessControlList cannedAcl; + + /** + * Creates an S3 uploader output stream + * @param s3 The S3 client + * @param objectId The S3 object ID to upload + */ + public S3OutputStream(final AmazonS3 s3, S3ObjectId objectId ) { + this(s3, new S3UploadRequest().setObjectId(objectId)); + } + + /** + * Creates a new {@code S3OutputStream} that writes data directly into the S3 object with the given {@code objectId}. + * No special object metadata or storage class will be attached to the object. + * + * @param s3 Amazon S3 API implementation to use + * @param request An instance of {@link S3UploadRequest} + * + * @throws NullPointerException if at least one parameter is {@code null} + */ + public S3OutputStream(final AmazonS3 s3, S3UploadRequest request) { + this.s3 = requireNonNull(s3); + this.objectId = requireNonNull(request.getObjectId()); + this.metadata = request.getMetadata() != null ? request.getMetadata() : new ObjectMetadata(); + this.storageClass = request.getStorageClass(); + this.request = request; + this.chunkSize = request.getChunkSize(); + this.storageEncryptionKey = request.getStorageEncryptionKey(); + } + + private ByteBuffer expandBuffer(ByteBuffer byteBuffer) { + + final float expandFactor = 2.5f; + final int newCapacity = Math.min( (int)(byteBuffer.capacity() * expandFactor), chunkSize ); + + byteBuffer.flip(); + ByteBuffer expanded = ByteBuffer.allocate(newCapacity); + expanded.order(byteBuffer.order()); + expanded.put(byteBuffer); + return expanded; + } + + public void setCannedAcl(CannedAccessControlList acl) { + this.cannedAcl = acl; + } + + /** + * @return An MD5 message digester + */ + private MessageDigest createMd5() { + try { + return MessageDigest.getInstance("MD5"); + } + catch(NoSuchAlgorithmException e) { + throw new IllegalStateException("Cannot find a MD5 algorithm provider",e); + } + } + + + /** + * Writes a byte into the uploader buffer.
When it is full, starts the upload process + * in an asynchronous manner + * + * @param b The byte to be written + * @throws IOException + */ + @Override + public void write (int b) throws IOException { + if( buf == null ) { + buf = allocate(); + md5 = createMd5(); + } + else if( !buf.hasRemaining() ) { + if( buf.position() < chunkSize ) { + buf = expandBuffer(buf); + } + else { + flush(); + // create a new buffer + buf = allocate(); + md5 = createMd5(); + } + } + + buf.put((byte) b); + // update the md5 checksum + md5.update((byte) b); + } + + /** + * Flush the current buffer, uploading it to S3 storage + * + * @throws IOException + */ + @Override + public void flush() throws IOException { + // send out the current buffer + uploadBuffer(buf); + // clear the current buffer + buf = null; + md5 = null; + } + + private ByteBuffer allocate() { + + if( partsCount==0 ) { + return ByteBuffer.allocate(10 * 1024); + } + + // try to reuse a buffer from the pool + ByteBuffer result = bufferPool.poll(); + if( result != null ) { + result.clear(); + } + else { + // allocate a new buffer + result = ByteBuffer.allocateDirect(request.getChunkSize()); + } + + return result; + } + + + /** + * Upload the given buffer to S3 storage in an asynchronous manner. + * NOTE: when the executor service is busy (i.e. there are no more free threads) + * this method will block + */ + private void uploadBuffer(ByteBuffer buf) throws IOException { + // when the buffer is empty nothing to do + if( buf == null || buf.position()==0 ) { return; } + + if (partsCount == 0) { + init(); + } + + // set the buffer in read mode and submit for upload + executor.submit( task(buf, md5.digest(), ++partsCount) ); + } + + /** + * Initialize multipart upload data structures + * + * @throws IOException + */ + private void init() throws IOException { + // get the upload id + uploadId = initiateMultipartUpload().getUploadId(); + if (uploadId == null) { + throw new IOException("Failed to get a valid multipart upload ID from Amazon S3"); + } + // create the executor + executor = getOrCreateExecutor(request.getMaxThreads()); + partETags = new LinkedBlockingQueue<>(); + phaser = new Phaser(); + phaser.register(); + log.trace("Starting S3 upload: {}; chunk-size: {}; max-threads: {}", uploadId, request.getChunkSize(), request.getMaxThreads()); + } + + + /** + * Creates a {@link Runnable} task to handle the upload process + * in the background + * + * @param buffer The buffer to be uploaded + * @param checksum The MD5 checksum of the buffer content + * @param partIndex The part index (1-based) + * @return The upload task + */ + private Runnable task(final ByteBuffer buffer, final byte[] checksum, final int partIndex) { + + phaser.register(); + return new Runnable() { + @Override + public void run() { + try { + uploadPart(buffer, checksum, partIndex, false); + } + catch (IOException e) { + final StringWriter writer = new StringWriter(); + e.printStackTrace(new PrintWriter(writer)); + log.error("Upload: {} > Error for part: {}\nCaused by: {}", uploadId, partIndex, writer.toString()); + } + finally { + phaser.arriveAndDeregister(); + } + } + }; + + } + + /** + * Close the stream, uploading any remaining buffered data + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (closed) { + return; + } + + if (uploadId == null) { + if( buf != null ) + putObject(buf, md5.digest()); + else + // this is needed when trying to upload an empty object + putObject(new ByteArrayInputStream(new byte[]{}), 0, createMd5().digest()); + } + else { + // -- upload remaining chunk + if( buf != null ) + uploadBuffer(buf); + + // -- shutdown
upload executor and await termination + phaser.arriveAndAwaitAdvance(); + + // -- complete upload process + completeMultipartUpload(); + } + + closed = true; + } + + /** + * Starts the multipart upload process + * + * @return An instance of {@link InitiateMultipartUploadResult} + * @throws IOException + */ + private InitiateMultipartUploadResult initiateMultipartUpload() throws IOException { + final InitiateMultipartUploadRequest request = // + new InitiateMultipartUploadRequest(objectId.getBucket(), objectId.getKey(), metadata); + + if (storageClass != null) { + request.setStorageClass(storageClass); + } + + if(CommonUtils.isValidString(storageEncryptionKey)) { + request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(storageEncryptionKey)); + } + + if( cannedAcl != null ) { + log.debug("Setting canned ACL={}; initiateMultipartUpload bucket={}, key={}", cannedAcl, objectId.getBucket(), objectId.getKey()); + request.withCannedACL(cannedAcl); + } + + try { + return s3.initiateMultipartUpload(request); + } catch (final AmazonClientException e) { + throw new IOException("Failed to initiate Amazon S3 multipart upload", e); + } + } + + /** + * Upload the given buffer to the S3 storage using a multipart process + * + * @param buf The buffer holding the data to upload + * @param partNumber The progressive index of this chunk (1-based) + * @param lastPart {@code true} when it is the last chunk + * @throws IOException + */ + private void uploadPart( final ByteBuffer buf, final byte[] checksum, final int partNumber, final boolean lastPart ) throws IOException { + buf.flip(); + buf.mark(); + + int attempt=0; + boolean success=false; + try { + while( !success ) { + attempt++; + int len = buf.limit(); + try { + log.trace("Uploading part {} with length {} attempt {} for {} ", partNumber, len, attempt, objectId); + uploadPart( new ByteBufferInputStream(buf), len, checksum , partNumber, lastPart ); + success=true; + } + catch (AmazonClientException | IOException e) { + if( attempt == request.getMaxAttempts() ) + throw new IOException("Failed to upload multipart data to Amazon S3", e); + + log.debug("Failed to upload part {} attempt {} for {} -- Caused by: {}", partNumber, attempt, objectId, e.getMessage()); + sleep(request.getRetrySleep()); + buf.reset(); + } + } + } + finally { + if (!success) { + closed = true; + abortMultipartUpload(); + } + bufferPool.offer(buf); + } + + } + + private void uploadPart(final InputStream content, final long contentLength, final byte[] checksum, final int partNumber, final boolean lastPart) + throws IOException { + + if (aborted) return; + + final UploadPartRequest request = new UploadPartRequest(); + request.setBucketName(objectId.getBucket()); + request.setKey(objectId.getKey()); + request.setUploadId(uploadId); + request.setPartNumber(partNumber); + request.setPartSize(contentLength); + request.setInputStream(content); + request.setLastPart(lastPart); + request.setMd5Digest(Base64.encodeAsString(checksum)); + + + final PartETag partETag = s3.uploadPart(request).getPartETag(); + log.trace("Uploaded part {} with length {} for {}: {}", partETag.getPartNumber(), contentLength, objectId, partETag.getETag()); + partETags.add(partETag); + + } + + private void sleep( long millis ) { + try { + Thread.sleep(millis); + } + catch (InterruptedException e) { + log.trace("Sleep was interrupted -- Cause: {}", e.getMessage()); + } + } + + /** + * Aborts the multipart upload process + */ + private synchronized void abortMultipartUpload() { + if (aborted) return; + + 
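// Discard the server-side multipart upload and flag the stream as aborted, so that any in-flight part uploads become no-ops (see the `aborted` check in uploadPart) and completeMultipartUpload is skipped.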
log.debug("Aborting multipart upload {} for {}", uploadId, objectId); + try { + s3.abortMultipartUpload(new AbortMultipartUploadRequest(objectId.getBucket(), objectId.getKey(), uploadId)); + } + catch (final AmazonClientException e) { + log.warn("Failed to abort multipart upload {}: {}", uploadId, e.getMessage()); + } + aborted = true; + phaser.arriveAndDeregister(); + } + + /** + * Completes the multipart upload process + * @throws IOException + */ + private void completeMultipartUpload() throws IOException { + // if aborted upload just ignore it + if( aborted ) return; + + final int partCount = partETags.size(); + log.trace("Completing upload to {} consisting of {} parts", objectId, partCount); + + try { + s3.completeMultipartUpload(new CompleteMultipartUploadRequest( // + objectId.getBucket(), objectId.getKey(), uploadId, new ArrayList<>(partETags))); + } catch (final AmazonClientException e) { + throw new IOException("Failed to complete Amazon S3 multipart upload", e); + } + + log.trace("Completed upload to {} consisting of {} parts", objectId, partCount); + + uploadId = null; + partETags = null; + } + + /** + * Stores the given buffer using a single-part upload process + * @param buf + * @throws IOException + */ + private void putObject(ByteBuffer buf, byte[] checksum) throws IOException { + buf.flip(); + putObject(new ByteBufferInputStream(buf), buf.limit(), checksum); + } + + /** + * Stores the given buffer using a single-part upload process + * + * @param contentLength + * @param content + * @throws IOException + */ + private void putObject(final InputStream content, final long contentLength, byte[] checksum) throws IOException { + + final ObjectMetadata meta = metadata.clone(); + meta.setContentLength(contentLength); + meta.setContentMD5( Base64.encodeAsString(checksum) ); + + final PutObjectRequest request = new PutObjectRequest(objectId.getBucket(), objectId.getKey(), content, meta); + if( cannedAcl!=null ) { + log.trace("Setting canned ACL={} for stream", cannedAcl); + request.withCannedAcl(cannedAcl); + } + + if (storageClass != null) { + request.setStorageClass(storageClass); + } + + if(CommonUtils.isAES256Enabled(storageEncryptionKey)) { + request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(storageEncryptionKey)); + } + + try { + s3.putObject(request); + } catch (final AmazonClientException e) { + throw new IOException("Failed to put data into Amazon S3 object", e); + } + } + + /** + * @return Number of uploaded chunks + */ + int getPartsCount() { + return partsCount; + } + + + /** holds a singleton executor instance */ + static private volatile ExecutorService executorSingleton; + + /** + * Creates a singleton executor instance. + * + * @param maxThreads + * The max number of allowed threads in the executor pool. + * NOTE: changing the size parameter after the first invocation has no effect. 
+ * @return The executor instance + */ + static synchronized ExecutorService getOrCreateExecutor(int maxThreads) { + if( executorSingleton == null ) { + ThreadPoolExecutor pool = new ThreadPoolExecutor( + maxThreads, + Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new LimitedQueue(maxThreads *3), + new ThreadPoolExecutor.CallerRunsPolicy() ); + + pool.allowCoreThreadTimeOut(true); + executorSingleton = pool; + log.trace("Created singleton upload executor -- max-treads: {}", maxThreads); + } + return executorSingleton; + } + + /** + * Shutdown the executor and clear the singleton + */ + public static synchronized void shutdownExecutor() { + log.trace("Uploader shutdown -- Executor: {}", executorSingleton); + + if( executorSingleton != null ) { + executorSingleton.shutdown(); + log.trace("Uploader await completion"); + awaitExecutorCompletion(); + executorSingleton = null; + log.trace("Uploader shutdown completed"); + } + } + + private static void awaitExecutorCompletion() { + try { + executorSingleton.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + log.trace("Executor await interrupted -- Cause: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/com/upplication/s3fs/util/CommonUtils.java b/src/main/java/com/upplication/s3fs/util/CommonUtils.java new file mode 100644 index 00000000..6384afca --- /dev/null +++ b/src/main/java/com/upplication/s3fs/util/CommonUtils.java @@ -0,0 +1,38 @@ +package com.upplication.s3fs.util; + +import com.amazonaws.services.s3.model.SSEAlgorithm; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class CommonUtils { + private static Logger log = LoggerFactory.getLogger(CommonUtils.class); + + public static boolean isValidString(String str) { + return str != null && !str.trim().isEmpty(); + } + + public static boolean isAES256Enabled(String storageEncryption) { + if ( SSEAlgorithm.AES256.getAlgorithm().equalsIgnoreCase(storageEncryption) ) { + return true; + } + if( CommonUtils.isValidString(storageEncryption) && !isKMSEnabled(storageEncryption) ) { + log.warn("Not a valid S3 server-side encryption type: `{}` -- Currently only aws:kms or AES256 is supported", storageEncryption); + } + return false; + } + + public static boolean isKMSEnabled(String storageEncryption) { + if ( SSEAlgorithm.KMS.getAlgorithm().equalsIgnoreCase(storageEncryption)) { +// if(!CommonUtils.isValidString(props.getProperty("storage_encryption_key"))) { +// log.warn("KMS key for S3 server-side encryption type: `{}` is not passed, so it will use default (aws/s3).", encryption); +// } + return true; + + } else if( CommonUtils.isValidString(storageEncryption) && !isAES256Enabled(storageEncryption) ) { + log.warn("Not a valid S3 server-side encryption type: `{}` -- Currently only AES256 or aws:kms is supported", storageEncryption); + } + return false; + } +} diff --git a/src/main/java/com/upplication/s3fs/util/S3UploadRequest.java b/src/main/java/com/upplication/s3fs/util/S3UploadRequest.java new file mode 100644 index 00000000..5496b399 --- /dev/null +++ b/src/main/java/com/upplication/s3fs/util/S3UploadRequest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2020, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.upplication.s3fs.util; + +import java.util.List; +import java.util.Properties; + +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3ObjectId; +import com.amazonaws.services.s3.model.SSEAlgorithm; +import com.amazonaws.services.s3.model.StorageClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Models an S3 multipart upload request + */ +public class S3UploadRequest extends S3MultipartOptions { + + private static final Logger log = LoggerFactory.getLogger(S3UploadRequest.class); + + /** + * ID of the S3 object to store data into. + */ + private S3ObjectId objectId; + + /** + * Amazon S3 storage class to apply to the newly created S3 object, if any. + */ + private StorageClass storageClass; + + /** + * Amazon S3 server-side encryption key (KMS key ID) to apply to the newly created S3 object, if any. + */ + private String storageEncryptionKey; + + /** + * Metadata that will be attached to the stored S3 object. + */ + private ObjectMetadata metadata; + + + + public S3UploadRequest() { + + } + + public S3UploadRequest(Properties props) { + super(props); + setStorageClass(props.getProperty("upload_storage_class")); + setStorageEncryption(props.getProperty("storage_encryption")); + setStorageEncryptionKey(props.getProperty("storage_encryption_key")); + } + + public S3ObjectId getObjectId() { + return objectId; + } + + public StorageClass getStorageClass() { + return storageClass; + } + + public String getStorageEncryptionKey() { return storageEncryptionKey; } + + public ObjectMetadata getMetadata() { + return metadata; + } + + + public S3UploadRequest setObjectId(S3ObjectId objectId) { + this.objectId = objectId; + return this; + } + + public S3UploadRequest setStorageClass(StorageClass storageClass) { + this.storageClass = storageClass; + return this; + } + + public S3UploadRequest setStorageClass(String storageClass) { + if( storageClass==null ) return this; + + try { + setStorageClass( StorageClass.fromValue(storageClass) ); + } + catch( IllegalArgumentException e ) { + log.warn("Not a valid AWS S3 storage class: `{}` -- Using default", storageClass); + } + return this; + } + + + public S3UploadRequest setStorageEncryption(String storageEncryption) { + if( storageEncryption == null) { + return this; + } else if(CommonUtils.isAES256Enabled(storageEncryption)) { + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setSSEAlgorithm(SSEAlgorithm.AES256.getAlgorithm()); + this.setMetadata(objectMetadata); + } else if(CommonUtils.isKMSEnabled(storageEncryption)) { + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setSSEAlgorithm(SSEAlgorithm.KMS.getAlgorithm()); + this.setMetadata(objectMetadata); + } else { + log.warn("Not a valid S3 server-side encryption type: `{}` -- Currently only AES256 or aws:kms is supported", storageEncryption); + } + return this; + } + + public S3UploadRequest setStorageEncryptionKey(String storageEncryptionKey) { + if( storageEncryptionKey == null) { + return this; + } + this.storageEncryptionKey = storageEncryptionKey; + return this; + } + +
public S3UploadRequest setMetadata(ObjectMetadata metadata) { + this.metadata = metadata; + return this; + } + + public String toString() { + return "objectId=" + objectId + + "; storageClass=" + storageClass + + "; metadata=" + metadata + + "; storageEncryptionKey=" + storageEncryptionKey + + "; " + super.toString(); + } + +}
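For context, a minimal usage sketch of how the S3UploadRequest and S3OutputStream introduced above are meant to fit together; the bucket name, object key and property values are hypothetical, and it assumes the standard AmazonS3ClientBuilder from the AWS SDK is available to obtain a client:

import java.io.OutputStream;
import java.util.Properties;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3ObjectId;
import com.upplication.s3fs.S3OutputStream;
import com.upplication.s3fs.util.S3UploadRequest;

public class S3UploadExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical settings; the keys match the ones parsed by S3UploadRequest(Properties)
        Properties props = new Properties();
        props.setProperty("upload_storage_class", "STANDARD");
        props.setProperty("storage_encryption", "AES256");

        // Any AmazonS3 instance works; the provider above builds its own from the same properties
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        S3UploadRequest request = new S3UploadRequest(props)
                .setObjectId(new S3ObjectId("my-bucket", "data/output.bin"));

        // Bytes are buffered in chunks; full chunks are uploaded as multipart parts in the
        // background, and close() either completes the multipart upload or, for small
        // payloads, falls back to a single putObject call
        try (OutputStream out = new S3OutputStream(s3, request)) {
            out.write("hello world".getBytes());
        }
    }
}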