From 8c7ca45bd7d5101b7114e34f45a939b6e5227a90 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Tue, 7 Oct 2025 11:35:04 -0700 Subject: [PATCH 01/29] Add executor support --- .../lib/aws-sdk-s3/customizations.rb | 1 + .../lib/aws-sdk-s3/customizations/object.rb | 12 +- .../lib/aws-sdk-s3/default_executor.rb | 98 ++++++ .../lib/aws-sdk-s3/file_downloader.rb | 283 +++++++++++------- .../lib/aws-sdk-s3/file_uploader.rb | 14 +- .../lib/aws-sdk-s3/multipart_file_uploader.rb | 115 +++---- gems/aws-sdk-s3/spec/file_downloader_spec.rb | 9 +- gems/aws-sdk-s3/spec/file_uploader_spec.rb | 6 - .../spec/multipart_file_uploader_spec.rb | 8 +- 9 files changed, 350 insertions(+), 196 deletions(-) create mode 100644 gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb index c0dba64b79c..1ffb0c06892 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb @@ -7,6 +7,7 @@ module S3 autoload :Encryption, 'aws-sdk-s3/encryption' autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2' autoload :FilePart, 'aws-sdk-s3/file_part' + autoload :DefaultExecutor, 'aws-sdk-s3/default_executor' autoload :FileUploader, 'aws-sdk-s3/file_uploader' autoload :FileDownloader, 'aws-sdk-s3/file_downloader' autoload :LegacySigner, 'aws-sdk-s3/legacy_signer' diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb index 0a9a9b9d3ca..69e7d6905b9 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb @@ -459,11 +459,17 @@ def upload_stream(options = {}, &block) # @see Client#upload_part def upload_file(source, options = {}) uploading_options = options.dup - uploader = FileUploader.new(multipart_threshold: uploading_options.delete(:multipart_threshold), client: client) + executor = 
DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) + uploader = FileUploader.new( + client: client, + executor: executor, + multipart_threshold: uploading_options.delete(:multipart_threshold) + ) response = Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do uploader.upload(source, uploading_options.merge(bucket: bucket_name, key: key)) end yield response if block_given? + executor.shutdown true end deprecated(:upload_file, use: 'Aws::S3::TransferManager#upload_file', version: 'next major version') @@ -539,10 +545,12 @@ def upload_file(source, options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, options = {}) - downloader = FileDownloader.new(client: client) + executor = DefaultExecutor.new(max_threads: options[:thread_count]) + downloader = FileDownloader.new(client: client, executor: executor) Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do downloader.download(destination, options.merge(bucket: bucket_name, key: key)) end + executor.shutdown true end deprecated(:download_file, use: 'Aws::S3::TransferManager#download_file', version: 'next major version') diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb new file mode 100644 index 00000000000..0fbdde2f61f --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +module Aws + module S3 + # @api private + class DefaultExecutor + RUNNING = :running + SHUTTING_DOWN = :shutting_down + SHUTDOWN = :shutdown + + def initialize(options = {}) + @max_threads = options[:max_threads] || 10 + @state = RUNNING + @queue = Queue.new + @pool = [] + @mutex = Mutex.new + end + + def post(*args, &block) + @mutex.synchronize do + raise 'Executor has been shutdown and is no longer accepting tasks' unless @state == RUNNING + + @queue << [args, block] + ensure_worker_available + end + true + end + + def kill + @mutex.synchronize do + @state = 
SHUTDOWN + @pool.each(&:kill) + @pool.clear + @queue.clear + end + true + end + + def shutdown(timeout = nil) + @mutex.synchronize do + return true if @state == SHUTDOWN + + @state = SHUTTING_DOWN + @pool.size.times { @queue << :shutdown } + end + + if timeout + deadline = Time.now + timeout + @pool.each do |thread| + remaining = deadline - Time.now + break if remaining <= 0 + + thread.join([remaining, 0].max) + end + @pool.select(&:alive?).each(&:kill) + else + @pool.each(&:join) + end + + @pool.clear + @state = SHUTDOWN + true + end + + def running? + @state == RUNNING + end + + def shutting_down? + @state == SHUTTING_DOWN + end + + def shutdown? + @state == SHUTDOWN + end + + private + + def ensure_worker_available + return unless @state == RUNNING + + @pool.select!(&:alive?) + @pool << spawn_worker if @pool.size < @max_threads + end + + def spawn_worker + Thread.new do + while (job = @queue.shift) + break if job == :shutdown + + args, block = job + block.call(*args) + end + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb index 517a06dabea..4181c7eaeac 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb @@ -8,184 +8,243 @@ module Aws module S3 # @api private class FileDownloader - MIN_CHUNK_SIZE = 5 * 1024 * 1024 MAX_PARTS = 10_000 + HEAD_OPTIONS = Set.new(Client.api.operation(:head_object).input.shape.member_names) + GET_OPTIONS = Set.new(Client.api.operation(:get_object).input.shape.member_names) def initialize(options = {}) @client = options[:client] || Client.new + @executor = options[:executor] end # @return [Client] attr_reader :client def download(destination, options = {}) - valid_types = [String, Pathname, File, Tempfile] - unless valid_types.include?(destination.class) - raise ArgumentError, "Invalid destination, expected #{valid_types.join(', ')} but got: #{destination.class}" - end - - 
@destination = destination - @mode = options.delete(:mode) || 'auto' - @thread_count = options.delete(:thread_count) || 10 - @chunk_size = options.delete(:chunk_size) - @on_checksum_validated = options.delete(:on_checksum_validated) - @progress_callback = options.delete(:progress_callback) - @params = options - validate! + validate_destination!(destination) + opts = build_download_opts(destination, options.dup) + validate_opts!(opts) Aws::Plugins::UserAgent.metric('S3_TRANSFER') do - case @mode - when 'auto' then multipart_download - when 'single_request' then single_request - when 'get_range' - raise ArgumentError, 'In get_range mode, :chunk_size must be provided' unless @chunk_size - - resp = @client.head_object(@params) - multithreaded_get_by_ranges(resp.content_length, resp.etag) - else - raise ArgumentError, "Invalid mode #{@mode} provided, :mode should be single_request, get_range or auto" + case opts[:mode] + when 'auto' then multipart_download(opts) + when 'single_request' then single_request(opts) + when 'get_range' then range_request(opts) end end - File.rename(@temp_path, @destination) if @temp_path + File.rename(opts[:temp_path], destination) if opts[:temp_path] ensure - File.delete(@temp_path) if @temp_path && File.exist?(@temp_path) + cleanup_temp_file!(opts) end private - def validate! 
- return unless @on_checksum_validated && !@on_checksum_validated.respond_to?(:call) + def build_download_opts(destination, opts) + { + destination: destination, + mode: opts.delete(:mode) || 'auto', + chunk_size: opts.delete(:chunk_size), + on_checksum_validated: opts.delete(:on_checksum_validated), + progress_callback: opts.delete(:progress_callback), + params: opts, + temp_path: nil + } + end + + def cleanup_temp_file!(opts) + return unless opts + + temp_file = opts[:temp_path] + File.delete(temp_file) if temp_file && File.exist?(temp_file) + end + + def download_with_executor(part_list, total_size, opts) + download_attempts = 0 + completion_queue = Queue.new + abort_download = false + error = nil + progress = + if (progress_callback = opts[:progress_callback]) + MultipartProgress.new(part_list, total_size, progress_callback) + end + + while (part = part_list.shift) + break if abort_download + + download_attempts += 1 + @executor.post(part) do |p| + update_progress(progress, p) if progress + + resp = @client.get_object(p.params) + range = extract_range(resp.content_range) + validate_range(range, p.params[:range]) if p.params[:range] + write(resp.body, range, opts) + + if opts[:on_checksum_validated] && resp.checksum_validated + opts[:on_checksum_validated].call(resp.checksum_validated, resp) + end + rescue StandardError => e + abort_download = true + error = e + ensure + completion_queue << :done + end + end + + download_attempts.times { completion_queue.pop } + raise error unless error.nil? + end + + def get_opts(opts) + GET_OPTIONS.each_with_object({}) { |k, h| h[k] = opts[k] if opts.key?(k) } + end + + def head_opts(opts) + HEAD_OPTIONS.each_with_object({}) { |k, h| h[k] = opts[k] if opts.key?(k) } + end - raise ArgumentError, ':on_checksum_validated must be callable' + def compute_chunk(chunk_size, file_size) + raise ArgumentError, ":chunk_size shouldn't exceed total file size." 
if chunk_size && chunk_size > file_size + + chunk_size || [(file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE].max.to_i end - def multipart_download - resp = @client.head_object(@params.merge(part_number: 1)) + def compute_mode(file_size, total_parts, etag, opts) + chunk_size = compute_chunk(opts[:chunk_size], file_size) + part_size = (file_size.to_f / total_parts).ceil + + resolve_temp_path(opts) + if chunk_size < part_size + multithreaded_get_by_ranges(file_size, etag, opts) + else + multithreaded_get_by_parts(total_parts, file_size, etag, opts) + end + end + + def extract_range(value) + value.match(%r{bytes (?\d+-\d+)/\d+})[:range] + end + + def multipart_download(opts) + resp = @client.head_object(head_opts(opts[:params].merge(part_number: 1))) count = resp.parts_count if count.nil? || count <= 1 if resp.content_length <= MIN_CHUNK_SIZE - single_request + single_request(opts) else - multithreaded_get_by_ranges(resp.content_length, resp.etag) + resolve_temp_path(opts) + multithreaded_get_by_ranges(resp.content_length, resp.etag, opts) end else # covers cases when given object is not uploaded via UploadPart API - resp = @client.head_object(@params) # partNumber is an option + resp = @client.head_object(head_opts(opts[:params])) # partNumber is an option if resp.content_length <= MIN_CHUNK_SIZE - single_request + single_request(opts) else - compute_mode(resp.content_length, count, resp.etag) + compute_mode(resp.content_length, count, resp.etag, opts) end end end - def compute_mode(file_size, count, etag) - chunk_size = compute_chunk(file_size) - part_size = (file_size.to_f / count).ceil - if chunk_size < part_size - multithreaded_get_by_ranges(file_size, etag) - else - multithreaded_get_by_parts(count, file_size, etag) + def multithreaded_get_by_parts(total_parts, file_size, etag, opts) + parts = (1..total_parts).map do |part| + params = get_opts(opts[:params].merge(part_number: part, if_match: etag)) + Part.new(part_number: part, params: params) end + 
download_with_executor(PartList.new(parts), file_size, opts) end - def compute_chunk(file_size) - raise ArgumentError, ":chunk_size shouldn't exceed total file size." if @chunk_size && @chunk_size > file_size - - @chunk_size || [(file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE].max.to_i - end - - def multithreaded_get_by_ranges(file_size, etag) + def multithreaded_get_by_ranges(file_size, etag, opts) offset = 0 - default_chunk_size = compute_chunk(file_size) + default_chunk_size = compute_chunk(opts[:chunk_size], file_size) chunks = [] part_number = 1 # parts start at 1 while offset < file_size progress = offset + default_chunk_size progress = file_size if progress > file_size - params = @params.merge(range: "bytes=#{offset}-#{progress - 1}", if_match: etag) + params = get_opts(opts[:params].merge(range: "bytes=#{offset}-#{progress - 1}", if_match: etag)) chunks << Part.new(part_number: part_number, size: (progress - offset), params: params) part_number += 1 offset = progress end - download_in_threads(PartList.new(chunks), file_size) - end - - def multithreaded_get_by_parts(n_parts, total_size, etag) - parts = (1..n_parts).map do |part| - Part.new(part_number: part, params: @params.merge(part_number: part, if_match: etag)) - end - download_in_threads(PartList.new(parts), total_size) - end - - def download_in_threads(pending, total_size) - threads = [] - progress = MultipartProgress.new(pending, total_size, @progress_callback) if @progress_callback - unless [File, Tempfile].include?(@destination.class) - @temp_path = "#{@destination}.s3tmp.#{SecureRandom.alphanumeric(8)}" - end - @thread_count.times do - thread = Thread.new do - begin - while (part = pending.shift) - if progress - part.params[:on_chunk_received] = - proc do |_chunk, bytes, total| - progress.call(part.part_number, bytes, total) - end - end - resp = @client.get_object(part.params) - range = extract_range(resp.content_range) - validate_range(range, part.params[:range]) if part.params[:range] - 
write(resp.body, range) - if @on_checksum_validated && resp.checksum_validated - @on_checksum_validated.call(resp.checksum_validated, resp) - end - end - nil - rescue StandardError => e - pending.clear! # keep other threads from downloading other parts - raise e - end - end - threads << thread - end - threads.map(&:value).compact + download_with_executor(PartList.new(chunks), file_size, opts) end - def extract_range(value) - value.match(%r{bytes (?\d+-\d+)/\d+})[:range] + def range_request(opts) + resp = @client.head_object(head_opts(opts[:params])) + resolve_temp_path(opts) + multithreaded_get_by_ranges(resp.content_length, resp.etag, opts) end - def validate_range(actual, expected) - return if actual == expected.match(/bytes=(?\d+-\d+)/)[:range] + def resolve_temp_path(opts) + return if [File, Tempfile].include?(opts[:destination].class) - raise MultipartDownloadError, "multipart download failed: expected range of #{expected} but got #{actual}" + opts[:temp_path] ||= "#{opts[:destination]}.s3tmp.#{SecureRandom.alphanumeric(8)}" end - def write(body, range) - path = @temp_path || @destination - File.write(path, body.read, range.split('-').first.to_i) - end - - def single_request - params = @params.merge(response_target: @destination) - params[:on_chunk_received] = single_part_progress if @progress_callback + def single_request(opts) + params = get_opts(opts[:params]).merge(response_target: opts[:destination]) + params[:on_chunk_received] = single_part_progress(opts) if opts[:progress_callback] resp = @client.get_object(params) - return resp unless @on_checksum_validated + return resp unless opts[:on_checksum_validated] - @on_checksum_validated.call(resp.checksum_validated, resp) if resp.checksum_validated + opts[:on_checksum_validated].call(resp.checksum_validated, resp) if resp.checksum_validated resp end - def single_part_progress + def single_part_progress(opts) proc do |_chunk, bytes_read, total_size| - @progress_callback.call([bytes_read], [total_size], 
total_size) + opts[:progress_callback].call([bytes_read], [total_size], total_size) + end + end + + def update_progress(progress, part) + part.params[:on_chunk_received] = + proc do |_chunk, bytes, total| + progress.call(part.part_number, bytes, total) + end + end + + def validate_destination!(destination) + valid_types = [String, Pathname, File, Tempfile] + return if valid_types.include?(destination.class) + + raise ArgumentError, "Invalid destination, expected #{valid_types.join(', ')} but got: #{destination.class}" + end + + def validate_opts!(opts) + if opts[:on_checksum_validated] && !opts[:on_checksum_validated].respond_to?(:call) + raise ArgumentError, ':on_checksum_validated must be callable' + end + + valid_modes = %w[auto get_range single_request] + unless valid_modes.include?(opts[:mode]) + msg = "Invalid mode #{opts[:mode]} provided, :mode should be single_request, get_range or auto" + raise ArgumentError, msg + end + + if opts[:mode] == 'get_range' && opts[:chunk_size].nil? 
+ raise ArgumentError, 'In get_range mode, :chunk_size must be provided' + end + + if opts[:chunk_size] && opts[:chunk_size] <= 0 + raise ArgumentError, ':chunk_size must be positive' end end + def validate_range(actual, expected) + return if actual == expected.match(/bytes=(?\d+-\d+)/)[:range] + + raise MultipartDownloadError, "multipart download failed: expected range of #{expected} but got #{actual}" + end + + def write(body, range, opts) + path = opts[:temp_path] || opts[:destination] + File.write(path, body.read, range.split('-').first.to_i) + end + # @api private class Part < Struct.new(:part_number, :size, :params) include Aws::Structure diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb index 587066551ea..62dbb07f8c3 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb @@ -13,8 +13,8 @@ class FileUploader # @option options [Client] :client # @option options [Integer] :multipart_threshold (104857600) def initialize(options = {}) - @options = options @client = options[:client] || Client.new + @executor = options[:executor] @multipart_threshold = options[:multipart_threshold] || DEFAULT_MULTIPART_THRESHOLD end @@ -36,11 +36,9 @@ def initialize(options = {}) # @return [void] def upload(source, options = {}) Aws::Plugins::UserAgent.metric('S3_TRANSFER') do - if File.size(source) >= multipart_threshold - MultipartFileUploader.new(@options).upload(source, options) + if File.size(source) >= @multipart_threshold + MultipartFileUploader.new(client: @client, executor: @executor).upload(source, options) else - # remove multipart parameters not supported by put_object - options.delete(:thread_count) put_object(source, options) end end @@ -48,9 +46,9 @@ def upload(source, options = {}) private - def open_file(source) - if String === source || Pathname === source - File.open(source, 'rb') { |file| yield(file) } + def open_file(source, &block) + if 
source.is_a?(String) || source.is_a?(Pathname) + File.open(source, 'rb', &block) else yield(source) end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb index bcc05f1fc9e..bab023e1bf7 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb @@ -7,7 +7,6 @@ module Aws module S3 # @api private class MultipartFileUploader - MIN_PART_SIZE = 5 * 1024 * 1024 # 5MB MAX_PARTS = 10_000 DEFAULT_THREAD_COUNT = 10 @@ -21,10 +20,9 @@ class MultipartFileUploader ) # @option options [Client] :client - # @option options [Integer] :thread_count (DEFAULT_THREAD_COUNT) def initialize(options = {}) @client = options[:client] || Client.new - @thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT + @executor = options[:executor] end # @return [Client] @@ -38,11 +36,12 @@ def initialize(options = {}) # It will be invoked with [bytes_read], [total_sizes] # @return [Seahorse::Client::Response] - the CompleteMultipartUploadResponse def upload(source, options = {}) - raise ArgumentError, 'unable to multipart upload files smaller than 5MB' if File.size(source) < MIN_PART_SIZE + file_size = File.size(source) + raise ArgumentError, 'unable to multipart upload files smaller than 5MB' if file_size < MIN_PART_SIZE upload_id = initiate_upload(options) - parts = upload_parts(upload_id, source, options) - complete_upload(upload_id, parts, source, options) + parts = upload_parts(upload_id, source, file_size, options) + complete_upload(upload_id, parts, file_size, options) end private @@ -51,22 +50,22 @@ def initiate_upload(options) @client.create_multipart_upload(create_opts(options)).upload_id end - def complete_upload(upload_id, parts, source, options) + def complete_upload(upload_id, parts, file_size, options) @client.complete_multipart_upload( **complete_opts(options).merge( upload_id: upload_id, multipart_upload: { parts: parts }, 
- mpu_object_size: File.size(source) + mpu_object_size: file_size ) ) rescue StandardError => e abort_upload(upload_id, options, [e]) end - def upload_parts(upload_id, source, options) + def upload_parts(upload_id, source, file_size, options) completed = PartList.new - pending = PartList.new(compute_parts(upload_id, source, options)) - errors = upload_in_threads(pending, completed, options) + pending = PartList.new(compute_parts(upload_id, source, file_size, options)) + errors = upload_with_executor(pending, completed, options) if errors.empty? completed.to_a.sort_by { |part| part[:part_number] } else @@ -86,17 +85,20 @@ def abort_upload(upload_id, options, errors) raise MultipartUploadError.new(msg, errors + [e]) end - def compute_parts(upload_id, source, options) - size = File.size(source) - default_part_size = compute_default_part_size(size) + def compute_parts(upload_id, source, file_size, options) + default_part_size = compute_default_part_size(file_size) offset = 0 part_number = 1 parts = [] - while offset < size + while offset < file_size parts << upload_part_opts(options).merge( upload_id: upload_id, part_number: part_number, - body: FilePart.new(source: source, offset: offset, size: part_size(size, default_part_size, offset)) + body: FilePart.new( + source: source, + offset: offset, + size: part_size(file_size, default_part_size, offset) + ) ) part_number += 1 offset += default_part_size @@ -108,24 +110,20 @@ def checksum_key?(key) CHECKSUM_KEYS.include?(key) end - def has_checksum_key?(keys) + def has_checksum_keys?(keys) keys.any? 
{ |key| checksum_key?(key) } end def create_opts(options) opts = { checksum_algorithm: Aws::Plugins::ChecksumAlgorithm::DEFAULT_CHECKSUM } - opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys) - CREATE_OPTIONS.each_with_object(opts) do |key, hash| - hash[key] = options[key] if options.key?(key) - end + opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_keys?(options.keys) + CREATE_OPTIONS.each_with_object(opts) { |k, h| h[k] = options[k] if options.key?(k) } end def complete_opts(options) opts = {} - opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys) - COMPLETE_OPTIONS.each_with_object(opts) do |key, hash| - hash[key] = options[key] if options.key?(key) - end + opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_keys?(options.keys) + COMPLETE_OPTIONS.each_with_object(opts) { |k, h| h[k] = options[k] if options.key?(k) } end def upload_part_opts(options) @@ -135,43 +133,43 @@ def upload_part_opts(options) end end - def upload_in_threads(pending, completed, options) - threads = [] + def upload_with_executor(pending, completed, options) + upload_attempts = 0 + completion_queue = Queue.new + abort_upload = false + errors = [] + if (callback = options[:progress_callback]) progress = MultipartProgress.new(pending, callback) end - options.fetch(:thread_count, @thread_count).times do - thread = Thread.new do - begin - while (part = pending.shift) - if progress - part[:on_chunk_sent] = - proc do |_chunk, bytes, _total| - progress.call(part[:part_number], bytes) - end - end - resp = @client.upload_part(part) - part[:body].close - completed_part = { etag: resp.etag, part_number: part[:part_number] } - algorithm = resp.context.params[:checksum_algorithm] - k = "checksum_#{algorithm.downcase}".to_sym - completed_part[k] = resp.send(k) - completed.push(completed_part) - end - nil - rescue StandardError => e - # keep other threads from uploading other parts - pending.clear! 
- e - end + + while (part = pending.shift) + break if abort_upload + + upload_attempts += 1 + @executor.post(part) do |p| + update_progress(progress, p) if progress + resp = @client.upload_part(p) + p[:body].close + completed_part = { etag: resp.etag, part_number: p[:part_number] } + algorithm = resp.context.params[:checksum_algorithm].downcase + k = "checksum_#{algorithm}".to_sym + completed_part[k] = resp.send(k) + completed.push(completed_part) + rescue StandardError => e + abort_upload = true + errors << e + ensure + completion_queue << :done end - threads << thread end - threads.map(&:value).compact + + upload_attempts.times { completion_queue.pop } + errors end - def compute_default_part_size(source_size) - [(source_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i + def compute_default_part_size(file_size) + [(file_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i end def part_size(total_size, part_size, offset) @@ -182,6 +180,13 @@ def part_size(total_size, part_size, offset) end end + def update_progress(progress, part) + part[:on_chunk_sent] = + proc do |_chunk, bytes, _total| + progress.call(part[:part_number], bytes) + end + end + # @api private class PartList def initialize(parts = []) diff --git a/gems/aws-sdk-s3/spec/file_downloader_spec.rb b/gems/aws-sdk-s3/spec/file_downloader_spec.rb index 0f3dd8fb2cf..2632377b981 100644 --- a/gems/aws-sdk-s3/spec/file_downloader_spec.rb +++ b/gems/aws-sdk-s3/spec/file_downloader_spec.rb @@ -7,7 +7,8 @@ module Aws module S3 describe FileDownloader do let(:client) { S3::Client.new(stub_responses: true) } - let(:subject) { FileDownloader.new(client: client) } + let(:executor) { DefaultExecutor.new } + let(:subject) { FileDownloader.new(client: client, executor: executor) } let(:tmpdir) { Dir.tmpdir } describe '#initialize' do @@ -198,7 +199,6 @@ module S3 it 'raises when checksum validation fails on multipart object' do client.stub_responses(:get_object, { body: 'body', checksum_sha1: 'invalid' }) - 
expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, parts_params) }.to raise_error(Aws::Errors::ChecksumError) end @@ -208,7 +208,6 @@ module S3 expect(ctx.params[:if_match]).to eq('test-etag') 'PreconditionFailed' }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, range_params.merge(chunk_size: one_meg, mode: 'get_range')) } .to raise_error(Aws::S3::Errors::PreconditionFailed) end @@ -219,8 +218,6 @@ module S3 expect(ctx.params[:if_match]).to eq('test-etag') 'PreconditionFailed' }) - - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, parts_params) }.to raise_error(Aws::S3::Errors::PreconditionFailed) end @@ -246,7 +243,6 @@ module S3 it 'raises when range validation fails' do client.stub_responses(:get_object, { body: 'body', content_range: 'bytes 0-3/4' }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, range_params.merge(mode: 'get_range', chunk_size: one_meg)) } .to raise_error(Aws::S3::MultipartDownloadError) end @@ -263,7 +259,6 @@ module S3 responses[context.params[:range]] }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, range_params.merge(chunk_size: 5 * one_meg, mode: 'get_range')) } .to raise_error(Aws::S3::MultipartDownloadError) expect(File.exist?(path)).to be(true) diff --git a/gems/aws-sdk-s3/spec/file_uploader_spec.rb b/gems/aws-sdk-s3/spec/file_uploader_spec.rb index 64dbaf7cdd2..dc3ab94b052 100644 --- a/gems/aws-sdk-s3/spec/file_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/file_uploader_spec.rb @@ -81,12 +81,6 @@ module S3 subject.upload(ten_meg_file.path, params) end - - it 'does not fail when given :thread_count' do - expect(client).to receive(:put_object).with(params.merge(body: ten_meg_file)) - - subject.upload(ten_meg_file, params.merge(thread_count: 1)) - end 
end end end diff --git a/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb b/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb index 82e147986e3..54b50a13a01 100644 --- a/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb @@ -7,7 +7,8 @@ module Aws module S3 describe MultipartFileUploader do let(:client) { S3::Client.new(stub_responses: true) } - let(:subject) { MultipartFileUploader.new(client: client) } + let(:executor) { DefaultExecutor.new } + let(:subject) { MultipartFileUploader.new(client: client, executor: executor) } let(:params) { { bucket: 'bucket', key: 'key' } } describe '#initialize' do @@ -85,7 +86,6 @@ module S3 end it 'reports progress for multipart uploads' do - allow(Thread).to receive(:new).and_yield.and_return(double(value: nil)) client.stub_responses(:create_multipart_upload, upload_id: 'id') client.stub_responses(:complete_multipart_upload) expect(client).to receive(:upload_part).exactly(24).times do |args| @@ -127,10 +127,6 @@ module S3 end it 'reports when it is unable to abort a failed multipart upload' do - allow(Thread).to receive(:new) do |_, &block| - double(value: block.call) - end - client.stub_responses( :upload_part, [ From c21969a4cbf1431d13a4a00bf172d84ff6216cbd Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Tue, 7 Oct 2025 14:10:36 -0700 Subject: [PATCH 02/29] Add changelog entry --- gems/aws-sdk-s3/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gems/aws-sdk-s3/CHANGELOG.md b/gems/aws-sdk-s3/CHANGELOG.md index 7fd43ebaea8..5447ec027e4 100644 --- a/gems/aws-sdk-s3/CHANGELOG.md +++ b/gems/aws-sdk-s3/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Feature - Add lightweight thread pool executor for multipart `download_file` and `upload_file`. 
+ 1.199.1 (2025-09-25) ------------------ From 39ecf0a2cd08ed3a9530f16c6490f4f7c6801c87 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Tue, 7 Oct 2025 14:10:51 -0700 Subject: [PATCH 03/29] Update TM with executor changes --- .../lib/aws-sdk-s3/transfer_manager.rb | 65 +++++++++++++++---- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 03bd249d5e2..77d995fd073 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -12,17 +12,55 @@ module S3 # * track transfer progress by using progress listener # class TransferManager + # @example Using default executor (automatic creation and shutdown) + # tm = TransferManager.new # No executor provided + # # DefaultExecutor created, used, and shutdown automatically + # tm.download_file('/path/to/file', bucket: 'bucket', key: 'key') + # + # @example Using custom executor (manual shutdown required) + # require 'concurrent-ruby' + # + # executor = Concurrent::ThreadPoolExecutor.new(max_threads: 5) + # tm = TransferManager.new(executor: executor) + # tm.download_file('/path/to/file1', bucket: 'bucket', key: 'key1') + # executor.shutdown # You must shutdown custom executors + # # @param [Hash] options # @option options [S3::Client] :client (S3::Client.new) # The S3 client to use for {TransferManager} operations. If not provided, a new default client # will be created automatically. + # @option options [Object] :executor + # The executor to use for multipart operations. Must implement the same interface as {DefaultExecutor}. + # If not provided, a new {DefaultExecutor} will be created automatically for each operation and + # shutdown after completion. When provided a custom executor, it will be reused across operations, and + # you are responsible for shutting it down when finished. 
+ # + # **Required Methods:** + # + # * `post(*args, &block)` - Execute a task with given arguments and block + # + # * `shutdown(timeout = nil)` - Gracefully shutdown the executor with optional timeout + # + # * `kill` - Immediately terminate all running tasks + # + # * `running?` - Returns true if executor is accepting new tasks + # + # * `shutting_down?` - Returns true if shutdown has been initiated but not completed + # + # * `shutdown?` - Returns true if executor has been fully shutdown + # def initialize(options = {}) - @client = options.delete(:client) || Client.new + @client = options[:client] || Client.new + @executor = options[:executor] + @options = options end # @return [S3::Client] attr_reader :client + # @return [S3::Client] + attr_reader :executor + # Downloads a file in S3 to a path on disk. # # # small files (< 5MB) are downloaded in a single API call @@ -74,6 +112,7 @@ def initialize(options = {}) # @option options [Integer] :chunk_size required in `"get_range"` mode. # # @option options [Integer] :thread_count (10) Customize threads used in the multipart download. + # Only used when no custom executor is provided (creates {DefaultExecutor} with this thread count). # # @option options [String] :version_id The object version id used to retrieve the object. # @@ -102,8 +141,10 @@ def initialize(options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, bucket:, key:, **options) - downloader = FileDownloader.new(client: @client) + executor = @executor || DefaultExecutor.new + downloader = FileDownloader.new(client: @client, executor: executor) downloader.download(destination, options.merge(bucket: bucket, key: key)) + executor.shutdown if @options[:executor] true end @@ -139,7 +180,7 @@ def download_file(destination, bucket:, key:, **options) # A file on the local file system that will be uploaded. This can either be a `String` or `Pathname` to the # file, an open `File` object, or an open `Tempfile` object. 
If you pass an open `File` or `Tempfile` object, # then you are responsible for closing it after the upload completes. When using an open Tempfile, rewind it - # before uploading or else the object will be empty. + # before uploading or else the object will be empty. # # @param [String] bucket # The name of the S3 bucket to upload to. @@ -156,15 +197,14 @@ def download_file(destination, bucket:, key:, **options) # Files larger han or equal to `:multipart_threshold` are uploaded using the S3 multipart upload APIs. # Default threshold is `100MB`. # - # @option options [Integer] :thread_count (10) - # The number of parallel multipart uploads. This option is not used if the file is smaller than - # `:multipart_threshold`. + # @option options [Integer] :thread_count (10) Customize threads used in the multipart upload. + # Only used when no custom executor is provided (creates {DefaultExecutor} with this thread count). # # @option options [Proc] :progress_callback (nil) # A Proc that will be called when each chunk of the upload is sent. # It will be invoked with `[bytes_read]` and `[total_sizes]`. # - # @raise [MultipartUploadError] If an file is being uploaded in parts, and the upload can not be completed, + # @raise [MultipartUploadError] If a file is being uploaded in parts, and the upload can not be completed, # then the upload is aborted and this error is raised. The raised error has a `#errors` method that # returns the failures that caused the upload to be aborted. 
# @@ -175,13 +215,16 @@ def download_file(destination, bucket:, key:, **options) # @see Client#complete_multipart_upload # @see Client#upload_part def upload_file(source, bucket:, key:, **options) - uploading_options = options.dup + executor = @executor || DefaultExecutor.new + upload_opts = options.dup uploader = FileUploader.new( - multipart_threshold: uploading_options.delete(:multipart_threshold), - client: @client + multipart_threshold: upload_opts.delete(:multipart_threshold), + client: @client, + executor: executor ) - response = uploader.upload(source, uploading_options.merge(bucket: bucket, key: key)) + response = uploader.upload(source, upload_opts.merge(bucket: bucket, key: key)) yield response if block_given? + executor.shutdown if @options[:executor] true end From a3f2b9fd9244405a9b6adc841fbdadc545ea7a50 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Tue, 7 Oct 2025 14:17:50 -0700 Subject: [PATCH 04/29] Remove thread count support from MPU --- gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb index bab023e1bf7..70f0100cec4 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb @@ -9,7 +9,6 @@ module S3 class MultipartFileUploader MIN_PART_SIZE = 5 * 1024 * 1024 # 5MB MAX_PARTS = 10_000 - DEFAULT_THREAD_COUNT = 10 CREATE_OPTIONS = Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names) COMPLETE_OPTIONS = Set.new(Client.api.operation(:complete_multipart_upload).input.shape.member_names) UPLOAD_PART_OPTIONS = Set.new(Client.api.operation(:upload_part).input.shape.member_names) From 3156f7c0d0fbfef9d18a6ebbd69be75e4eb307f7 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Tue, 7 Oct 2025 14:22:07 -0700 Subject: [PATCH 05/29] Update Object usage of executor --- 
gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb index 69e7d6905b9..efd621c6977 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb @@ -545,10 +545,11 @@ def upload_file(source, options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, options = {}) - executor = DefaultExecutor.new(max_threads: options[:thread_count]) + download_options = options.dup + executor = DefaultExecutor.new(max_threads: download_options.delete(:thread_count)) downloader = FileDownloader.new(client: client, executor: executor) Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do - downloader.download(destination, options.merge(bucket: bucket_name, key: key)) + downloader.download(destination, download_options.merge(bucket: bucket_name, key: key)) end executor.shutdown true From 84c996695b529849bad7c7a16bc7ca61e6eb8697 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Wed, 8 Oct 2025 09:41:14 -0700 Subject: [PATCH 06/29] Add documentation/remove unused methods from DefaultExecutor --- .../lib/aws-sdk-s3/default_executor.rb | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb index 0fbdde2f61f..a8cd6a4884d 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb @@ -4,18 +4,23 @@ module Aws module S3 # @api private class DefaultExecutor + DEFAULT_MAX_THREADS = 10 RUNNING = :running SHUTTING_DOWN = :shutting_down SHUTDOWN = :shutdown def initialize(options = {}) - @max_threads = options[:max_threads] || 10 + @max_threads = options[:max_threads] || DEFAULT_MAX_THREADS @state = RUNNING @queue = 
Queue.new @pool = [] @mutex = Mutex.new end + # Submits a task for execution. + # @param [Object] args Variable number of arguments to pass to the block + # @param [Proc] block The block to be executed + # @return [Boolean] Returns true if the task was submitted successfully def post(*args, &block) @mutex.synchronize do raise 'Executor has been shutdown and is no longer accepting tasks' unless @state == RUNNING @@ -26,6 +31,10 @@ def post(*args, &block) true end + # Immediately terminates all worker threads and clears pending tasks. + # This is a forceful shutdown that doesn't wait for running tasks to complete. + # + # @return [Boolean] true when termination is complete def kill @mutex.synchronize do @state = SHUTDOWN @@ -36,6 +45,12 @@ def kill true end + # Gracefully shuts down the executor, optionally with a timeout. + # Stops accepting new tasks and waits for running tasks to complete. + # + # @param timeout [Numeric, nil] Maximum time in seconds to wait for shutdown. + # If nil, waits indefinitely. If timeout expires, remaining threads are killed. + # @return [Boolean] true when shutdown is complete def shutdown(timeout = nil) @mutex.synchronize do return true if @state == SHUTDOWN @@ -62,18 +77,6 @@ def shutdown(timeout = nil) true end - def running? - @state == RUNNING - end - - def shutting_down? - @state == SHUTTING_DOWN - end - - def shutdown? 
- @state == SHUTDOWN - end - private def ensure_worker_available From 8e16a3bb503b73c4882228a8b9894006609295d4 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Wed, 8 Oct 2025 10:21:34 -0700 Subject: [PATCH 07/29] Add Default Executor specs --- gems/aws-sdk-s3/spec/default_executor_spec.rb | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 gems/aws-sdk-s3/spec/default_executor_spec.rb diff --git a/gems/aws-sdk-s3/spec/default_executor_spec.rb b/gems/aws-sdk-s3/spec/default_executor_spec.rb new file mode 100644 index 00000000000..26ba45f8a2d --- /dev/null +++ b/gems/aws-sdk-s3/spec/default_executor_spec.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require_relative 'spec_helper' + +module Aws + module S3 + describe DefaultExecutor do + let(:subject) { DefaultExecutor.new } + + describe '#post' do + it 'executes a block with arguments' do + block = double('block') + expect(block).to receive(:call).with('hello') + + subject.post('hello') { |arg| block.call(arg) } + sleep 0.1 + end + + it 'returns true when a task is submitted' do + expect(subject.post('hello') { |_arg| }).to be(true) + end + + it 'raises when executor is shutdown' do + subject.shutdown + expect { subject.post }.to raise_error(RuntimeError) + end + end + + describe '#shutdown' do + it 'waits for running tasks to be complete' do + result = nil + subject.post do + sleep 0.2 + result = 'done' + end + expect(subject.shutdown).to be(true) + expect(result).to eq('done') + end + + it 'kills threads after timeout' do + result = nil + subject.post do + sleep 1 + result = 'done' + end + expect(subject.shutdown(0.1)).to be(true) + expect(result).to be_nil + end + end + + describe '#kill' do + it 'stops all threads immediately and returns true' do + completed = false + subject.post do + sleep 1 + completed = true + end + + sleep 0.1 + result = subject.kill + + expect(result).to be(true) + expect(completed).to be(false) + end + end + end + end +end From 
db1cb62e026e3b7b97cc9bcd2868e09beccb30cb Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Wed, 8 Oct 2025 10:49:59 -0700 Subject: [PATCH 08/29] Update TM docs and impl --- .../lib/aws-sdk-s3/transfer_manager.rb | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 77d995fd073..586d5e0d873 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -38,17 +38,9 @@ class TransferManager # **Required Methods:** # # * `post(*args, &block)` - Execute a task with given arguments and block - # # * `shutdown(timeout = nil)` - Gracefully shutdown the executor with optional timeout - # # * `kill` - Immediately terminate all running tasks # - # * `running?` - Returns true if executor is accepting new tasks - # - # * `shutting_down?` - Returns true if shutdown has been initiated but not completed - # - # * `shutdown?` - Returns true if executor has been fully shutdown - # def initialize(options = {}) @client = options[:client] || Client.new @executor = options[:executor] @@ -114,9 +106,8 @@ def initialize(options = {}) # @option options [Integer] :thread_count (10) Customize threads used in the multipart download. # Only used when no custom executor is provided (creates {DefaultExecutor} with this thread count). # - # @option options [String] :version_id The object version id used to retrieve the object. - # - # @see https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning + # @option options [String] :version_id The object version id used to retrieve the object. See + # {https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning} for further details. 
# # @option options [String] :checksum_mode ("ENABLED") # When `"ENABLED"` and the object has a stored checksum, it will be used to validate the download and will @@ -141,10 +132,11 @@ def initialize(options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, bucket:, key:, **options) - executor = @executor || DefaultExecutor.new + download_opts = options.dup + executor = @executor || DefaultExecutor.new(download_opts.delete(:thread_count)) downloader = FileDownloader.new(client: @client, executor: executor) downloader.download(destination, options.merge(bucket: bucket, key: key)) - executor.shutdown if @options[:executor] + executor.shutdown unless @options[:executor] true end @@ -198,7 +190,7 @@ def download_file(destination, bucket:, key:, **options) # Default threshold is `100MB`. # # @option options [Integer] :thread_count (10) Customize threads used in the multipart upload. - # Only used when no custom executor is provided (creates {DefaultExecutor} with this thread count). + # Only used when no custom executor is provided (creates {DefaultExecutor} with the default thread count). # # @option options [Proc] :progress_callback (nil) # A Proc that will be called when each chunk of the upload is sent. @@ -215,8 +207,8 @@ def download_file(destination, bucket:, key:, **options) # @see Client#complete_multipart_upload # @see Client#upload_part def upload_file(source, bucket:, key:, **options) - executor = @executor || DefaultExecutor.new upload_opts = options.dup + executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count)) uploader = FileUploader.new( multipart_threshold: upload_opts.delete(:multipart_threshold), client: @client, @@ -224,7 +216,7 @@ def upload_file(source, bucket:, key:, **options) ) response = uploader.upload(source, upload_opts.merge(bucket: bucket, key: key)) yield response if block_given? 
- executor.shutdown if @options[:executor] + executor.shutdown unless @options[:executor] true end From f907c3b6e3358bcc1c1e4003e0950b700647c0c3 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:17:58 -0700 Subject: [PATCH 09/29] Update streaming MPU to use executor --- .../aws-sdk-s3/multipart_stream_uploader.rb | 77 +++++++++---------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb index 60a298c720b..c96b0ffc0fa 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb @@ -11,7 +11,6 @@ module S3 class MultipartStreamUploader DEFAULT_PART_SIZE = 5 * 1024 * 1024 # 5MB - DEFAULT_THREAD_COUNT = 10 CREATE_OPTIONS = Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names) UPLOAD_PART_OPTIONS = Set.new(Client.api.operation(:upload_part).input.shape.member_names) COMPLETE_UPLOAD_OPTIONS = Set.new(Client.api.operation(:complete_multipart_upload).input.shape.member_names) @@ -19,9 +18,9 @@ class MultipartStreamUploader # @option options [Client] :client def initialize(options = {}) @client = options[:client] || Client.new + @executor = options[:executor] @tempfile = options[:tempfile] @part_size = options[:part_size] || DEFAULT_PART_SIZE - @thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT end # @return [Client] @@ -29,7 +28,6 @@ def initialize(options = {}) # @option options [required,String] :bucket # @option options [required,String] :key - # @option options [Integer] :thread_count (DEFAULT_THREAD_COUNT) # @return [Seahorse::Client::Response] - the CompleteMultipartUploadResponse def upload(options = {}, &block) Aws::Plugins::UserAgent.metric('S3_TRANSFER') do @@ -54,28 +52,32 @@ def complete_upload(upload_id, parts, options) end def upload_parts(upload_id, options, &block) - completed = Queue.new - 
thread_errors = [] - errors = begin + completed_parts = Queue.new + errors = [] + + begin IO.pipe do |read_pipe, write_pipe| - threads = upload_in_threads( - read_pipe, - completed, - upload_part_opts(options).merge(upload_id: upload_id), - thread_errors - ) + upload_thread = Thread.new do + upload_with_executor( + read_pipe, + completed_parts, + errors, + upload_part_opts(options).merge(upload_id: upload_id) + ) + end + begin block.call(write_pipe) ensure # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111 write_pipe.close end - threads.map(&:value).compact + upload_thread.join end rescue StandardError => e - thread_errors + [e] + errors << e end - return ordered_parts(completed) if errors.empty? + return ordered_parts(completed_parts) if errors.empty? abort_upload(upload_id, options, errors) end @@ -128,37 +130,34 @@ def read_to_part_body(read_pipe) end end - def upload_in_threads(read_pipe, completed, options, thread_errors) - mutex = Mutex.new + def upload_with_executor(read_pipe, completed, errors, options) + completion_queue = Queue.new + queued_parts = 0 part_number = 0 - options.fetch(:thread_count, @thread_count).times.map do - thread = Thread.new do - loop do - body, thread_part_number = mutex.synchronize do - [read_to_part_body(read_pipe), part_number += 1] - end - break unless body || thread_part_number == 1 - - begin - part = options.merge(body: body, part_number: thread_part_number) - resp = @client.upload_part(part) - completed_part = create_completed_part(resp, part) - completed.push(completed_part) - ensure - clear_body(body) - end - end - nil + mutex = Mutex.new + loop do + part_body, current_part_num = mutex.synchronize do + [read_to_part_body(read_pipe), part_number += 1] + end + break unless part_body || current_part_num == 1 + + queued_parts += 1 + @executor.post(part_body, current_part_num, options) do |body, num, opts| + part = opts.merge(body: body, part_number: num) + resp = @client.upload_part(part) + 
completed_part = create_completed_part(resp, part) + completed.push(completed_part) rescue StandardError => e - # keep other threads from uploading other parts mutex.synchronize do - thread_errors.push(e) + errors.push(e) read_pipe.close_read unless read_pipe.closed? end - e + ensure + clear_body(body) + completion_queue << :done end - thread end + queued_parts.times { completion_queue.pop } end def create_completed_part(resp, part) From 7cb940ac8dfa1645a02246754096e98465bce943 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:37:36 -0700 Subject: [PATCH 10/29] More MP Stream updates --- .../lib/aws-sdk-s3/multipart_stream_uploader.rb | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb index c96b0ffc0fa..ae6f75a47a1 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb @@ -66,12 +66,10 @@ def upload_parts(upload_id, options, &block) ) end - begin - block.call(write_pipe) - ensure - # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111 - write_pipe.close - end + block.call(write_pipe) + ensure + # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111 + write_pipe.close upload_thread.join end rescue StandardError => e From 4003536958ddad68b1b674b71a0c94eefb7d086c Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:37:49 -0700 Subject: [PATCH 11/29] Update specs --- gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb b/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb index e7c2860f5a7..5413e39826e 100644 --- a/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb 
@@ -7,11 +7,16 @@ module Aws module S3 describe MultipartStreamUploader do let(:client) { S3::Client.new(stub_responses: true) } - let(:subject) { MultipartStreamUploader.new(client: client) } + let(:executor) { S3::DefaultExecutor.new } + let(:subject) { MultipartStreamUploader.new(client: client, executor: executor) } let(:params) { { bucket: 'bucket', key: 'key' } } let(:one_mb) { '.' * 1024 * 1024 } let(:seventeen_mb) { one_mb * 17 } + after(:each) do + executor.shutdown + end + describe '#initialize' do it 'constructs a default s3 client when none provided' do client = double('client') @@ -50,7 +55,6 @@ module S3 } ) expect(client).to receive(:complete_multipart_upload).with(expected_params).once - subject.upload(params.merge(content_type: 'text/plain')) { |write_stream| write_stream << seventeen_mb } end @@ -155,7 +159,7 @@ module S3 end context 'when tempfile is true' do - let(:subject) { MultipartStreamUploader.new(client: client, tempfile: true) } + let(:subject) { MultipartStreamUploader.new(client: client, tempfile: true, executor: S3::DefaultExecutor.new) } it 'uses multipart APIs' do client.stub_responses(:create_multipart_upload, upload_id: 'id') From 7dddda9412fc8698a835c75a753e73478d828377 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:38:13 -0700 Subject: [PATCH 12/29] Update interfaces --- gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb | 4 +++- gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb index efd621c6977..c196c534e63 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb @@ -384,9 +384,10 @@ def public_url(options = {}) # @see Client#upload_part def upload_stream(options = {}, &block) uploading_options = options.dup + executor = 
DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) uploader = MultipartStreamUploader.new( client: client, - thread_count: uploading_options.delete(:thread_count), + executor: executor, tempfile: uploading_options.delete(:tempfile), part_size: uploading_options.delete(:part_size) ) @@ -396,6 +397,7 @@ def upload_stream(options = {}, &block) &block ) end + executor.shutdown true end deprecated(:upload_stream, use: 'Aws::S3::TransferManager#upload_stream', version: 'next major version') diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 586d5e0d873..733baa0a17c 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -273,13 +273,15 @@ def upload_file(source, bucket:, key:, **options) # @see Client#upload_part def upload_stream(bucket:, key:, **options, &block) uploading_options = options.dup + executor = @executor || DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) uploader = MultipartStreamUploader.new( client: @client, - thread_count: uploading_options.delete(:thread_count), + executor: executor, tempfile: uploading_options.delete(:tempfile), part_size: uploading_options.delete(:part_size) ) uploader.upload(uploading_options.merge(bucket: bucket, key: key), &block) + executor.shutdown unless @options[:executor] true end end From 481f19866f42801c6fc86a4e59b0de9822caf95d Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:44:30 -0700 Subject: [PATCH 13/29] Update specs --- gems/aws-sdk-s3/spec/file_downloader_spec.rb | 3 +-- gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb | 3 +-- gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb | 9 ++------- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/gems/aws-sdk-s3/spec/file_downloader_spec.rb b/gems/aws-sdk-s3/spec/file_downloader_spec.rb index 2632377b981..a596c1fa6ba 100644 --- 
a/gems/aws-sdk-s3/spec/file_downloader_spec.rb +++ b/gems/aws-sdk-s3/spec/file_downloader_spec.rb @@ -7,8 +7,7 @@ module Aws module S3 describe FileDownloader do let(:client) { S3::Client.new(stub_responses: true) } - let(:executor) { DefaultExecutor.new } - let(:subject) { FileDownloader.new(client: client, executor: executor) } + let(:subject) { FileDownloader.new(client: client, executor: DefaultExecutor.new) } let(:tmpdir) { Dir.tmpdir } describe '#initialize' do diff --git a/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb b/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb index 54b50a13a01..0d2cd2fae4f 100644 --- a/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb @@ -7,8 +7,7 @@ module Aws module S3 describe MultipartFileUploader do let(:client) { S3::Client.new(stub_responses: true) } - let(:executor) { DefaultExecutor.new } - let(:subject) { MultipartFileUploader.new(client: client, executor: executor) } + let(:subject) { MultipartFileUploader.new(client: client, executor: DefaultExecutor.new) } let(:params) { { bucket: 'bucket', key: 'key' } } describe '#initialize' do diff --git a/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb b/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb index 5413e39826e..6262a42b560 100644 --- a/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/multipart_stream_uploader_spec.rb @@ -7,16 +7,11 @@ module Aws module S3 describe MultipartStreamUploader do let(:client) { S3::Client.new(stub_responses: true) } - let(:executor) { S3::DefaultExecutor.new } - let(:subject) { MultipartStreamUploader.new(client: client, executor: executor) } + let(:subject) { MultipartStreamUploader.new(client: client, executor: DefaultExecutor.new) } let(:params) { { bucket: 'bucket', key: 'key' } } let(:one_mb) { '.' 
* 1024 * 1024 } let(:seventeen_mb) { one_mb * 17 } - after(:each) do - executor.shutdown - end - describe '#initialize' do it 'constructs a default s3 client when none provided' do client = double('client') @@ -159,7 +154,7 @@ module S3 end context 'when tempfile is true' do - let(:subject) { MultipartStreamUploader.new(client: client, tempfile: true, executor: S3::DefaultExecutor.new) } + let(:subject) { MultipartStreamUploader.new(client: client, tempfile: true, executor: DefaultExecutor.new) } it 'uses multipart APIs' do client.stub_responses(:create_multipart_upload, upload_id: 'id') From 88bf44af1be34ad893e113781591adbcdc4b341d Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:45:21 -0700 Subject: [PATCH 14/29] Update changelog --- gems/aws-sdk-s3/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gems/aws-sdk-s3/CHANGELOG.md b/gems/aws-sdk-s3/CHANGELOG.md index 5447ec027e4..20971f2cdd0 100644 --- a/gems/aws-sdk-s3/CHANGELOG.md +++ b/gems/aws-sdk-s3/CHANGELOG.md @@ -1,7 +1,7 @@ Unreleased Changes ------------------ -* Feature - Add lightweight thread pool executor for multipart `download_file` and `upload_file`. +* Feature - Add lightweight thread pool executor for multipart `download_file`, `upload_file` and `upload_stream`. 
1.199.1 (2025-09-25) ------------------ From c1a25cd9e6fc4a94acdac481fedbe9c88c2633c6 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 10:53:48 -0700 Subject: [PATCH 15/29] Minor updates --- gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 733baa0a17c..f92b2c36c39 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -104,7 +104,7 @@ def initialize(options = {}) # @option options [Integer] :chunk_size required in `"get_range"` mode. # # @option options [Integer] :thread_count (10) Customize threads used in the multipart download. - # Only used when no custom executor is provided (creates {DefaultExecutor} with this thread count). + # Only used when no custom executor is provided (creates {DefaultExecutor} with given thread count). # # @option options [String] :version_id The object version id used to retrieve the object. See # {https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning} for further details. @@ -190,7 +190,7 @@ def download_file(destination, bucket:, key:, **options) # Default threshold is `100MB`. # # @option options [Integer] :thread_count (10) Customize threads used in the multipart upload. - # Only used when no custom executor is provided (creates {DefaultExecutor} with the default thread count). + # Only used when no custom executor is provided (creates {DefaultExecutor} with the given thread count). # # @option options [Proc] :progress_callback (nil) # A Proc that will be called when each chunk of the upload is sent. @@ -252,7 +252,8 @@ def upload_file(source, bucket:, key:, **options) # {Client#upload_part} can be provided. # # @option options [Integer] :thread_count (10) - # The number of parallel multipart uploads. 
+ # The number of parallel multipart uploads. Only used when no custom executor is provided + # (creates {DefaultExecutor} with the given thread count). # # @option options [Boolean] :tempfile (false) # Normally read data is stored in memory when building the parts in order to complete the underlying From 7522a16f214985301f841896bbabef3a9249872b Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 9 Oct 2025 11:01:20 -0700 Subject: [PATCH 16/29] Fix failing specs --- gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb | 2 +- gems/aws-sdk-s3/spec/object/upload_stream_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index f92b2c36c39..1107aefdb07 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -133,7 +133,7 @@ def initialize(options = {}) # @see Client#head_object def download_file(destination, bucket:, key:, **options) download_opts = options.dup - executor = @executor || DefaultExecutor.new(download_opts.delete(:thread_count)) + executor = @executor || DefaultExecutor.new(max_threads: download_opts.delete(:thread_count)) downloader = FileDownloader.new(client: @client, executor: executor) downloader.download(destination, options.merge(bucket: bucket, key: key)) executor.shutdown unless @options[:executor] diff --git a/gems/aws-sdk-s3/spec/object/upload_stream_spec.rb b/gems/aws-sdk-s3/spec/object/upload_stream_spec.rb index 269b91746eb..d75b58a7515 100644 --- a/gems/aws-sdk-s3/spec/object/upload_stream_spec.rb +++ b/gems/aws-sdk-s3/spec/object/upload_stream_spec.rb @@ -27,9 +27,9 @@ module S3 it 'respects the thread_count option' do custom_thread_count = 20 - expect(Thread).to receive(:new).exactly(custom_thread_count).times.and_return(double(value: nil)) client.stub_responses(:create_multipart_upload, upload_id: 'id') 
client.stub_responses(:complete_multipart_upload) + expect(DefaultExecutor).to receive(:new).with(max_threads: custom_thread_count).and_call_original subject.upload_stream(thread_count: custom_thread_count) { |_write_stream| } end From 9eea233900ddb164a41d8b28cccc02d849015341 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Fri, 10 Oct 2025 11:20:15 -0700 Subject: [PATCH 17/29] Feedback - address sleep in specs --- gems/aws-sdk-s3/spec/default_executor_spec.rb | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/gems/aws-sdk-s3/spec/default_executor_spec.rb b/gems/aws-sdk-s3/spec/default_executor_spec.rb index 26ba45f8a2d..640dec884c1 100644 --- a/gems/aws-sdk-s3/spec/default_executor_spec.rb +++ b/gems/aws-sdk-s3/spec/default_executor_spec.rb @@ -13,7 +13,7 @@ module S3 expect(block).to receive(:call).with('hello') subject.post('hello') { |arg| block.call(arg) } - sleep 0.1 + sleep 0.01 end it 'returns true when a task is submitted' do @@ -29,10 +29,7 @@ module S3 describe '#shutdown' do it 'waits for running tasks to be complete' do result = nil - subject.post do - sleep 0.2 - result = 'done' - end + subject.post { result = 'done' } expect(subject.shutdown).to be(true) expect(result).to eq('done') end @@ -40,10 +37,10 @@ module S3 it 'kills threads after timeout' do result = nil subject.post do - sleep 1 + sleep 0.02 result = 'done' end - expect(subject.shutdown(0.1)).to be(true) + expect(subject.shutdown(0.01)).to be(true) expect(result).to be_nil end end @@ -52,11 +49,9 @@ module S3 it 'stops all threads immediately and returns true' do completed = false subject.post do - sleep 1 + sleep 0.01 completed = true end - - sleep 0.1 result = subject.kill expect(result).to be(true) From 75b0d96909209460ab11cf5949b9148043f7d99c Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Fri, 10 Oct 2025 11:21:28 -0700 Subject: [PATCH 18/29] Feedback - update method name for cleanup_temp_file --- gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb index 4181c7eaeac..3ab854f9403 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb @@ -35,7 +35,7 @@ def download(destination, options = {}) end File.rename(opts[:temp_path], destination) if opts[:temp_path] ensure - cleanup_temp_file!(opts) + cleanup_temp_file(opts) end private @@ -52,7 +52,7 @@ def build_download_opts(destination, opts) } end - def cleanup_temp_file!(opts) + def cleanup_temp_file(opts) return unless opts temp_file = opts[:temp_path] From ad943ee6951d885cae4cddcca21c901dc3652bf2 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Fri, 10 Oct 2025 11:27:45 -0700 Subject: [PATCH 19/29] Feedback - wrap checksum callback --- gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb index 3ab854f9403..5edb8a0e9dc 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb @@ -81,9 +81,7 @@ def download_with_executor(part_list, total_size, opts) validate_range(range, p.params[:range]) if p.params[:range] write(resp.body, range, opts) - if opts[:on_checksum_validated] && resp.checksum_validated - opts[:on_checksum_validated].call(resp.checksum_validated, resp) - end + execute_checksum_callback(resp, opts) rescue StandardError => e abort_download = true error = e @@ -207,6 +205,12 @@ def update_progress(progress, part) end end + def execute_checksum_callback(resp, opts) + return unless opts[:on_checksum_validated] && resp.checksum_validated + + opts[:on_checksum_validated].call(resp.checksum_validated, resp) + end + def validate_destination!(destination) valid_types = [String, Pathname, File, Tempfile] return 
if valid_types.include?(destination.class) From f1fc86a59f38736fadb777fd63258e6f05e8e66c Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Fri, 10 Oct 2025 11:29:41 -0700 Subject: [PATCH 20/29] Feedback - update method name in MPU --- gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb index 70f0100cec4..1c210839d5a 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb @@ -109,19 +109,19 @@ def checksum_key?(key) CHECKSUM_KEYS.include?(key) end - def has_checksum_keys?(keys) + def has_checksum_key?(keys) keys.any? { |key| checksum_key?(key) } end def create_opts(options) opts = { checksum_algorithm: Aws::Plugins::ChecksumAlgorithm::DEFAULT_CHECKSUM } - opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_keys?(options.keys) + opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys) CREATE_OPTIONS.each_with_object(opts) { |k, h| h[k] = options[k] if options.key?(k) } end def complete_opts(options) opts = {} - opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_keys?(options.keys) + opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys) COMPLETE_OPTIONS.each_with_object(opts) { |k, h| h[k] = options[k] if options.key?(k) } end From 09eae68daedfb8be7ccee42670c230351bb4ebbd Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Fri, 10 Oct 2025 11:40:08 -0700 Subject: [PATCH 21/29] Feedback - streamline handling of progress callbacks --- gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb | 12 ++++++------ .../lib/aws-sdk-s3/multipart_file_uploader.rb | 11 ++++++----- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb index 5edb8a0e9dc..d32f6bc60cb 100644 --- 
a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb @@ -64,18 +64,14 @@ def download_with_executor(part_list, total_size, opts) completion_queue = Queue.new abort_download = false error = nil - progress = - if (progress_callback = opts[:progress_callback]) - MultipartProgress.new(part_list, total_size, progress_callback) - end + progress = MultipartProgress.new(part_list, total_size, opts[:progress_callback]) while (part = part_list.shift) break if abort_download download_attempts += 1 @executor.post(part) do |p| - update_progress(progress, p) if progress - + update_progress(progress, p) resp = @client.get_object(p.params) range = extract_range(resp.content_range) validate_range(range, p.params[:range]) if p.params[:range] @@ -199,6 +195,8 @@ def single_part_progress(opts) end def update_progress(progress, part) + return unless progress.progress_callback + part.params[:on_chunk_received] = proc do |_chunk, bytes, total| progress.call(part.part_number, bytes, total) @@ -288,6 +286,8 @@ def initialize(parts, total_size, progress_callback) @progress_callback = progress_callback end + attr_reader :progress_callback + def call(part_number, bytes_received, total) # part numbers start at 1 @bytes_received[part_number - 1] = bytes_received diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb index 1c210839d5a..e1f05d6ee16 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb @@ -137,17 +137,14 @@ def upload_with_executor(pending, completed, options) completion_queue = Queue.new abort_upload = false errors = [] - - if (callback = options[:progress_callback]) - progress = MultipartProgress.new(pending, callback) - end + progress = MultipartProgress.new(pending, options[:progress_callback]) while (part = pending.shift) break if abort_upload upload_attempts += 1 
@executor.post(part) do |p| - update_progress(progress, p) if progress + update_progress(progress, p) resp = @client.upload_part(p) p[:body].close completed_part = { etag: resp.etag, part_number: p[:part_number] } @@ -180,6 +177,8 @@ def part_size(total_size, part_size, offset) end def update_progress(progress, part) + return unless progress.progress_callback + part[:on_chunk_sent] = proc do |_chunk, bytes, _total| progress.call(part[:part_number], bytes) @@ -226,6 +225,8 @@ def initialize(parts, progress_callback) @progress_callback = progress_callback end + attr_reader :progress_callback + def call(part_number, bytes_read) # part numbers start at 1 @bytes_sent[part_number - 1] = bytes_read From e824de0c00830bd7a8d1f4053f1b6e0199271860 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Fri, 10 Oct 2025 12:13:55 -0700 Subject: [PATCH 22/29] Feedback - streamline docs --- gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb | 4 ---- gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb | 5 +---- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb index c196c534e63..1236ca9ccf1 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb @@ -520,10 +520,6 @@ def upload_file(source, options = {}) # # @option options [Integer] :thread_count (10) Customize threads used in the multipart download. # - # @option options [String] :version_id The object version id used to retrieve the object. - # - # @see https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning - # # @option options [String] :checksum_mode ("ENABLED") # When `"ENABLED"` and the object has a stored checksum, it will be used to validate the download and will # raise an `Aws::Errors::ChecksumError` if checksum validation fails. 
You may provide a `on_checksum_validated` diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 1107aefdb07..b932d136256 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -50,7 +50,7 @@ def initialize(options = {}) # @return [S3::Client] attr_reader :client - # @return [S3::Client] + # @return [Object] attr_reader :executor # Downloads a file in S3 to a path on disk. @@ -106,9 +106,6 @@ def initialize(options = {}) # @option options [Integer] :thread_count (10) Customize threads used in the multipart download. # Only used when no custom executor is provided (creates {DefaultExecutor} with given thread count). # - # @option options [String] :version_id The object version id used to retrieve the object. See - # {https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning} for further details. - # # @option options [String] :checksum_mode ("ENABLED") # When `"ENABLED"` and the object has a stored checksum, it will be used to validate the download and will # raise an `Aws::Errors::ChecksumError` if checksum validation fails. 
You may provide a `on_checksum_validated` From cd91eb75fa1e9f06105f2192fc27225ca0130989 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 07:38:33 -0700 Subject: [PATCH 23/29] Feedback - streamline opts --- .../lib/aws-sdk-s3/customizations/object.rb | 27 +++++++++---------- .../lib/aws-sdk-s3/file_downloader.rb | 2 +- .../lib/aws-sdk-s3/transfer_manager.rb | 18 ++++++------- 3 files changed, 22 insertions(+), 25 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb index 1236ca9ccf1..379c074f107 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb @@ -383,19 +383,16 @@ def public_url(options = {}) # @see Client#complete_multipart_upload # @see Client#upload_part def upload_stream(options = {}, &block) - uploading_options = options.dup - executor = DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) + upload_opts = options.merge(bucket: bucket_name, key: key) + executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count)) uploader = MultipartStreamUploader.new( client: client, executor: executor, - tempfile: uploading_options.delete(:tempfile), - part_size: uploading_options.delete(:part_size) + tempfile: upload_opts.delete(:tempfile), + part_size: upload_opts.delete(:part_size) ) Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do - uploader.upload( - uploading_options.merge(bucket: bucket_name, key: key), - &block - ) + uploader.upload(upload_opts, &block) end executor.shutdown true @@ -460,15 +457,15 @@ def upload_stream(options = {}, &block) # @see Client#complete_multipart_upload # @see Client#upload_part def upload_file(source, options = {}) - uploading_options = options.dup - executor = DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) + upload_opts = options.merge(bucket: bucket_name, key: key) + executor = 
DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count)) uploader = FileUploader.new( client: client, executor: executor, - multipart_threshold: uploading_options.delete(:multipart_threshold) + multipart_threshold: upload_opts.delete(:multipart_threshold) ) response = Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do - uploader.upload(source, uploading_options.merge(bucket: bucket_name, key: key)) + uploader.upload(source, upload_opts) end yield response if block_given? executor.shutdown @@ -543,11 +540,11 @@ def upload_file(source, options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, options = {}) - download_options = options.dup - executor = DefaultExecutor.new(max_threads: download_options.delete([:thread_count])) + download_opts = options.merge(bucket: bucket_name, key: key) + executor = DefaultExecutor.new(max_threads: download_opts.delete(:thread_count)) downloader = FileDownloader.new(client: client, executor: executor) Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do - downloader.download(destination, download_options.merge(bucket: bucket_name, key: key)) + downloader.download(destination, download_opts) end executor.shutdown true diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb index d32f6bc60cb..7e8f545cb01 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb @@ -23,7 +23,7 @@ def initialize(options = {}) def download(destination, options = {}) validate_destination!(destination) - opts = build_download_opts(destination, options.dup) + opts = build_download_opts(destination, options) validate_opts!(opts) Aws::Plugins::UserAgent.metric('S3_TRANSFER') do diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index b932d136256..c9d0dd1ae69 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ 
b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -129,10 +129,10 @@ def initialize(options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, bucket:, key:, **options) - download_opts = options.dup + download_opts = options.merge(bucket: bucket, key: key) executor = @executor || DefaultExecutor.new(max_threads: download_opts.delete(:thread_count)) downloader = FileDownloader.new(client: @client, executor: executor) - downloader.download(destination, options.merge(bucket: bucket, key: key)) + downloader.download(destination, download_opts) executor.shutdown unless @options[:executor] true end @@ -204,14 +204,14 @@ def download_file(destination, bucket:, key:, **options) # @see Client#complete_multipart_upload # @see Client#upload_part def upload_file(source, bucket:, key:, **options) - upload_opts = options.dup + upload_opts = options.merge(bucket: bucket, key: key) executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count)) uploader = FileUploader.new( multipart_threshold: upload_opts.delete(:multipart_threshold), client: @client, executor: executor ) - response = uploader.upload(source, upload_opts.merge(bucket: bucket, key: key)) + response = uploader.upload(source, upload_opts) yield response if block_given? 
executor.shutdown unless @options[:executor] true @@ -270,15 +270,15 @@ def upload_file(source, bucket:, key:, **options) # @see Client#complete_multipart_upload # @see Client#upload_part def upload_stream(bucket:, key:, **options, &block) - uploading_options = options.dup - executor = @executor || DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) + upload_opts = options.merge(bucket: bucket, key: key) + executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count)) uploader = MultipartStreamUploader.new( client: @client, executor: executor, - tempfile: uploading_options.delete(:tempfile), - part_size: uploading_options.delete(:part_size) + tempfile: upload_opts.delete(:tempfile), + part_size: upload_opts.delete(:part_size) ) - uploader.upload(uploading_options.merge(bucket: bucket, key: key), &block) + uploader.upload(upload_opts, &block) executor.shutdown unless @options[:executor] true end From abf78d69657f4afb013fd81edc5b4817d5f07fda Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 08:56:22 -0700 Subject: [PATCH 24/29] Feedback - remove sleep from specs when possible --- gems/aws-sdk-s3/spec/default_executor_spec.rb | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/gems/aws-sdk-s3/spec/default_executor_spec.rb b/gems/aws-sdk-s3/spec/default_executor_spec.rb index 640dec884c1..ffb6974d096 100644 --- a/gems/aws-sdk-s3/spec/default_executor_spec.rb +++ b/gems/aws-sdk-s3/spec/default_executor_spec.rb @@ -9,11 +9,9 @@ module S3 describe '#post' do it 'executes a block with arguments' do - block = double('block') - expect(block).to receive(:call).with('hello') - - subject.post('hello') { |arg| block.call(arg) } - sleep 0.01 + queue = Queue.new + subject.post('hello') { |arg| queue << arg } + expect(queue.pop).to eq('hello') end it 'returns true when a task is submitted' do @@ -29,33 +27,40 @@ module S3 describe '#shutdown' do it 'waits for running tasks to be 
complete' do result = nil - subject.post { result = 'done' } + subject.post { result = true } expect(subject.shutdown).to be(true) - expect(result).to eq('done') + expect(result).to be(true) end it 'kills threads after timeout' do - result = nil + started = Queue.new + counter = 0 subject.post do - sleep 0.02 - result = 'done' + counter += 1 + started << 'work started' + sleep 1 + counter += 1 end + started.pop expect(subject.shutdown(0.01)).to be(true) - expect(result).to be_nil + expect(counter).to eq(1) end end describe '#kill' do it 'stops all threads immediately and returns true' do - completed = false + started = Queue.new + counter = 0 subject.post do - sleep 0.01 - completed = true + counter += 1 + started << 'work started' + sleep 1 + counter += 1 end + started.pop result = subject.kill - expect(result).to be(true) - expect(completed).to be(false) + expect(counter).to eq(1) end end end From 04a287fd50c23f542edf36926b9ebc023baa0a46 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 09:52:18 -0700 Subject: [PATCH 25/29] Feedback - update to use 10 threads only --- .../lib/aws-sdk-s3/multipart_stream_uploader.rb | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb index ae6f75a47a1..422b049130c 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_stream_uploader.rb @@ -53,24 +53,22 @@ def complete_upload(upload_id, parts, options) def upload_parts(upload_id, options, &block) completed_parts = Queue.new + done_signal = Queue.new errors = [] + part_opts = upload_part_opts(options).merge(upload_id: upload_id) begin IO.pipe do |read_pipe, write_pipe| - upload_thread = Thread.new do - upload_with_executor( - read_pipe, - completed_parts, - errors, - upload_part_opts(options).merge(upload_id: upload_id) - ) + @executor.post(read_pipe, 
completed_parts, errors, part_opts) do |r_pipe, parts, errs, opts| + upload_with_executor(r_pipe, parts, errs, opts) + ensure + done_signal << :done end - block.call(write_pipe) ensure # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111 write_pipe.close - upload_thread.join + done_signal.pop end rescue StandardError => e errors << e From 54b9addbf898453b1164de95456080a3e81c3636 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 11:50:04 -0700 Subject: [PATCH 26/29] Add directory features --- .../lib/aws-sdk-s3/customizations.rb | 13 +- .../aws-sdk-s3/directory_download_error.rb | 14 ++ .../lib/aws-sdk-s3/directory_downloader.rb | 163 ++++++++++++ .../lib/aws-sdk-s3/directory_progress.rb | 24 ++ .../lib/aws-sdk-s3/directory_upload_error.rb | 16 ++ .../lib/aws-sdk-s3/directory_uploader.rb | 233 ++++++++++++++++++ 6 files changed, 460 insertions(+), 3 deletions(-) create mode 100644 gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb create mode 100644 gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb create mode 100644 gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb create mode 100644 gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb create mode 100644 gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb index 1ffb0c06892..36c75bb3169 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb @@ -6,15 +6,22 @@ module S3 autoload :BucketRegionCache, 'aws-sdk-s3/bucket_region_cache' autoload :Encryption, 'aws-sdk-s3/encryption' autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2' - autoload :FilePart, 'aws-sdk-s3/file_part' + autoload :LegacySigner, 'aws-sdk-s3/legacy_signer' + + # transfer manager + multipart upload/download utilities autoload :DefaultExecutor, 'aws-sdk-s3/default_executor' - autoload :FileUploader, 
'aws-sdk-s3/file_uploader' + autoload :FilePart, 'aws-sdk-s3/file_part' autoload :FileDownloader, 'aws-sdk-s3/file_downloader' - autoload :LegacySigner, 'aws-sdk-s3/legacy_signer' + autoload :FileUploader, 'aws-sdk-s3/file_uploader' autoload :MultipartDownloadError, 'aws-sdk-s3/multipart_download_error' autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader' autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader' autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error' + autoload :DirectoryProgress, '../aws-sdk-s3/directory_progress' + autoload :DirectoryDownloadError, '../aws-sdk-s3/directory_download_error' + autoload :DirectoryDownloader, '../aws-sdk-s3/directory_downloader' + autoload :DirectoryUploader, '../aws-sdk-s3/directory_uploader' + autoload :DirectoryUploadError, '../aws-sdk-s3/directory_upload_error' autoload :ObjectCopier, 'aws-sdk-s3/object_copier' autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier' autoload :PresignedPost, 'aws-sdk-s3/presigned_post' diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb new file mode 100644 index 00000000000..acd81c07545 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb @@ -0,0 +1,14 @@ +module Aws + module S3 + # Raised when DirectoryDownloader fails to download objects from S3 bucket + class DirectoryDownloadError < StandardError + def initialize(message, errors = []) + @errors = errors + super(message) + end + + # @return [Array] The list of errors encountered when downloading objects + attr_reader :errors + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb new file mode 100644 index 00000000000..fbeb9afe6bf --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb @@ -0,0 +1,163 @@ +# frozen_string_literal: true + +module Aws + module S3 + 
# @api private + class DirectoryDownloader + def initialize(options = {}) + @client = options[:client] + @executor = options[:executor] + @abort_requested = false + @mutex = Mutex.new + end + + attr_reader :abort_requested + + def download(destination, bucket:, **options) + if File.exist?(destination) + raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination) + else + FileUtils.mkdir_p(destination) + end + + download_opts = build_download_opts(destination, bucket, options) + downloader = FileDownloader.new(client: @client, executor: @executor) + producer = ObjectProducer.new(download_opts.merge(client: @client, directory_downloader: self)) + downloads, errors = process_download_queue(producer, downloader, download_opts) + build_result(downloads, errors) + ensure + @abort_requested = false + end + + private + + def request_abort + @mutex.synchronize { @abort_requested = true } + end + def build_download_opts(destination, bucket, opts) + { + destination: destination, + bucket: bucket, + s3_prefix: opts.delete(:s3_prefix), + ignore_failure: opts.delete(:ignore_failure) || false, + filter_callback: opts.delete(:filter_callback), + progress_callback: opts.delete(:progress_callback) + } + end + + def build_result(download_count, errors) + if @abort_requested + msg = "directory download failed: #{errors.map(&:message).join('; ')}" + raise DirectoryDownloadError.new(msg, errors) + else + { + completed_downloads: [download_count - errors.count, 0].max, + failed_downloads: errors.count, + errors: errors.any? ? 
errors : nil + }.compact + end + end + + def handle_error(executor, opts) + return if opts[:ignore_failure] + + request_abort + executor.kill + end + + def process_download_queue(producer, downloader, opts) + # Separate executor for lightweight queuing tasks, avoiding interference with main @executor lifecycle + queue_executor = DefaultExecutor.new + progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback] + download_attempts = 0 + errors = [] + begin + producer.each do |object| + break if @abort_requested + + download_attempts += 1 + queue_executor.post(object) do |o| + dir_path = File.dirname(o[:path]) + FileUtils.mkdir_p(dir_path) unless dir_path == opts[:destination] || Dir.exist?(dir_path) + + downloader.download(o[:path], bucket: opts[:bucket], key: o[:key]) + progress&.call(File.size(o[:path])) + rescue StandardError => e + errors << e + handle_error(queue_executor, opts) + end + end + rescue StandardError => e + errors << e + handle_error(queue_executor, opts) + end + queue_executor.shutdown + [download_attempts, errors] + end + + # @api private + class ObjectProducer + include Enumerable + + DEFAULT_QUEUE_SIZE = 100 + + def initialize(options = {}) + @destination_dir = options[:destination] + @client = options[:client] + @bucket = options[:bucket] + @s3_prefix = options[:s3_prefix] + @filter_callback = options[:filter_callback] + @directory_downloader = options[:directory_downloader] + @object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE) + end + + def each + producer_thread = Thread.new do + stream_objects + ensure + @object_queue << :done + end + + # Yield objects from internal queue + while (object = @object_queue.shift) != :done + break if @directory_downloader.abort_requested + + yield object + end + ensure + producer_thread.join + end + + private + + def build_object_entry(key) + { path: File.join(@destination_dir, normalize_key(key)), key: key } + end + + # TODO: double check handling of objects that ends with / + def 
stream_objects(continuation_token: nil) + resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token) + resp.contents.each do |o| + break if @directory_downloader.abort_requested + next if o.key.end_with?('/') + next unless include_object?(o.key) + + @object_queue << build_object_entry(o.key) + end + stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token + end + + def include_object?(key) + return true unless @filter_callback + + @filter_callback.call(key) + end + + def normalize_key(key) + key = key.delete_prefix(@s3_prefix) if @s3_prefix + File::SEPARATOR == '/' ? key : key.tr('/', File::SEPARATOR) + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb new file mode 100644 index 00000000000..d2100782977 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Aws + module S3 + # @api private + class DirectoryProgress + def initialize(progress_callback) + @transferred_bytes = 0 + @transferred_files = 0 + @progress_callback = progress_callback + @mutex = Mutex.new + end + + def call(bytes_transferred) + @mutex.synchronize do + @transferred_bytes += bytes_transferred + @transferred_files += 1 + + @progress_callback.call(@transferred_bytes, @transferred_files) + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb new file mode 100644 index 00000000000..1818e103805 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Aws + module S3 + # Raised when DirectoryUploader fails to upload files to S3 bucket + class DirectoryUploadError < StandardError + def initialize(message, errors = []) + @errors = errors + super(message) + end + + # 
@return [Array] The list of errors encountered when uploading files + attr_reader :errors + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb new file mode 100644 index 00000000000..5a2006327ba --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb @@ -0,0 +1,233 @@ +# frozen_string_literal: true + +require 'set' + +module Aws + module S3 + # @api private + class DirectoryUploader + def initialize(options = {}) + @client = options[:client] + @executor = options[:executor] + @abort_requested = false + @mutex = Mutex.new + end + + attr_reader :abort_requested + + def upload(source_directory, bucket:, **options) + raise ArgumentError, 'Invalid directory' unless Dir.exist?(source_directory) + + upload_opts = build_upload_opts(source_directory, bucket, options) + uploader = FileUploader.new( + multipart_threshold: options[:multipart_threshold], + client: @client, + executor: @executor + ) + producer = FileProducer.new(upload_opts.merge(client: @client, directory_uploader: self)) + uploads, errors = process_upload_queue(producer, uploader, upload_opts) + build_result(uploads, errors) + ensure + @abort_requested = false + end + + private + + def request_abort + @mutex.synchronize { @abort_requested = true } + end + + def build_upload_opts(source_directory, bucket, opts) + { + source_dir: source_directory, + bucket: bucket, + s3_prefix: opts.delete(:s3_prefix), + recursive: opts.delete(:recursive) || false, + follow_symlinks: opts.delete(:follow_symlinks) || false, + filter_callback: opts.delete(:filter_callback), + ignore_failure: opts.delete(:ignore_failure) || false, + progress_callback: opts.delete(:progress_callback) + } + end + + def build_result(upload_count, errors) + if @abort_requested + msg = "directory upload failed: #{errors.map(&:message).join('; ')}" + raise DirectoryUploadError.new(msg, errors) + else + { + completed_uploads: [upload_count - errors.count, 
0].max, + failed_uploads: errors.count, + errors: errors.any? ? errors : nil + }.compact + end + end + + def handle_error(executor, opts) + return if opts[:ignore_failure] + + request_abort + executor.kill + end + + def process_upload_queue(producer, uploader, opts) + # Separate executor for lightweight queuing tasks, + # avoiding interference with main @executor lifecycle + queue_executor = DefaultExecutor.new + progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback] + upload_attempts = 0 + errors = [] + begin + producer.each do |file| + break if @abort_requested + + upload_attempts += 1 + if file.is_a?(StandardError) + errors << file + next + end + + queue_executor.post(file) do |f| + uploader.upload(f[:path], bucket: opts[:bucket], key: f[:key]) + progress&.call(File.size(f[:path])) + rescue StandardError => e + errors << e + handle_error(queue_executor, opts) + end + end + rescue StandardError => e + errors << e + handle_error(queue_executor, opts) + end + queue_executor.shutdown + [upload_attempts, errors] + end + + # @api private + class FileProducer + include Enumerable + + DEFAULT_QUEUE_SIZE = 100 + + def initialize(options = {}) + @source_dir = options[:source_dir] + @s3_prefix = options[:s3_prefix] + @recursive = options[:recursive] + @follow_symlinks = options[:follow_symlinks] + @ignore_failure = options[:ignore_failure] + @filter_callback = options[:filter_callback] + @directory_uploader = options[:directory_uploader] + @file_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE) + end + + def each + producer_thread = Thread.new do + if @recursive + find_recursively + else + find_directly + end + ensure + @file_queue << :done + end + + while (file = @file_queue.shift) != :done + break if @directory_uploader.abort_requested + + yield file + end + ensure + producer_thread.join + end + + private + + def build_file_entry(file_path, key) + normalized_key = @s3_prefix ? 
File.join(@s3_prefix, key) : key + { path: file_path, key: normalized_key } + end + + def find_directly + Dir.each_child(@source_dir) do |entry| + break if @directory_uploader.abort_requested + + entry_path = File.join(@source_dir, entry) + if @follow_symlinks + stat = File.stat(entry_path) + next if stat.directory? + else + stat = File.lstat(entry_path) + next if stat.symlink? || stat.directory? + end + next unless include_file?(entry_path, entry) + + @file_queue << build_file_entry(entry_path, entry) + rescue StandardError => e + raise unless @ignore_failure + + @file_queue << e + end + end + + def find_recursively + if @follow_symlinks + visited = Set.new + visited << File.stat(@source_dir).ino + scan_directory(@source_dir, visited: visited) + else + scan_directory(@source_dir) + end + end + + def include_file?(file_path, file_name) + return true unless @filter_callback + + @filter_callback.call(file_path, file_name) + end + + def scan_directory(dir_path, key_prefix: '', visited: nil) + return if @directory_uploader.abort_requested + + Dir.each_child(dir_path) do |entry| + break if @directory_uploader.abort_requested + + full_path = File.join(dir_path, entry) + next unless include_file?(full_path, entry) + + stat = + if @follow_symlinks + File.stat(full_path) + else + lstat = File.lstat(full_path) + next if lstat.symlink? + + lstat + end + + if stat.directory? + handle_directory(full_path, entry, key_prefix, visited) + else + key = key_prefix.empty? ? entry : File.join(key_prefix, entry) + @file_queue << build_file_entry(full_path, key) + end + rescue StandardError => e + raise unless @ignore_failure + + @file_queue << e + end + end + + def handle_directory(dir_path, dir_name, key_prefix, visited) + if @follow_symlinks && visited + stat = File.stat(dir_path) + return if visited.include?(stat.ino) + + visited << stat.ino + end + new_prefix = key_prefix.empty? ? 
dir_name : File.join(key_prefix, dir_name) + scan_directory(dir_path, key_prefix: new_prefix, visited: visited) + end + end + end + end +end From ca6c2ae11a988ab6668f7ef8e15c761c3c1c7f22 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 11:50:33 -0700 Subject: [PATCH 27/29] Add temp changelog entry --- gems/aws-sdk-s3/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gems/aws-sdk-s3/CHANGELOG.md b/gems/aws-sdk-s3/CHANGELOG.md index 20971f2cdd0..412dd527205 100644 --- a/gems/aws-sdk-s3/CHANGELOG.md +++ b/gems/aws-sdk-s3/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Feature - TODO + * Feature - Add lightweight thread pool executor for multipart `download_file`, `upload_file` and `upload_stream`. 1.199.1 (2025-09-25) From c9bf8edda954e3553c6bbbbbcd04946e2a1b4801 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 12:20:30 -0700 Subject: [PATCH 28/29] Minor updates --- gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb | 2 ++ gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb | 5 ++--- gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb | 5 ++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb index acd81c07545..7ec3b35a35d 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module Aws module S3 # Raised when DirectoryDownloader fails to download objects from S3 bucket diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb index fbeb9afe6bf..4703cd72651 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb @@ -20,7 +20,7 @@ def download(destination, bucket:, **options) 
FileUtils.mkdir_p(destination) end - download_opts = build_download_opts(destination, bucket, options) + download_opts = build_download_opts(destination, bucket, options.dup) downloader = FileDownloader.new(client: @client, executor: @executor) producer = ObjectProducer.new(download_opts.merge(client: @client, directory_downloader: self)) downloads, errors = process_download_queue(producer, downloader, download_opts) @@ -134,12 +134,11 @@ def build_object_entry(key) { path: File.join(@destination_dir, normalize_key(key)), key: key } end - # TODO: double check handling of objects that ends with / def stream_objects(continuation_token: nil) resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token) resp.contents.each do |o| break if @directory_downloader.abort_requested - next if o.key.end_with?('/') + next if o.key.end_with?('/') && o.size.zero? next unless include_object?(o.key) @object_queue << build_object_entry(o.key) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb index 5a2006327ba..a2ce5e06cc4 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb @@ -18,7 +18,7 @@ def initialize(options = {}) def upload(source_directory, bucket:, **options) raise ArgumentError, 'Invalid directory' unless Dir.exist?(source_directory) - upload_opts = build_upload_opts(source_directory, bucket, options) + upload_opts = build_upload_opts(source_directory, bucket, options.dup) uploader = FileUploader.new( multipart_threshold: options[:multipart_threshold], client: @client, @@ -71,8 +71,7 @@ def handle_error(executor, opts) end def process_upload_queue(producer, uploader, opts) - # Separate executor for lightweight queuing tasks, - # avoiding interference with main @executor lifecycle + # Separate executor for lightweight queuing tasks, avoiding interference with main @executor lifecycle 
 queue_executor = DefaultExecutor.new progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback] upload_attempts = 0 From 5c6caa7e19d277bc15211ea9e77610d2ac6e7f1a Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Mon, 13 Oct 2025 13:36:18 -0700 Subject: [PATCH 29/29] Improve directory uploader --- .../lib/aws-sdk-s3/customizations.rb | 10 ++-- .../lib/aws-sdk-s3/directory_uploader.rb | 58 +++++++++++++------ 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb index 36c75bb3169..ee10b40d8e1 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb @@ -17,11 +17,11 @@ module S3 autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader' autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader' autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error' - autoload :DirectoryProgress, '../aws-sdk-s3/directory_progress' - autoload :DirectoryDownloadError, '../aws-sdk-s3/directory_download_error' - autoload :DirectoryDownloader, '../aws-sdk-s3/directory_downloader' - autoload :DirectoryUploader, '../aws-sdk-s3/directory_uploader' - autoload :DirectoryUploadError, '../aws-sdk-s3/directory_upload_error' + autoload :DirectoryProgress, 'aws-sdk-s3/directory_progress' + autoload :DirectoryDownloadError, 'aws-sdk-s3/directory_download_error' + autoload :DirectoryDownloader, 'aws-sdk-s3/directory_downloader' + autoload :DirectoryUploadError, 'aws-sdk-s3/directory_upload_error' + autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader' autoload :ObjectCopier, 'aws-sdk-s3/object_copier' autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier' autoload :PresignedPost, 'aws-sdk-s3/presigned_post' diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb index 
a2ce5e06cc4..9bbfcfef48d 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb @@ -18,13 +18,13 @@ def initialize(options = {}) def upload(source_directory, bucket:, **options) raise ArgumentError, 'Invalid directory' unless Dir.exist?(source_directory) - upload_opts = build_upload_opts(source_directory, bucket, options.dup) + upload_opts, producer_opts = build_opts(source_directory, bucket, options.dup) uploader = FileUploader.new( multipart_threshold: options[:multipart_threshold], client: @client, executor: @executor ) - producer = FileProducer.new(upload_opts.merge(client: @client, directory_uploader: self)) + producer = FileProducer.new(producer_opts) uploads, errors = process_upload_queue(producer, uploader, upload_opts) build_result(uploads, errors) ensure @@ -37,17 +37,21 @@ def request_abort @mutex.synchronize { @abort_requested = true } end - def build_upload_opts(source_directory, bucket, opts) - { + def build_opts(source_directory, bucket, opts) + ignore_failure = opts[:ignore_failure] || false + uploader_opts = { progress_callback: opts[:progress_callback], ignore_failure: ignore_failure } + producer_opts = { + directory_uploader: self, source_dir: source_directory, bucket: bucket, - s3_prefix: opts.delete(:s3_prefix), - recursive: opts.delete(:recursive) || false, - follow_symlinks: opts.delete(:follow_symlinks) || false, - filter_callback: opts.delete(:filter_callback), - ignore_failure: opts.delete(:ignore_failure) || false, - progress_callback: opts.delete(:progress_callback) + s3_prefix: opts[:s3_prefix], + recursive: opts[:recursive] || false, + follow_symlinks: opts[:follow_symlinks] || false, + filter_callback: opts[:filter_callback], + request_callback: opts[:request_callback], + ignore_failure: ignore_failure } + [uploader_opts, producer_opts] end def build_result(upload_count, errors) @@ -87,8 +91,8 @@ def process_upload_queue(producer, uploader, opts) end 
queue_executor.post(file) do |f| - uploader.upload(f[:path], bucket: opts[:bucket], key: f[:key]) - progress&.call(File.size(f[:path])) + uploader.upload(f.path, f.params) + progress&.call(File.size(f.path)) rescue StandardError => e errors << e handle_error(queue_executor, opts) @@ -109,13 +113,15 @@ class FileProducer DEFAULT_QUEUE_SIZE = 100 def initialize(options = {}) + @directory_uploader = options[:directory_uploader] @source_dir = options[:source_dir] + @bucket = options[:bucket] @s3_prefix = options[:s3_prefix] @recursive = options[:recursive] @follow_symlinks = options[:follow_symlinks] - @ignore_failure = options[:ignore_failure] @filter_callback = options[:filter_callback] - @directory_uploader = options[:directory_uploader] + @request_callback = options[:request_callback] + @ignore_failure = options[:ignore_failure] @file_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE) end @@ -141,9 +147,15 @@ def each private - def build_file_entry(file_path, key) + def build_upload_entry(file_path, key) normalized_key = @s3_prefix ? File.join(@s3_prefix, key) : key - { path: file_path, key: normalized_key } + params = { bucket: @bucket, key: normalized_key } + if @request_callback + callback_params = @request_callback.call(file_path, params.dup) + params = params.merge(callback_params) if callback_params.is_a?(Hash) && callback_params.any? + end + + UploadEntry.new(path: file_path, params: params) end def find_directly @@ -160,7 +172,7 @@ def find_directly end next unless include_file?(entry_path, entry) - @file_queue << build_file_entry(entry_path, entry) + @file_queue << build_upload_entry(entry_path, entry) rescue StandardError => e raise unless @ignore_failure @@ -207,7 +219,7 @@ def scan_directory(dir_path, key_prefix: '', visited: nil) handle_directory(full_path, entry, key_prefix, visited) else key = key_prefix.empty? ? 
entry : File.join(key_prefix, entry) - @file_queue << build_file_entry(full_path, key) + @file_queue << build_upload_entry(full_path, key) end rescue StandardError => e raise unless @ignore_failure @@ -226,6 +238,16 @@ def handle_directory(dir_path, dir_name, key_prefix, visited) new_prefix = key_prefix.empty? ? dir_name : File.join(key_prefix, dir_name) scan_directory(dir_path, key_prefix: new_prefix, visited: visited) end + + # @api private + class UploadEntry + def initialize(options = {}) + @path = options[:path] + @params = options[:params] + end + + attr_reader :path, :params + end end end end