Skip to content

Commit d37ade1

Browse files
author
Ziad Sawalha
committed
Add batch support
1 parent 517c3f8 commit d37ade1

File tree

8 files changed

+411
-49
lines changed

8 files changed

+411
-49
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,12 @@ You are now done defining your importer, let's run it!
290290

291291
### Import from a file, path or string
292292

293-
You can import from a file, path or just the CSV content. Please note
294-
that we currently load the entire file in memory. Feel free to
295-
contribute if you need to support CSV files with millions of lines! :)
293+
You can import from a file, path or just the CSV content. To avoid loading
294+
the entire file in memory, enable `batch_load` and optionally provide a
295+
`batch_size` (default is 1000).
296296

297297
```ruby
298-
import = ImportUserCSV.new(file: my_file)
298+
import = ImportUserCSV.new(file: my_file, batch_load: true, batch_size: 2_500)
299299
import = ImportUserCSV.new(path: "tmp/new_users.csv")
300300
import = ImportUserCSV.new(content: "email,name\nbob@example.com,bob")
301301
```

lib/csv_importer.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
require "csv_importer/version"
55
require "csv_importer/csv_reader"
6+
require "csv_importer/csv_batch_reader"
67
require "csv_importer/column_definition"
78
require "csv_importer/column"
89
require "csv_importer/header"
@@ -52,10 +53,12 @@ class Configurator < Struct.new(:config)
5253
# .new(file: my_csv_file)
5354
# .new(path: "subscribers.csv", model: newsletter.subscribers)
5455
#
55-
def initialize(*args, &block)
56-
@csv = CSVReader.new(*args)
56+
def initialize(**kwargs, &block)
5757
@config = self.class.config.dup
58-
@config.attributes = args.last
58+
config_kwargs = kwargs.slice(*@config.attributes.keys)
59+
@config.attributes = config_kwargs # Virtus handles this like a merge
60+
reader_kwargs = kwargs.except(*@config.attributes.keys)
61+
@csv = @config.batch_load ? CSVBatchReader.new(**reader_kwargs) : CSVReader.new(**reader_kwargs)
5962
@report = Report.new
6063
Configurator.new(@config).instance_exec(&block) if block
6164
end
@@ -94,7 +97,8 @@ def valid_header?
9497
def run!
9598
if valid_header?
9699
@report = Runner.call(rows: rows, when_invalid: config.when_invalid,
97-
after_save_blocks: config.after_save_blocks, report: @report)
100+
after_save_blocks: config.after_save_blocks, report: @report,
101+
batch_load: config.batch_load, batch_size: config.batch_size)
98102
else
99103
@report
100104
end

lib/csv_importer/config.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ class Config
99
attribute :when_invalid, Symbol, default: proc { :skip }
1010
attribute :after_build_blocks, Array[Proc], default: []
1111
attribute :after_save_blocks, Array[Proc], default: []
12+
attribute :batch_load, Boolean, default: false
13+
attribute :batch_size, Integer, default: 1000
1214

1315
def initialize_copy(orig)
1416
super
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
require 'csv'
2+
require 'stringio'
3+
4+
module CSVImporter
5+
6+
# Reads, sanitizes and parses a CSV file
7+
class CSVBatchReader < CSVReader
8+
9+
attr_reader :separator
10+
11+
# Delegates construction to the parent reader (same keyword arguments), then
# guesses the column separator from a sample of the first lines.
def initialize(**kwargs)
  super
  # `lines` keeps its first entries cached, so peeking at them here does not
  # require the stream to be processed a second time later on.
  sample = lines.take(10)
  @separator = detect_separator(sample.join("\n"))
end
16+
17+
# Returns the header as an Array of Strings (memoized after the first call).
#
# Returns nil when the source yields no line at all (e.g. empty content),
# instead of passing nil to CSV.parse_line.
def header
  return @header if @header

  first_line = lines.first
  return if first_line.nil?

  parsed = CSV.parse_line(first_line, col_sep: separator, quote_char: quote_char, skip_blanks: true,
                                      external_encoding: source_encoding)
  # encode_cells works on a list of rows, so wrap the single header row and unwrap it again.
  @header = encode_cells([parsed])[0]
end
25+
26+
# Returns the rows as an Enumerator. The Enumerator object itself is built
# once and reused on later calls.
def rows
  return @rows if @rows

  @rows = csv_enumerator
end
30+
31+
private
32+
33+
# Wraps +enum+ so that its first +limit+ values can be read repeatedly
# without pulling them from the underlying enumerator again.
#
# enum  - an Enumerator, consumed through external iteration (#next).
# limit - how many leading values are kept in the cache (default: 10).
#
# Returns an Enumerator. Each pass replays the cached values first, then
# continues pulling from +enum+ at its current position. Only the cached
# prefix is replayable: values read past +limit+ are yielded once and not
# stored, so a later pass over an already drained +enum+ yields just the
# cached values.
def memoized_enumerator(enum, limit = 10)
  cache = []
  Enumerator.new do |yielder|
    # Replay what earlier passes already pulled from the source.
    cache.each { |value| yielder << value }

    # Top the cache up to +limit+. Kernel#loop rescues StopIteration, so an
    # exhausted source simply ends the loop.
    loop do
      break if cache.size >= limit

      value = enum.next
      cache << value
      yielder << value
    end

    # Stream everything else straight through, uncached.
    loop { yielder << enum.next }
  end
end
60+
61+
# Enumerator over the lines of the content stream, wrapped so that its first
# entries are cached. Built once and reused on later calls.
def lines
  return @lines if @lines

  @lines = memoized_enumerator(stream_lines(content_stream))
end
64+
65+
# Lazily splits +stream+ into sanitized, non-empty lines.
#
# stream             - a StringIO/IO to read from, or a String holding a file path.
# chunk_size         - number of bytes requested per read (default: 4096).
# line_endings_regex - pattern treated as a line terminator.
#
# Returns an Enumerator of Strings. A StringIO/IO stream is closed once it
# has been read to the end. Raises ArgumentError (when enumerated) for any
# other kind of stream.
def stream_lines(stream, chunk_size = 4096, line_endings_regex = /(\r\n|\r|\n)/)
  Enumerator.new do |yielder|
    case stream
    when StringIO, IO
      encoding = Encoding.find(source_encoding)
      buffer = String.new.force_encoding(encoding)
      until stream.eof?
        # Reads are byte-sized, so a chunk may stop in the middle of a multibyte
        # character; complete it first, otherwise the cleanup below would drop
        # both halves of that character.
        chunk = read_whole_characters(stream, chunk_size, encoding)
        buffer << chunk.encode(encoding, invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences

        while (match = buffer.match(line_endings_regex))
          # Yield the part of the buffer before the line ending
          line = sanitize_content(buffer[0...match.begin(0)])
          yielder << line unless line.empty?
          # Remove the processed part (including the line ending)
          buffer = buffer[match.end(0)..-1]
        end
      end

      # Yield any remaining content in the buffer after the end of the file
      yielder << sanitize_content(buffer) unless buffer.empty?
      stream.close if stream.respond_to?(:close)
    when String
      # A String is treated as a path: open it and stream the resulting file.
      File.open(stream, "r:#{source_encoding}") do |file|
        stream_lines(file, chunk_size, line_endings_regex).each { |line| yielder << line }
      end
    else
      raise ArgumentError, "Unsupported stream type: #{stream.class}"
    end
  end
end

# Reads +chunk_size+ bytes from +stream+ and, while the result is not valid in
# +encoding+, appends up to 3 more single bytes (the longest UTF-8 character
# is 4 bytes). Returns the bytes tagged with +encoding+; data that is
# genuinely invalid is returned as-is for the caller to clean up.
def read_whole_characters(stream, chunk_size, encoding)
  bytes = stream.read(chunk_size).to_s.b
  3.times do
    break if stream.eof? || bytes.dup.force_encoding(encoding).valid_encoding?

    bytes << stream.read(1).to_s
  end
  bytes.force_encoding(encoding)
end
97+
98+
# Builds an Enumerator that parses every line after the first one (the
# header) and yields it as an Array of cleaned-up cells. Lines for which
# CSV.parse_line returns nil are not yielded.
def csv_enumerator
  Enumerator.new do |yielder|
    lines.each_with_index do |raw_line, position|
      next if position.zero? # skip header

      parsed = CSV.parse_line(raw_line, col_sep: separator, quote_char: quote_char,
                                        skip_blanks: true, external_encoding: source_encoding)
      next unless parsed

      yielder << encode_and_sanitize_row(parsed)
    end
  end
end
113+
114+
# Selects the object the lines are read from, in order of precedence:
# +content+ (a StringIO as-is, a String wrapped in a StringIO), then +file+,
# then a file opened from +path+. Raises Error when none is available.
def content_stream
  case content
  when StringIO then return content
  when String then return StringIO.new(content)
  end

  return file if file
  return File.open(path, 'r') if path

  raise Error, "Please provide content, file, or path"
end
127+
128+
# Converts every cell of +row+ to the target encoding and strips surrounding
# whitespace. Missing (nil/false) cells become empty strings.
def encode_and_sanitize_row(row)
  row.map do |cell|
    next "" unless cell

    cell.encode(target_encoding).strip
  end
end
133+
end
134+
end

lib/csv_importer/csv_reader.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@ def read_content
4949

5050
def sanitize_content(csv_content)
5151
csv_content
52-
.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
53-
.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
52+
&.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
53+
&.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
5454
end
5555

56-
SEPARATORS = [",", ";", "\t"]
56+
SEPARATORS = [",", ";", "\t"].freeze
5757

58+
# In a well-formed CSV, the number of separators on each line should be equal.
5859
def detect_separator(csv_content)
5960
SEPARATORS.min_by do |separator|
6061
csv_content.count(separator)

lib/csv_importer/runner.rb

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def self.call(*args)
1212
attribute :rows, Array[Row]
1313
attribute :when_invalid, Symbol
1414
attribute :after_save_blocks, Array[Proc], default: []
15+
attribute :batch_load, Boolean, default: false
16+
attribute :batch_size, Integer, default: 1000
1517

1618
attribute :report, Report, default: proc { Report.new }
1719

@@ -42,37 +44,55 @@ def abort_when_invalid?
4244
end
4345

4446
def persist_rows!
47+
if batch_load
48+
process_in_batches
49+
else
50+
process_all_at_once
51+
end
52+
end
53+
54+
def process_in_batches
4555
transaction do
46-
rows.each do |row|
47-
tags = []
48-
49-
if row.model.persisted?
50-
tags << :update
51-
else
52-
tags << :create
53-
end
54-
55-
if row.skip?
56-
tags << :skip
57-
else
58-
if row.model.save
59-
tags << :success
60-
else
61-
tags << :failure
62-
end
63-
end
64-
65-
add_to_report(row, tags)
66-
67-
after_save_blocks.each do |block|
68-
case block.arity
69-
when 0 then block.call
70-
when 1 then block.call(row.model)
71-
when 2 then block.call(row.model, row.csv_attributes)
72-
else
73-
raise ArgumentError, "after_save block of arity #{ block.arity } is not supported"
74-
end
75-
end
56+
rows.each_slice(batch_size) do |batch|
57+
batch.each { |row| process_row(row) }
58+
end
59+
end
60+
end
61+
62+
def process_all_at_once
63+
transaction do
64+
rows.each { |row| process_row(row) }
65+
end
66+
end
67+
68+
def process_row(row)
69+
tags = []
70+
71+
if row.model.persisted?
72+
tags << :update
73+
else
74+
tags << :create
75+
end
76+
77+
if row.skip?
78+
tags << :skip
79+
else
80+
if row.model.save
81+
tags << :success
82+
else
83+
tags << :failure
84+
end
85+
end
86+
87+
add_to_report(row, tags)
88+
89+
after_save_blocks.each do |block|
90+
case block.arity
91+
when 0 then block.call
92+
when 1 then block.call(row.model)
93+
when 2 then block.call(row.model, row.csv_attributes)
94+
else
95+
raise ArgumentError, "after_save block of arity #{ block.arity } is not supported"
7696
end
7797
end
7898
end

0 commit comments

Comments
 (0)