Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ You are now done defining your importer, let's run it!

### Import from a file, path or string

You can import from a file, path or just the CSV content. Please note
that we currently load the entire file in memory. Feel free to
contribute if you need to support CSV files with millions of lines! :)
You can import from a file, path or just the CSV content. To avoid loading
the entire file in memory, enable `batch_load` and optionally provide a
`batch_size` (default is 1000).

```ruby
import = ImportUserCSV.new(file: my_file)
import = ImportUserCSV.new(file: my_file, batch_load: true, batch_size: 2_500)
import = ImportUserCSV.new(path: "tmp/new_users.csv")
import = ImportUserCSV.new(content: "email,name\nbob@example.com,bob")
```
Expand Down
12 changes: 8 additions & 4 deletions lib/csv_importer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

require "csv_importer/version"
require "csv_importer/csv_reader"
require "csv_importer/csv_batch_reader"
require "csv_importer/column_definition"
require "csv_importer/column"
require "csv_importer/header"
Expand Down Expand Up @@ -52,10 +53,12 @@ class Configurator < Struct.new(:config)
# .new(file: my_csv_file)
# .new(path: "subscribers.csv", model: newsletter.subscribers)
#
def initialize(*args, &block)
@csv = CSVReader.new(*args)
def initialize(**kwargs, &block)
@config = self.class.config.dup
@config.attributes = args.last
config_kwargs = kwargs.slice(*@config.attributes.keys)
@config.attributes = config_kwargs # Virtus handles this like a merge
reader_kwargs = kwargs.except(*@config.attributes.keys)
@csv = @config.batch_load ? CSVBatchReader.new(**reader_kwargs) : CSVReader.new(**reader_kwargs)
@report = Report.new
Configurator.new(@config).instance_exec(&block) if block
end
Expand Down Expand Up @@ -94,7 +97,8 @@ def valid_header?
def run!
if valid_header?
@report = Runner.call(rows: rows, when_invalid: config.when_invalid,
after_save_blocks: config.after_save_blocks, report: @report)
after_save_blocks: config.after_save_blocks, report: @report,
batch_load: config.batch_load, batch_size: config.batch_size)
else
@report
end
Expand Down
2 changes: 2 additions & 0 deletions lib/csv_importer/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class Config
attribute :when_invalid, Symbol, default: proc { :skip }
attribute :after_build_blocks, Array[Proc], default: []
attribute :after_save_blocks, Array[Proc], default: []
attribute :batch_load, Boolean, default: false
attribute :batch_size, Integer, default: 1000

def initialize_copy(orig)
super
Expand Down
134 changes: 134 additions & 0 deletions lib/csv_importer/csv_batch_reader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
require 'csv'
require 'stringio'

module CSVImporter

  # Reads, sanitizes and parses a CSV source by streaming it line by line,
  # so the whole file never has to be loaded into memory at once.
  #
  # Inherits from CSVReader, which supplies the constructor attributes
  # (content / file / path, quote_char, source_encoding, target_encoding)
  # and the helpers used below (sanitize_content, detect_separator,
  # encode_cells).
  class CSVBatchReader < CSVReader

    # Column separator character detected from the first lines of the stream.
    attr_reader :separator

    def initialize(**kwargs)
      super
      # To avoid reprocessing the stream, let's process & cache the first few
      # lines while detecting the separator (`lines` memoizes up to 10
      # entries, so peeking here does not consume the enumerator).
      @separator = detect_separator(lines.take(10).join("\n"))
    end

    # Returns the header as an Array of Strings, parsed from the first line,
    # re-encoded via encode_cells, and memoized.
    def header
      return @header if @header

      parsed = CSV.parse_line(lines.first, col_sep: separator, quote_char: quote_char, skip_blanks: true,
                              external_encoding: source_encoding)
      @header = encode_cells([parsed])[0]
    end

    # Returns the rows as an Enumerator.
    #
    # NOTE(review): only the first 10 lines are cached (see
    # memoized_enumerator's default limit), so iterating past that prefix
    # consumes the underlying stream — callers should treat this enumerator
    # as single-pass.
    def rows
      @rows ||= csv_enumerator
    end

    private

    # Wraps +enum+ so that its first +limit+ values are cached and re-yielded
    # on any later enumeration; values beyond the cache are yielded straight
    # from the underlying enumerator and are therefore consumed permanently.
    def memoized_enumerator(enum, limit = 10)
      cache = []
      Enumerator.new do |yielder|
        # Yield from cache first
        cache.each { |value| yielder << value }

        # Fill cache and yield values from the underlying enumerator
        while cache.size < limit
          begin
            value = enum.next
            cache << value
            yielder << value
          rescue StopIteration
            break
          end
        end

        # Yield the remaining values directly from the original enumerator
        loop do
          begin
            yielder << enum.next
          rescue StopIteration
            break
          end
        end
      end
    end

    # Lazily-built enumerator over the sanitized lines of the source, with
    # the first 10 lines cached for header and separator detection.
    def lines
      @lines ||= memoized_enumerator(stream_lines(content_stream))
    end

    # Streams +stream+ in +chunk_size+-byte chunks and yields one sanitized
    # line at a time; line endings (\r\n, \r or \n) are stripped and empty
    # lines are dropped. A String argument is treated as a file path and
    # opened with the configured source encoding.
    def stream_lines(stream, chunk_size = 4096, line_endings_regex = /(\r\n|\r|\n)/)
      Enumerator.new do |yielder|
        case stream
        when StringIO, IO
          buffer = "".force_encoding(source_encoding)
          until stream.eof?
            chunk = stream.read(chunk_size)
            buffer << chunk.force_encoding(source_encoding).encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences

            while (match = buffer.match(line_endings_regex))
              # Yield the part of the buffer before the line ending
              line = sanitize_content(buffer[0...match.begin(0)])
              yielder << line unless line.empty?
              # Remove the processed part (including the line ending)
              buffer = buffer[match.end(0)..-1]
            end
          end

          # Yield any remaining content in the buffer after the end of the file
          yielder << sanitize_content(buffer) unless buffer.empty?
          stream.close if stream.respond_to?(:close)

        when String
          # NOTE(review): content_stream never passes a raw path String here
          # (it opens files itself), so this branch only serves direct
          # callers — confirm it is still needed.
          File.open(stream, 'r:' + source_encoding) do |file|
            stream_lines(file, chunk_size, line_endings_regex).each { |line| yielder << line }
          end

        else
          raise ArgumentError, "Unsupported stream type: #{stream.class}"
        end
      end
    end

    # Enumerator over parsed, encoded rows. Skips the header line and any
    # line CSV.parse_line reduces to nil (e.g. blank rows).
    def csv_enumerator
      Enumerator.new do |yielder|
        lines.each_with_index do |line, index|
          next if index == 0 # skip header
          row = CSV.parse_line(
            line,
            col_sep: separator,
            quote_char: quote_char,
            skip_blanks: true,
            external_encoding: source_encoding
          )
          yielder << encode_and_sanitize_row(row) if row
        end
      end
    end

    # Picks the input source in priority order: content (StringIO or raw
    # String), then an already-open file object, then a path to open.
    def content_stream
      if content.is_a?(StringIO)
        content
      elsif content.is_a?(String)
        StringIO.new(content)
      elsif file
        file
      elsif path
        File.open(path, 'r')
      else
        raise Error, "Please provide content, file, or path"
      end
    end

    # Re-encodes each cell to the target encoding and strips surrounding
    # whitespace; nil cells become empty strings.
    def encode_and_sanitize_row(row)
      row.map do |cell|
        cell ? cell.encode(target_encoding).strip : ""
      end
    end
  end
end
7 changes: 4 additions & 3 deletions lib/csv_importer/csv_reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ def read_content

def sanitize_content(csv_content)
csv_content
.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
&.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
&.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
end

SEPARATORS = [",", ";", "\t"]
SEPARATORS = [",", ";", "\t"].freeze

# in a properly formed CSV, the number of separators on each line should be equal.
def detect_separator(csv_content)
SEPARATORS.min_by do |separator|
csv_content.count(separator)
Expand Down
80 changes: 50 additions & 30 deletions lib/csv_importer/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ def self.call(*args)
attribute :rows, Array[Row]
attribute :when_invalid, Symbol
attribute :after_save_blocks, Array[Proc], default: []
attribute :batch_load, Boolean, default: false
attribute :batch_size, Integer, default: 1000

attribute :report, Report, default: proc { Report.new }

Expand Down Expand Up @@ -42,37 +44,55 @@ def abort_when_invalid?
end

# Dispatches persistence to the batched or the all-at-once strategy,
# depending on the batch_load flag.
def persist_rows!
  batch_load ? process_in_batches : process_all_at_once
end

def process_in_batches
transaction do
rows.each do |row|
tags = []

if row.model.persisted?
tags << :update
else
tags << :create
end

if row.skip?
tags << :skip
else
if row.model.save
tags << :success
else
tags << :failure
end
end

add_to_report(row, tags)

after_save_blocks.each do |block|
case block.arity
when 0 then block.call
when 1 then block.call(row.model)
when 2 then block.call(row.model, row.csv_attributes)
else
raise ArgumentError, "after_save block of arity #{ block.arity } is not supported"
end
end
rows.each_slice(batch_size) do |batch|
batch.each { |row| process_row(row) }
end
end
end

# Persists every row inside one single transaction (the original,
# non-batched behaviour).
def process_all_at_once
  transaction do
    rows.each do |current_row|
      process_row(current_row)
    end
  end
end

# Processes one row: tags it as an update or create, saves the model
# unless the row is skipped, records the outcome in the report, then
# fires the configured after_save hooks with the arguments their arity
# expects.
def process_row(row)
  tags = []
  tags << (row.model.persisted? ? :update : :create)

  tags << if row.skip?
            :skip
          elsif row.model.save
            :success
          else
            :failure
          end

  add_to_report(row, tags)

  after_save_blocks.each do |callback|
    case callback.arity
    when 0 then callback.call
    when 1 then callback.call(row.model)
    when 2 then callback.call(row.model, row.csv_attributes)
    else
      raise ArgumentError, "after_save block of arity #{ callback.arity } is not supported"
    end
  end
end
Expand Down
Loading