Commit 40de2c5

mahurtadoswilly22 authored and committed

Custom separators & edge loading error management (#8)

* Added support for custom delimiters in CSVs and continue processing after errors on edges

1 parent fca74f1 commit 40de2c5

File tree

1 file changed: +22 -15 lines


bulk_insert.py

Lines changed: 22 additions & 15 deletions
@@ -26,7 +26,7 @@ class Type:
 
 # User-configurable thresholds for when to send queries to Redis
 class Configs(object):
-    def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes):
+    def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges):
         # Maximum number of tokens per query
         # 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
         # that we can safely ignore tokens that aren't binary strings
@@ -39,6 +39,7 @@ def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invali
         self.max_token_size = min(max_token_size * 1000000, 512 * 1000000)
 
         self.skip_invalid_nodes = skip_invalid_nodes
+        self.skip_invalid_edges = skip_invalid_edges
 
 # QueryBuffer is the class that processes input CSVs and emits their binary formats to the Redis client.
 class QueryBuffer(object):
@@ -99,14 +100,14 @@ def report_completion(self, runtime):
 
 # Superclass for label and relation CSV files
 class EntityFile(object):
-    def __init__(self, filename):
+    def __init__(self, filename, separator):
         # The label or relation type string is the basename of the file
         self.entity_str = os.path.splitext(os.path.basename(filename))[0]
         # Input file handling
         self.infile = io.open(filename, 'rt')
         # Initialize CSV reader that ignores leading whitespace in each field
         # and does not modify input quote characters
-        self.reader = csv.reader(self.infile, skipinitialspace=True, quoting=QUOTING)
+        self.reader = csv.reader(self.infile, delimiter=separator, skipinitialspace=True, quoting=QUOTING)
 
         self.prop_offset = 0 # Starting index of properties in row
         self.prop_count = 0 # Number of properties per entity
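The core of the separator change is the `delimiter=separator` argument to Python's stdlib `csv.reader`. A minimal standalone sketch of the same reader configuration, with hypothetical pipe-separated data (the sample rows and `csv.QUOTE_MINIMAL`, standing in for the module-level `QUOTING` constant, are illustrative, not from this commit):

```python
import csv
import io

# Hypothetical pipe-separated input, as the new --separator option allows
data = io.StringIO("id|name\n0|Alice\n1|Bob\n")

# Same reader settings as the diff: custom delimiter, leading whitespace
# stripped, and an explicit quoting mode
reader = csv.reader(data, delimiter='|', skipinitialspace=True,
                    quoting=csv.QUOTE_MINIMAL)
for row in reader:
    print(row)  # ['id', 'name'], then ['0', 'Alice'], then ['1', 'Bob']
```

Note that `csv.reader` requires the delimiter to be a one-character string, so a multi-character value passed via `--separator` would raise a `TypeError`.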
@@ -164,8 +165,8 @@ def to_binary(self):
 
 # Handler class for processing label csv files.
 class Label(EntityFile):
-    def __init__(self, infile):
-        super(Label, self).__init__(infile)
+    def __init__(self, infile, separator):
+        super(Label, self).__init__(infile, separator)
         expected_col_count = self.process_header()
         self.process_entities(expected_col_count)
         self.infile.close()
@@ -220,8 +221,8 @@ def process_entities(self, expected_col_count):
 
 # Handler class for processing relation csv files.
 class RelationType(EntityFile):
-    def __init__(self, infile):
-        super(RelationType, self).__init__(infile)
+    def __init__(self, infile, separator):
+        super(RelationType, self).__init__(infile, separator)
         expected_col_count = self.process_header()
         self.process_entities(expected_col_count)
         self.infile.close()
@@ -251,8 +252,11 @@ def process_entities(self, expected_col_count):
                     src = NODE_DICT[row[0]]
                     dest = NODE_DICT[row[1]]
                 except KeyError as e:
-                    print("Relationship specified a non-existent identifier.")
-                    raise e
+                    print("Relationship specified a non-existent identifier. src: %s; dest: %s" % (row[0], row[1]))
+                    if CONFIGS.skip_invalid_edges is False:
+                        raise e
+                    else:
+                        continue
                 fmt = "=QQ" # 8-byte unsigned ints for src and dest
                 row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
                 row_binary_len = len(row_binary)
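The new except branch reports both endpoints before deciding whether to abort or skip. A self-contained sketch of that skip-or-raise pattern, with hypothetical node and edge data standing in for the loader's globals:

```python
# Hypothetical stand-ins for the loader's NODE_DICT and CONFIGS globals
NODE_DICT = {"alice": 0, "bob": 1}
skip_invalid_edges = True  # mirrors CONFIGS.skip_invalid_edges

edges = [["alice", "bob"], ["alice", "carol"]]  # 'carol' was never defined
for row in edges:
    try:
        src = NODE_DICT[row[0]]
        dest = NODE_DICT[row[1]]
    except KeyError as e:
        print("Relationship specified a non-existent identifier. src: %s; dest: %s" % (row[0], row[1]))
        if skip_invalid_edges is False:
            raise e  # old behavior: abort the load
        continue     # new behavior: report the bad edge and move on
    print("edge %d -> %d" % (src, dest))
```

Since the flag defaults to False, the loader still fails fast unless `--skip-invalid-edges` is passed, so existing callers see no behavior change.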
@@ -302,11 +306,11 @@ def prop_to_binary(prop_str):
 
 # For each node input file, validate contents and convert to binary format.
 # If any buffer limits have been reached, flush all enqueued inserts to Redis.
-def process_entity_csvs(cls, csvs):
+def process_entity_csvs(cls, csvs, separator):
     global QUERY_BUF
     for in_csv in csvs:
         # Build entity descriptor from input CSV
-        entity = cls(in_csv)
+        entity = cls(in_csv, separator)
         added_size = entity.binary_size
         # Check to see if the addition of this data will exceed the buffer's capacity
         if (QUERY_BUF.buffer_size + added_size >= CONFIGS.max_buffer_size
@@ -327,14 +331,17 @@ def process_entity_csvs(cls, csvs):
 # CSV file paths
 @click.option('--nodes', '-n', required=True, multiple=True, help='Path to node csv file')
 @click.option('--relations', '-r', multiple=True, help='Path to relation csv file')
+@click.option('--separator', '-o', default=',', help='Field token separator in csv file')
 # Buffer size restrictions
 @click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
 @click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
 @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
 @click.option('--quote-minimal/--no-quote-minimal', '-q/-d', default=False, help='only quote those fields which contain special characters such as delimiter, quotechar or any of the characters in lineterminator')
 @click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True, help='ignore nodes that use previously defined IDs')
+@click.option('--skip-invalid-edges', '-e', default=False, is_flag=True, help='ignore invalid edges, print an error message and continue loading (True), or stop loading after an edge loading failure (False)')
 
-def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size, quote_minimal, skip_invalid_nodes):
+
+def bulk_insert(graph, host, port, password, nodes, relations, separator, max_token_count, max_buffer_size, max_token_size, quote_minimal, skip_invalid_nodes, skip_invalid_edges):
     global CONFIGS
     global NODE_DICT
     global TOP_NODE_ID
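Wired into the click command, the new options could be exercised as below. This sketch uses click's `CliRunner` test helper; the positional graph argument, the module import path, and the file names are assumptions not shown in this hunk:

```python
from click.testing import CliRunner

from bulk_insert import bulk_insert  # assumed module layout

runner = CliRunner()
# Load pipe-separated CSVs and keep going past edges with unknown endpoints
result = runner.invoke(bulk_insert, [
    'MyGraph',                # assumed positional graph argument
    '--nodes', 'people.csv',
    '--relations', 'knows.csv',
    '--separator', '|',       # new -o option from this commit
    '--skip-invalid-edges',   # new -e flag from this commit
])
print(result.output)
```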
@@ -350,7 +357,7 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
         QUOTING=csv.QUOTE_NONE
 
     TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
-    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes)
+    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges)
 
     start_time = timer()
     # Attempt to connect to Redis server
@@ -384,10 +391,10 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     else:
         NODE_DICT = None
 
-    process_entity_csvs(Label, nodes)
+    process_entity_csvs(Label, nodes, separator)
 
     if relations:
-        process_entity_csvs(RelationType, relations)
+        process_entity_csvs(RelationType, relations, separator)
 
     # Send all remaining tokens to Redis
     QUERY_BUF.send_buffer()
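Taken together, a pair of hypothetical input files exercising both changes at once might look like this; the file names and contents are illustrative only:

```python
# Nodes file: a single 'name' column, so the separator is moot here
with open('people.csv', 'w') as f:
    f.write("name\nalice\nbob\n")

# Relations file: pipe-separated, with one edge referencing an
# undefined node ('carol')
with open('knows.csv', 'w') as f:
    f.write("src|dest\nalice|bob\nalice|carol\n")
```

Run with `--separator '|' --skip-invalid-edges`, the loader should print the non-existent-identifier message for alice/carol and still load the alice/bob edge; without the flag, it raises as before.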
