diff --git a/README.md b/README.md index 9e9937c..4e31d67 100644 --- a/README.md +++ b/README.md @@ -290,12 +290,12 @@ You are now done defining your importer, let's run it! ### Import from a file, path or string -You can import from a file, path or just the CSV content. Please note -that we currently load the entire file in memory. Feel free to -contribute if you need to support CSV files with millions of lines! :) +You can import from a file, path or just the CSV content. To avoid loading +the entire file in memory, enable `batch_load` and optionally provide a +`batch_size` (default is 1000). ```ruby -import = ImportUserCSV.new(file: my_file) +import = ImportUserCSV.new(file: my_file, batch_load: true, batch_size: 2_500) import = ImportUserCSV.new(path: "tmp/new_users.csv") import = ImportUserCSV.new(content: "email,name\nbob@example.com,bob") ``` diff --git a/lib/csv_importer.rb b/lib/csv_importer.rb index 3ccff5d..11d12cf 100644 --- a/lib/csv_importer.rb +++ b/lib/csv_importer.rb @@ -3,6 +3,7 @@ require "csv_importer/version" require "csv_importer/csv_reader" +require "csv_importer/csv_batch_reader" require "csv_importer/column_definition" require "csv_importer/column" require "csv_importer/header" @@ -52,10 +53,12 @@ class Configurator < Struct.new(:config) # .new(file: my_csv_file) # .new(path: "subscribers.csv", model: newsletter.subscribers) # - def initialize(*args, &block) - @csv = CSVReader.new(*args) + def initialize(**kwargs, &block) @config = self.class.config.dup - @config.attributes = args.last + config_kwargs = kwargs.slice(*@config.attributes.keys) + @config.attributes = config_kwargs # Virtus handles this like a merge + reader_kwargs = kwargs.except(*@config.attributes.keys) + @csv = @config.batch_load ? CSVBatchReader.new(**reader_kwargs) : CSVReader.new(**reader_kwargs) @report = Report.new Configurator.new(@config).instance_exec(&block) if block end @@ -94,7 +97,8 @@ def valid_header? def run! if valid_header? 
@report = Runner.call(rows: rows, when_invalid: config.when_invalid, - after_save_blocks: config.after_save_blocks, report: @report) + after_save_blocks: config.after_save_blocks, report: @report, + batch_load: config.batch_load, batch_size: config.batch_size) else @report end diff --git a/lib/csv_importer/config.rb b/lib/csv_importer/config.rb index 1100b7a..55369ee 100644 --- a/lib/csv_importer/config.rb +++ b/lib/csv_importer/config.rb @@ -9,6 +9,8 @@ class Config attribute :when_invalid, Symbol, default: proc { :skip } attribute :after_build_blocks, Array[Proc], default: [] attribute :after_save_blocks, Array[Proc], default: [] + attribute :batch_load, Boolean, default: false + attribute :batch_size, Integer, default: 1000 def initialize_copy(orig) super diff --git a/lib/csv_importer/csv_batch_reader.rb b/lib/csv_importer/csv_batch_reader.rb new file mode 100644 index 0000000..5b305f5 --- /dev/null +++ b/lib/csv_importer/csv_batch_reader.rb @@ -0,0 +1,134 @@ +require 'csv' +require 'stringio' + +module CSVImporter + + # Reads, sanitize and parse a CSV file + class CSVBatchReader < CSVReader + + attr_reader :separator + + def initialize(**kwargs) + super + # To avoid reprocessing the stream, let's process & cache the first few lines while detecting the separator + @separator = detect_separator(lines.take(10).join("\n")) + end + + # Returns the header as an Array of Strings + def header + return @header if @header + + parsed = CSV.parse_line(lines.first, col_sep: separator, quote_char: quote_char, skip_blanks: true, + external_encoding: source_encoding) + @header = encode_cells([parsed])[0] + end + + # Returns the rows as an Enumerator + def rows + @rows ||= csv_enumerator + end + + private + + def memoized_enumerator(enum, limit = 10) + cache = [] + Enumerator.new do |yielder| + # Yield from cache first + cache.each { |value| yielder << value } + + # Fill cache and yield values from the underlying enumerator + while cache.size < limit + begin + value = enum.next 
+ cache << value + yielder << value + rescue StopIteration + break + end + end + + # Yield the remaining values directly from the original enumerator + loop do + begin + yielder << enum.next + rescue StopIteration + break + end + end + end + end + + def lines + @lines ||= memoized_enumerator(stream_lines(content_stream)) + end + + def stream_lines(stream, chunk_size = 4096, line_endings_regex = /(\r\n|\r|\n)/) + Enumerator.new do |yielder| + case stream + when StringIO, IO + buffer = "".force_encoding(source_encoding) + until stream.eof? + chunk = stream.read(chunk_size) + buffer << chunk.force_encoding(source_encoding).encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences + + while (match = buffer.match(line_endings_regex)) + # Yield the part of the buffer before the line ending + line = sanitize_content(buffer[0...match.begin(0)]) + yielder << line unless line.empty? + # Remove the processed part (including the line ending) + buffer = buffer[match.end(0)..-1] + end + end + + # Yield any remaining content in the buffer after the end of the file + yielder << sanitize_content(buffer) unless buffer.empty? 
+ stream.close if stream.respond_to?(:close) + + when String + File.open(stream, 'r:' + source_encoding) do |file| + stream_lines(file, chunk_size, line_endings_regex).each { |line| yielder << line } + end + + else + raise ArgumentError, "Unsupported stream type: #{stream.class}" + end + end + end + + def csv_enumerator + Enumerator.new do |yielder| + lines.each_with_index do |line, index| + next if index == 0 # skip header + row = CSV.parse_line( + line, + col_sep: separator, + quote_char: quote_char, + skip_blanks: true, + external_encoding: source_encoding + ) + yielder << encode_and_sanitize_row(row) if row + end + end + end + + def content_stream + if content.is_a?(StringIO) + content + elsif content.is_a?(String) + StringIO.new(content) + elsif file + file + elsif path + File.open(path, 'r') + else + raise Error, "Please provide content, file, or path" + end + end + + def encode_and_sanitize_row(row) + row.map do |cell| + cell ? cell.encode(target_encoding).strip : "" + end + end + end +end diff --git a/lib/csv_importer/csv_reader.rb b/lib/csv_importer/csv_reader.rb index c5ece38..f121b73 100644 --- a/lib/csv_importer/csv_reader.rb +++ b/lib/csv_importer/csv_reader.rb @@ -49,12 +49,13 @@ def read_content def sanitize_content(csv_content) csv_content - .encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences - .gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n" + &.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences + &.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n" end - SEPARATORS = [",", ";", "\t"] + SEPARATORS = [",", ";", "\t"].freeze + # In a properly formed CSV, the number of separators on each line should be equal.
def detect_separator(csv_content) SEPARATORS.min_by do |separator| csv_content.count(separator) diff --git a/lib/csv_importer/runner.rb b/lib/csv_importer/runner.rb index e4a473f..97e0ce3 100644 --- a/lib/csv_importer/runner.rb +++ b/lib/csv_importer/runner.rb @@ -12,6 +12,8 @@ def self.call(*args) attribute :rows, Array[Row] attribute :when_invalid, Symbol attribute :after_save_blocks, Array[Proc], default: [] + attribute :batch_load, Boolean, default: false + attribute :batch_size, Integer, default: 1000 attribute :report, Report, default: proc { Report.new } @@ -42,37 +44,55 @@ def abort_when_invalid? end def persist_rows! + if batch_load + process_in_batches + else + process_all_at_once + end + end + + def process_in_batches transaction do - rows.each do |row| - tags = [] - - if row.model.persisted? - tags << :update - else - tags << :create - end - - if row.skip? - tags << :skip - else - if row.model.save - tags << :success - else - tags << :failure - end - end - - add_to_report(row, tags) - - after_save_blocks.each do |block| - case block.arity - when 0 then block.call - when 1 then block.call(row.model) - when 2 then block.call(row.model, row.csv_attributes) - else - raise ArgumentError, "after_save block of arity #{ block.arity } is not supported" - end - end + rows.each_slice(batch_size) do |batch| + batch.each { |row| process_row(row) } + end + end + end + + def process_all_at_once + transaction do + rows.each { |row| process_row(row) } + end + end + + def process_row(row) + tags = [] + + if row.model.persisted? + tags << :update + else + tags << :create + end + + if row.skip? 
+ tags << :skip + else + if row.model.save + tags << :success + else + tags << :failure + end + end + + add_to_report(row, tags) + + after_save_blocks.each do |block| + case block.arity + when 0 then block.call + when 1 then block.call(row.model) + when 2 then block.call(row.model, row.csv_attributes) + else + raise ArgumentError, "after_save block of arity #{ block.arity } is not supported" end end end diff --git a/spec/csv_importer/csv_batch_reader_spec.rb b/spec/csv_importer/csv_batch_reader_spec.rb new file mode 100644 index 0000000..0e992a5 --- /dev/null +++ b/spec/csv_importer/csv_batch_reader_spec.rb @@ -0,0 +1,193 @@ +require "spec_helper" +require 'stringio' +require 'tempfile' + +class StrictIO < StringIO + def read(length = nil, outbuf = nil) + raise "Attempt to read entire file at once!" if length.nil? + super(length, outbuf) + end + + def readlines(sep = $/, limit = nil) + raise "Attempt to read entire file at once!" if limit.nil? + super(sep, limit) + end +end + +class StrictFileIO < File + def read(length = nil, outbuf = nil) + raise "Attempt to read entire file at once!" if length.nil? 
+ super + end + + def rewind + raise "Attempt to rewind file" + end +end + +module CSVImporter + describe CSVBatchReader do + it "removes invalid byte sequences" do + content = "email,first_name,\xFFlast_name\x81".force_encoding('ASCII-8BIT') + reader = CSVBatchReader.new(content: content) + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + + it "handles windows line separators" do + reader = CSVBatchReader.new(content: "email,first_name,last_name\r\r + mark@example.com,mark,example") + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + + it "supports comma separated csv" do + reader = CSVBatchReader.new(content: "email,first_name,last_name") + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + + it "supports semicolon separated csv" do + reader = CSVBatchReader.new(content: "email;first_name;last_name") + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + + it "supports semicolon separated csv when content has lots of commas" do + reader = CSVBatchReader.new(content: "email;first_name;last_name;letter_ids\n + peter@example.com;Peter;Stone;1,2,3,4,5,6,7,8,9,10,11,12,13,14") + expect(reader.header).to eq ["email", "first_name", "last_name", "letter_ids"] + end + + it "supports tab separated csv" do + reader = CSVBatchReader.new(content: "email\tfirst_name\tlast_name") + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + + it "supports custom quote character" do + reader = CSVBatchReader.new(content: "first_name,last_name\n'bob','the builder'", quote_char: "'") + expect(reader.rows.to_a).to eq [["bob", "the builder"]] + end + + it "supports custom encoding" do + reader = CSVBatchReader.new(content: "メール,氏名".encode('SJIS'), encoding: 'SJIS:UTF-8') + expect(reader.header).to eq ["メール", "氏名"] + end + + context "with stream batch processing" do + let(:csv_content) do + StrictIO.new( + "email,first_name,last_name\n" + + "john@example.com,John,Doe\r" + # old Mac
format + "jane@example.com,Jane,Doe\r\n" + # Windows format + "bob@example.com,Bob,Smith" + ) + end + + let(:reader) { CSVBatchReader.new(content: csv_content) } + + it "returns header correctly" do + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + + it "returns rows as an enumerator" do + expect(reader.rows).to be_a(Enumerator) + end + + it "yields rows in batches" do + rows = reader.rows.to_a + expect(rows.size).to eq 3 + expect(rows[0]).to eq ["john@example.com", "John", "Doe"] + expect(rows[1]).to eq ["jane@example.com", "Jane", "Doe"] + expect(rows[2]).to eq ["bob@example.com", "Bob", "Smith"] + end + + it "batching mechanism works correctly" do + expect(reader.rows).to be_a(Enumerator) + + reader.rows.each do |row| + expect(row).to eq ["john@example.com", "John", "Doe"] + break # We don't need to process all rows for this test + end + end + + it "processes large files in chunks without loading everything into memory" do + chunk_size = 1000 + total_rows = 2000 + csv_header = "email,first_name,last_name\n" + csv_row = "john@example.com,John,Doe\n" + + # Create a StringIO object that simulates a large CSV file + csv_content = StringIO.new.tap do |io| + io.puts csv_header + total_rows.times { io.puts csv_row } + io.rewind + end + + large_reader = CSVBatchReader.new(file: csv_content) + + expect(large_reader.header).to eq ["email", "first_name", "last_name"] + expect(large_reader.rows).to be_a(Enumerator) + + rows_processed = 0 + max_rows_per_iteration = 0 + + large_reader.rows.each_slice(chunk_size) do |batch| + rows_in_this_batch = batch.size + max_rows_per_iteration = [max_rows_per_iteration, rows_in_this_batch].max + rows_processed += rows_in_this_batch + end + + expect(rows_processed).to eq total_rows + expect(max_rows_per_iteration).to eq chunk_size + end + + it "doesn't load the whole file when reading the header" do + header = "email,first_name,last_name\n" + content = header + ("john@example.com,John,Doe\n" * 100) + strict_io = 
StrictIO.new(content) + + reader = CSVBatchReader.new(file: strict_io) + + expect { reader.header }.not_to raise_error + expect(reader.header).to eq ["email", "first_name", "last_name"] + end + end + + context "with file stream processing" do + it "parses a temporary file without loading entire content into memory" do + # Create a temporary file with CSV content + temp_file = Tempfile.new(['test_csv', '.csv']) + begin + # Write CSV content to the file + temp_file.write("email,first_name,last_name\n") + 100.times { temp_file.write("user#{_1}@example.com,User,#{_1}\n") } + temp_file.close + + strict_file = StrictFileIO.new(temp_file.path, 'r') + + # Create a CSVBatchReader instance with the strict IO object + reader = CSVBatchReader.new(file: strict_file) + + # Check header + expect(reader.header).to eq ["email", "first_name", "last_name"] + + # Process rows in batches + rows_processed = 0 + reader.rows.each_slice(10) do |batch| + rows_processed += batch.size + # Ensure each row has the correct format + expect(batch.first).to match( + [/user\d+@example.com/, "User", /\d+/] + ) + end + + # Verify all rows were processed + expect(rows_processed).to eq 100 + + # Ensure the file wasn't fully loaded into memory + expect(reader.instance_variable_get(:@content)).to be_nil + expect(reader.instance_variable_get(:@file)).to be_a(File) + ensure + temp_file.unlink + end + end + end + end +end diff --git a/spec/csv_importer_spec.rb b/spec/csv_importer_spec.rb index e2b610d..8e97e35 100644 --- a/spec/csv_importer_spec.rb +++ b/spec/csv_importer_spec.rb @@ -149,16 +149,16 @@ class ImportUserCSVByFirstName ) end - it "records the correct line number for each row" do - csv_content = "email,confirmed,first_name,last_name + it "records the correct line number for each row" do + csv_content = "email,confirmed,first_name,last_name BOB@example.com,true,bob,," - import = ImportUserCSV.new(content: csv_content) - import.run! + import = ImportUserCSV.new(content: csv_content) + import.run! 
- expect(import.report.valid_rows.size).to eq(1) - expect(import.report.created_rows.size).to eq(1) - expect(import.report.created_rows.first.line_number).to eq(2) - end + expect(import.report.valid_rows.size).to eq(1) + expect(import.report.created_rows.size).to eq(1) + expect(import.report.created_rows.first.line_number).to eq(2) + end end describe "invalid records" do @@ -672,4 +672,12 @@ class ImportUserCSVByFirstName expect(import.report.message).to eq "Import completed: 1 updated, 1 create skipped" end end # describe "skipping" + + it "loads a batch reader when batch_load is true" do + csv_content = "email,confirmed,first_name,last_name +bob@example.com,true,bob,, +mark@example.com,false,mark,new_last_name" + import = ImportUserCSV.new(content: csv_content, batch_load: true) + expect(import.csv).to be_a(CSVImporter::CSVBatchReader) + end end