Skip to content

Commit 2ff239f

Browse files
author
Ziad Sawalha
committed
Add batch support
1 parent 517c3f8 commit 2ff239f

File tree

8 files changed

+358
-47
lines changed

8 files changed

+358
-47
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,12 @@ You are now done defining your importer, let's run it!
290290

291291
### Import from a file, path or string
292292

293-
You can import from a file, path or just the CSV content. Please note
294-
that we currently load the entire file in memory. Feel free to
295-
contribute if you need to support CSV files with millions of lines! :)
293+
You can import from a file, path or just the CSV content. To avoid loading
294+
the entire file in memory, enable `batch_load` and optionally provide a
295+
`batch_size` (default is 1000).
296296

297297
```ruby
298-
import = ImportUserCSV.new(file: my_file)
298+
import = ImportUserCSV.new(file: my_file, batch_load: true, batch_size: 2_500)
299299
import = ImportUserCSV.new(path: "tmp/new_users.csv")
300300
import = ImportUserCSV.new(content: "email,name\nbob@example.com,bob")
301301
```

lib/csv_importer.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
require "csv_importer/version"
55
require "csv_importer/csv_reader"
6+
require "csv_importer/csv_batch_reader"
67
require "csv_importer/column_definition"
78
require "csv_importer/column"
89
require "csv_importer/header"
@@ -53,9 +54,9 @@ class Configurator < Struct.new(:config)
5354
# .new(path: "subscribers.csv", model: newsletter.subscribers)
5455
#
5556
def initialize(*args, &block)
56-
@csv = CSVReader.new(*args)
5757
@config = self.class.config.dup
5858
@config.attributes = args.last
59+
@csv = config.batch_load ? CSVBatchReader.new(*args) : CSVReader.new(*args)
5960
@report = Report.new
6061
Configurator.new(@config).instance_exec(&block) if block
6162
end
@@ -94,7 +95,8 @@ def valid_header?
9495
def run!
9596
if valid_header?
9697
@report = Runner.call(rows: rows, when_invalid: config.when_invalid,
97-
after_save_blocks: config.after_save_blocks, report: @report)
98+
after_save_blocks: config.after_save_blocks, report: @report,
99+
batch_load: config.batch_load, batch_size: config.batch_size)
98100
else
99101
@report
100102
end

lib/csv_importer/config.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ class Config
99
attribute :when_invalid, Symbol, default: proc { :skip }
1010
attribute :after_build_blocks, Array[Proc], default: []
1111
attribute :after_save_blocks, Array[Proc], default: []
12+
attribute :batch_load, Boolean, default: false
13+
attribute :batch_size, Integer, default: 1000
1214

1315
def initialize_copy(orig)
1416
super
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
require 'csv'
2+
require 'stringio'
3+
4+
module CSVImporter
5+
6+
# Reads, sanitizes and parses a CSV file, streaming it instead of loading it entirely in memory
7+
class CSVBatchReader < CSVReader
8+
def initialize(*args)
  super
  # Detecting the separator reads the first lines, which also fills the
  # line cache used later by #header and #rows.
  separator
end
12+
13+
# Returns the header as an Array of Strings.
#
# Only the first streamed line is parsed; the result is memoized.
def header
  @header ||= begin
    header_cells = CSV.parse_line(
      lines.first,
      col_sep: separator,
      quote_char: quote_char,
      skip_blanks: true,
      external_encoding: source_encoding
    )
    encode_cells([header_cells])[0]
  end
end
21+
22+
# Returns the rows as an Enumerator (built once, then memoized).
def rows
  return @csv_enumerator if @csv_enumerator

  @csv_enumerator = create_csv_enumerator
end
26+
27+
private
28+
29+
# Wraps +enum+ in an Enumerator that remembers its first +limit+ values.
#
# Every pass over the returned Enumerator first replays the cached values,
# then keeps pulling from +enum+ (via Enumerator#next) where the previous
# pass stopped. This lets callers peek at the first lines several times
# (e.g. to detect the separator and read the header) without consuming them.
#
# NOTE: only the first +limit+ values can be replayed. Values pulled past
# the cache by an earlier pass are not yielded again, so a pass that runs
# after +enum+ is exhausted yields the cached prefix only.
#
# @param enum [Enumerator] source supporting external iteration (#next)
# @param limit [Integer] maximum number of leading values kept for replay
# @return [Enumerator]
def memoized_enumerator(enum, limit = 10)
  cache = []
  Enumerator.new do |yielder|
    # Replay what earlier passes already pulled from the source.
    cache.each { |value| yielder << value }

    # Kernel#loop ends quietly on StopIteration, i.e. once +enum+ runs dry,
    # so no explicit rescue is needed.
    loop do
      value = enum.next
      # Cache before yielding so the value survives a consumer that stops here.
      cache << value if cache.size < limit
      yielder << value
    end
  end
end
57+
58+
# Memoized enumerator over the streamed lines of the input.
def lines
  return @lines if @lines

  streamed = stream_lines(file_or_content_io)
  @lines = memoized_enumerator(streamed)
end
61+
62+
# Column separator detected from the first (up to) 10 lines; memoized.
def separator
  return @separator if @separator

  sample = lines.take(10).join("\n")
  @separator = detect_separator(sample)
end
67+
68+
# Streams +stream+ as an Enumerator of sanitized, non-empty lines, reading
# it +chunk_size+ bytes at a time instead of loading it all in memory.
#
# The read buffer is kept binary and only tagged with +source_encoding+ once
# a full line has been cut out. Scrubbing every chunk on its own (as was done
# before) drops any multi-byte character that straddles a chunk boundary.
# Splitting on raw bytes assumes an ASCII-compatible source encoding.
#
# NOTE: lines are split on line endings before any CSV parsing happens, so a
# quoted field that contains a line break ends up split across two lines.
#
# @param stream [String, #read] a file path (String) or any IO-like object
#   responding to #read and #eof? (IO, StringIO, Tempfile, upload wrappers...)
# @param chunk_size [Integer] number of bytes requested per read
# @param line_endings_regex [Regexp] pattern separating lines; it is matched
#   against binary data, so it should contain ASCII characters only
# @return [Enumerator] yields each line as a String
# @raise [ArgumentError] (on iteration) when +stream+ is neither a path nor IO-like
def stream_lines(stream, chunk_size = 4096, line_endings_regex = /(\r\n|\r|\n)/)
  Enumerator.new do |yielder|
    if stream.is_a?(String)
      # A String is a path: open it and stream the resulting File.
      File.open(stream, 'r:' + source_encoding) do |file|
        stream_lines(file, chunk_size, line_endings_regex).each { |line| yielder << line }
      end
    elsif stream.respond_to?(:read) && stream.respond_to?(:eof?)
      buffer = String.new(encoding: Encoding::BINARY)

      until stream.eof?
        chunk = stream.read(chunk_size)
        break if chunk.nil?

        buffer << chunk.b

        while (match = buffer.match(line_endings_regex))
          # Everything before the line ending is one complete line.
          raw_line = buffer[0...match.begin(0)]
          line = sanitize_content(raw_line.force_encoding(source_encoding))
          yielder << line unless line.empty?
          # Drop the processed part (including the line ending).
          buffer = buffer[match.end(0)..-1]
        end
      end

      # Whatever is left is a last line without a trailing line ending.
      yielder << sanitize_content(buffer.force_encoding(source_encoding)) unless buffer.empty?
      stream.close if stream.respond_to?(:close)
    else
      raise ArgumentError, "Unsupported stream type: #{stream.class}"
    end
  end
end
100+
101+
# Builds the Enumerator behind #rows: every streamed line except the first
# (the header) is parsed as one CSV record, then encoded and stripped.
# Lines for which CSV.parse_line returns nil are skipped.
def create_csv_enumerator
  Enumerator.new do |yielder|
    lines.each_with_index do |line, position|
      unless position.zero? # position 0 is the header line
        parsed = CSV.parse_line(line, col_sep: separator, quote_char: quote_char,
                                      skip_blanks: true, external_encoding: source_encoding)
        yielder << encode_and_sanitize_row(parsed) if parsed
      end
    end
  end
end
116+
117+
# Picks the object to stream from, checked in this order: +content+ (a
# StringIO is used as is, a String is wrapped in a StringIO), then +file+,
# then +path+ (opened for reading).
#
# Raises Error when none of them is provided.
def file_or_content_io
  return content if content.is_a?(StringIO)
  return StringIO.new(content) if content.is_a?(String)
  return file if file
  return File.open(path, 'r') if path

  raise Error, "Please provide content, file, or path"
end
130+
131+
# Converts every cell of +row+ to +target_encoding+ and strips surrounding
# whitespace; missing (nil) cells become empty strings.
def encode_and_sanitize_row(row)
  row.map do |cell|
    next "" unless cell

    cell.encode(target_encoding).strip
  end
end
136+
end
137+
end

lib/csv_importer/csv_reader.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@ def read_content
4949

5050
def sanitize_content(csv_content)
5151
csv_content
52-
.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
53-
.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
52+
&.encode(Encoding.find(source_encoding), invalid: :replace, undef: :replace, replace: '') # Remove invalid byte sequences
53+
&.gsub(/\r\r?\n?/, "\n") # Replaces windows line separators with "\n"
5454
end
5555

56-
SEPARATORS = [",", ";", "\t"]
56+
SEPARATORS = [",", ";", "\t"].freeze
5757

58+
# In a well-formed CSV, the number of separators on each line should be equal.
5859
def detect_separator(csv_content)
5960
SEPARATORS.min_by do |separator|
6061
csv_content.count(separator)

lib/csv_importer/runner.rb

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def self.call(*args)
1212
attribute :rows, Array[Row]
1313
attribute :when_invalid, Symbol
1414
attribute :after_save_blocks, Array[Proc], default: []
15+
attribute :batch_load, Boolean, default: false
16+
attribute :batch_size, Integer, default: 1000
1517

1618
attribute :report, Report, default: proc { Report.new }
1719

@@ -42,37 +44,55 @@ def abort_when_invalid?
4244
end
4345

4446
def persist_rows!
47+
if batch_load
48+
process_in_batches
49+
else
50+
process_all_at_once
51+
end
52+
end
53+
54+
def process_in_batches
4555
transaction do
46-
rows.each do |row|
47-
tags = []
48-
49-
if row.model.persisted?
50-
tags << :update
51-
else
52-
tags << :create
53-
end
54-
55-
if row.skip?
56-
tags << :skip
57-
else
58-
if row.model.save
59-
tags << :success
60-
else
61-
tags << :failure
62-
end
63-
end
64-
65-
add_to_report(row, tags)
66-
67-
after_save_blocks.each do |block|
68-
case block.arity
69-
when 0 then block.call
70-
when 1 then block.call(row.model)
71-
when 2 then block.call(row.model, row.csv_attributes)
72-
else
73-
raise ArgumentError, "after_save block of arity #{ block.arity } is not supported"
74-
end
75-
end
56+
rows.each_slice(batch_size) do |batch|
57+
batch.each { |row| process_row(row) }
58+
end
59+
end
60+
end
61+
62+
def process_all_at_once
63+
transaction do
64+
rows.each { |row| process_row(row) }
65+
end
66+
end
67+
68+
def process_row(row)
69+
tags = []
70+
71+
if row.model.persisted?
72+
tags << :update
73+
else
74+
tags << :create
75+
end
76+
77+
if row.skip?
78+
tags << :skip
79+
else
80+
if row.model.save
81+
tags << :success
82+
else
83+
tags << :failure
84+
end
85+
end
86+
87+
add_to_report(row, tags)
88+
89+
after_save_blocks.each do |block|
90+
case block.arity
91+
when 0 then block.call
92+
when 1 then block.call(row.model)
93+
when 2 then block.call(row.model, row.csv_attributes)
94+
else
95+
raise ArgumentError, "after_save block of arity #{ block.arity } is not supported"
7696
end
7797
end
7898
end

0 commit comments

Comments
 (0)