Skip to content

Commit 033f9c6

Browse files
author
Ziad Sawalha
committed
Add batch support
1 parent 517c3f8 commit 033f9c6

File tree

4 files changed

+250
-37
lines changed

4 files changed

+250
-37
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,12 @@ You are now done defining your importer, let's run it!
290290

291291
### Import from a file, path or string
292292

293-
You can import from a file, path or just the CSV content. Please note
294-
that we currently load the entire file in memory. Feel free to
295-
contribute if you need to support CSV files with millions of lines! :)
293+
You can import from a file, path or just the CSV content. To avoid loading
294+
the entire file in memory, enable `use_batches` and optionally provide a
295+
`batch_size` (default is 1000).
296296

297297
```ruby
298-
import = ImportUserCSV.new(file: my_file)
298+
import = ImportUserCSV.new(file: my_file, use_batches: true, batch_size: 2_500)
299299
import = ImportUserCSV.new(path: "tmp/new_users.csv")
300300
import = ImportUserCSV.new(content: "email,name\nbob@example.com,bob")
301301
```

lib/csv_importer/csv_reader.rb

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ class CSVReader
99
attribute :path, String
1010
attribute :quote_char, String, default: '"'
1111
attribute :encoding, String, default: 'UTF-8:UTF-8'
12+
attribute :batch_size, Integer, default: 1000
13+
attribute :use_batches, Boolean, default: false
14+
15+
# Builds the reader, then clears the three memoization slots (header, rows and
# the batch row enumerator) so each one is populated on first access.
def initialize(*args)
  super
  @header = @rows = @csv_enumerator = nil
end
1221

1322
def csv_rows
1423
@csv_rows ||= begin
@@ -25,12 +34,23 @@ def csv_rows
2534

2635
# Returns the header as an Array of Strings
2736
# Memoized. Returns nil when the input has no first line.
#
# Batch mode reads only what it needs: a sample of up to 1024 bytes to detect
# the column separator, then (after rewinding) the first line, which is parsed
# as the header. Outside batch mode the header is the first entry of
# `csv_rows`.
def header
  return @header if @header
  return @header = csv_rows.first unless use_batches

  io = file_or_content_io
  begin
    # IO#read(length) returns nil at EOF; `to_s` guarantees the detector a String.
    separator = detect_separator(io.read(1024).to_s)
    io.rewind

    # `gets` returns nil on empty input, whereas `readline` raises EOFError.
    first_line = io.gets
    @header = first_line && CSV.parse_line(first_line, col_sep: separator, quote_char: quote_char)
  ensure
    # Close only a handle opened here from `path`; a caller-supplied `file`
    # must stay open for the caller (and for the row enumerator).
    io.close if io.is_a?(File) && !io.equal?(file)
  end
end
3048

3149
# Returns the rows as an Array of Arrays of Strings
3250
def rows
33-
@rows ||= csv_rows[1..-1]
51+
return @rows ||= csv_rows[1..-1] unless use_batches
52+
53+
@csv_enumerator ||= create_csv_enumerator
3454
end
3555

3656
private
@@ -47,13 +67,52 @@ def read_content
4767
end
4868
end
4969

70+
# Builds an Enumerator that yields every data row (the header row is skipped)
# as an Array of encoded, stripped cell Strings.
#
# The input is opened each time the enumerator is iterated. A sample of up to
# 1024 bytes is read to detect the column separator, the IO is rewound, and
# rows are then pulled from CSV one at a time rather than parsed up front.
def create_csv_enumerator
  Enumerator.new do |yielder|
    io = file_or_content_io
    begin
      # IO#read(length) returns nil at EOF; `to_s` guarantees the detector a String.
      separator = detect_separator(io.read(1024).to_s)
      io.rewind

      csv = CSV.new(
        io,
        col_sep: separator,
        quote_char: quote_char,
        skip_blanks: true,
        external_encoding: source_encoding
      )

      csv.each_with_index do |row, index|
        next if index.zero? # Skip header

        yielder << encode_and_sanitize_row(row)
      end
    ensure
      # Close only a handle opened here from `path`; a caller-supplied `file`
      # stays open. Runs when iteration finishes or the consumer breaks out
      # of the loop early.
      io.close if io.is_a?(File) && !io.equal?(file)
    end
  end
end
88+
89+
# Wraps +io+ in an Enumerator that yields each of its lines after passing the
# line through `sanitize_content`.
def sanitize_content_stream(io)
  Enumerator.new do |yielder|
    io.each_line { |raw_line| yielder.yield(sanitize_content(raw_line)) }
  end
end
96+
97+
# Picks the IO to read CSV data from, in priority order: a StringIO over
# `content` when it is a String, else the supplied `file`, else a handle newly
# opened on `path`. Raises Error when none of the three is available.
def file_or_content_io
  return StringIO.new(content) if content.is_a?(String)
  return file if file
  return File.open(path, 'r') if path

  raise Error, "Please provide content, file, or path"
end
108+
50109
# Re-encodes +csv_content+ to the source encoding, dropping bytes that are
# invalid or undefined in it, then normalizes line endings to "\n".
def sanitize_content(csv_content)
  source = Encoding.find(source_encoding)
  reencoded = csv_content.encode(source, invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
  reencoded.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
end
55114

56-
SEPARATORS = [",", ";", "\t"]
115+
SEPARATORS = [",", ";", "\t"].freeze
57116

58117
def detect_separator(csv_content)
59118
SEPARATORS.min_by do |separator|
@@ -87,6 +146,12 @@ def encode_cells(rows)
87146
end
88147
end
89148

149+
# Converts one parsed CSV row into an Array of Strings: each present cell is
# encoded to the target encoding and stripped of surrounding whitespace, and
# a missing cell becomes "".
def encode_and_sanitize_row(row)
  row.map do |cell|
    next "" unless cell

    cell.encode(target_encoding).strip
  end
end
154+
90155
def source_encoding
91156
encoding.split(':').first || 'UTF-8'
92157
end

lib/csv_importer/runner.rb

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,26 @@ def self.call(*args)
1212
attribute :rows, Array[Row]
1313
attribute :when_invalid, Symbol
1414
attribute :after_save_blocks, Array[Proc], default: []
15+
attribute :batch_size, Integer, default: 1000
16+
attribute :use_batches, Boolean, default: false
1517

1618
attribute :report, Report, default: proc { Report.new }
1719

1820
ImportAborted = Class.new(StandardError)
1921

22+
# Builds the runner and a CSVReader configured from the same attributes.
#
# Only options that were actually supplied are forwarded to the reader.
# NOTE(review): passing an explicit nil for `quote_char` / `encoding`
# presumably overrides the reader's declared defaults ('"' and
# 'UTF-8:UTF-8'), which would make `encoding.split(':')` fail. Confirm
# against the attribute library's default-handling rules.
def initialize(attributes = {})
  super

  reader_options = {
    content: attributes[:content],
    file: attributes[:file],
    path: attributes[:path],
    quote_char: attributes[:quote_char],
    encoding: attributes[:encoding],
    batch_size: batch_size,
    use_batches: use_batches
  }.reject { |_name, value| value.nil? } # keeps `false`, drops only unset options

  @csv_reader = CSVReader.new(reader_options)
end
34+
2035
# Persist the rows' model and return a `Report`
2136
def call
2237
if rows.empty?
@@ -42,37 +57,55 @@ def abort_when_invalid?
4257
end
4358

4459
# Persists the rows using the batched path when `use_batches` is set, and
# the all-at-once path otherwise.
def persist_rows!
  return process_in_batches if use_batches

  process_all_at_once
end
66+
67+
def process_in_batches
4568
transaction do
46-
rows.each do |row|
47-
tags = []
48-
49-
if row.model.persisted?
50-
tags << :update
51-
else
52-
tags << :create
53-
end
54-
55-
if row.skip?
56-
tags << :skip
57-
else
58-
if row.model.save
59-
tags << :success
60-
else
61-
tags << :failure
62-
end
63-
end
64-
65-
add_to_report(row, tags)
66-
67-
after_save_blocks.each do |block|
68-
case block.arity
69-
when 0 then block.call
70-
when 1 then block.call(row.model)
71-
when 2 then block.call(row.model, row.csv_attributes)
72-
else
73-
raise ArgumentError, "after_save block of arity #{ block.arity } is not supported"
74-
end
75-
end
69+
rows.each_slice(batch_size) do |batch|
70+
batch.each { |row| process_row(row) }
71+
end
72+
end
73+
end
74+
75+
# Processes every row, in order, inside one `transaction` block.
def process_all_at_once
  transaction do
    rows.each do |current_row|
      process_row(current_row)
    end
  end
end
80+
81+
# Saves one row's model (unless the row is skipped), records the outcome in
# the report, then runs every after_save block.
#
# Tags recorded: :update or :create depending on whether the model was
# already persisted, followed by :skip, :success or :failure.
# After-save blocks are called with 0, 1 (model) or 2 (model, csv attributes)
# arguments according to their arity; any other arity raises ArgumentError.
def process_row(row)
  tags = [row.model.persisted? ? :update : :create]

  tags << if row.skip?
            :skip
          elsif row.model.save
            :success
          else
            :failure
          end

  add_to_report(row, tags)

  after_save_blocks.each do |callback|
    arity = callback.arity

    case arity
    when 0 then callback.call
    when 1 then callback.call(row.model)
    when 2 then callback.call(row.model, row.csv_attributes)
    else
      raise ArgumentError, "after_save block of arity #{ arity } is not supported"
    end
  end
end

spec/csv_importer/csv_reader_spec.rb

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require "spec_helper"
2+
require 'stringio'
23

34
module CSVImporter
45
describe CSVReader do
@@ -44,5 +45,119 @@ module CSVImporter
4445
reader = CSVReader.new(content: "メール,氏名".encode('SJIS'), encoding: 'SJIS:UTF-8')
4546
expect(reader.header).to eq ["メール", "氏名"]
4647
end
48+
49+
context "with batch processing" do
50+
let(:csv_content) do
51+
"email,first_name,last_name\n" +
52+
"john@example.com,John,Doe\n" +
53+
"jane@example.com,Jane,Doe\n" +
54+
"bob@example.com,Bob,Smith"
55+
end
56+
57+
let(:reader) { CSVReader.new(content: csv_content, use_batches: true, batch_size: 2) }
58+
59+
it "returns header correctly" do
60+
expect(reader.header).to eq ["email", "first_name", "last_name"]
61+
end
62+
63+
it "returns rows as an enumerator" do
64+
expect(reader.rows).to be_a(Enumerator)
65+
end
66+
67+
it "yields rows in batches" do
68+
rows = reader.rows.to_a
69+
expect(rows.size).to eq 3
70+
expect(rows[0]).to eq ["john@example.com", "John", "Doe"]
71+
expect(rows[1]).to eq ["jane@example.com", "Jane", "Doe"]
72+
expect(rows[2]).to eq ["bob@example.com", "Bob", "Smith"]
73+
end
74+
75+
it "batching mechanism works correctly" do
76+
large_csv_content = "email,first_name,last_name\n" + ("john@example.com,John,Doe\n" * 1_000)
77+
large_reader = CSVReader.new(content: large_csv_content, use_batches: true, batch_size: 100)
78+
79+
expect(large_reader.header).to eq ["email", "first_name", "last_name"]
80+
expect(large_reader.rows).to be_a(Enumerator)
81+
82+
row_count = 0
83+
large_reader.rows.each do |row|
84+
expect(row).to eq ["john@example.com", "John", "Doe"]
85+
row_count += 1
86+
break if row_count >= 100 # We don't need to process all rows for this test
87+
end
88+
89+
expect(row_count).to eq 100
90+
end
91+
92+
it "processes large files in chunks without loading everything into memory" do
93+
chunk_size = 1000
94+
total_rows = 2000
95+
csv_header = "email,first_name,last_name\n"
96+
csv_row = "john@example.com,John,Doe\n"
97+
98+
# Create a StringIO object that simulates a large CSV file
99+
csv_content = StringIO.new.tap do |io|
100+
io.puts csv_header
101+
total_rows.times { io.puts csv_row }
102+
io.rewind
103+
end
104+
105+
large_reader = CSVReader.new(file: csv_content, use_batches: true, batch_size: chunk_size)
106+
107+
expect(large_reader.header).to eq ["email", "first_name", "last_name"]
108+
expect(large_reader.rows).to be_a(Enumerator)
109+
110+
rows_processed = 0
111+
max_rows_per_iteration = 0
112+
113+
large_reader.rows.each_slice(chunk_size) do |batch|
114+
rows_in_this_batch = batch.size
115+
max_rows_per_iteration = [max_rows_per_iteration, rows_in_this_batch].max
116+
rows_processed += rows_in_this_batch
117+
end
118+
119+
expect(rows_processed).to eq total_rows
120+
expect(max_rows_per_iteration).to eq chunk_size
121+
end
122+
123+
it "doesn't load the whole file when reading the header" do
124+
class StrictIOSimulator < StringIO
125+
def initialize(header, content)
126+
super(content)
127+
@header = header
128+
@read_count = 0
129+
end
130+
131+
def gets
132+
@read_count += 1
133+
raise "Attempted to read beyond header!" if @read_count > 1
134+
@header
135+
end
136+
137+
def readline
138+
gets
139+
end
140+
141+
def read(bytes = nil)
142+
raise "Attempted to read content!" if @read_count > 0
143+
super
144+
end
145+
146+
def rewind
147+
super
148+
@read_count = 0
149+
end
150+
end
151+
152+
header = "email,first_name,last_name\n"
153+
content = header + ("john@example.com,John,Doe\n" * 100)
154+
strict_io = StrictIOSimulator.new(header, content)
155+
156+
reader = CSVReader.new(file: strict_io, use_batches: true)
157+
158+
expect { reader.header }.not_to raise_error
159+
expect(reader.header).to eq ["email", "first_name", "last_name"]
160+
end
161+
end
47162
end
48163
end

0 commit comments

Comments
 (0)