
Commit e436b37

Merge pull request #4 from RedisLabsModules/bulk-unicode-support
Unicode support and progress indicators added
2 parents b756766 + 0c270cd commit e436b37

2 files changed: +95 -59 lines changed

bulk_insert.py

Lines changed: 94 additions & 59 deletions
@@ -1,8 +1,11 @@
 import csv
 import os
+import io
 import struct
+from timeit import default_timer as timer
 import redis
 import click
+from backports import csv

 # Global variables
 CONFIGS = None # thresholds for batching Redis queries
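The import changes above pair io.open (to read CSV input as UTF-8 text) with backports.csv, which provides the Python 3 csv module's unicode handling on Python 2. As an illustration only (file name and contents invented, not from this commit), the reading pattern the loader now relies on looks roughly like this:

import io
from backports import csv

# Open the CSV as a unicode text stream; backports.csv expects unicode input on Python 2.
with io.open('people.csv', 'rt', encoding='utf-8') as infile:
    reader = csv.reader(infile, skipinitialspace=True, quoting=csv.QUOTE_NONE)
    for row in reader:
        print(row)  # every field arrives as a unicode string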
@@ -55,6 +58,9 @@ def __init__(self, graphname, client):
         self.labels = [] # List containing all pending Label objects
         self.reltypes = [] # List containing all pending RelationType objects

+        self.nodes_created = 0 # Total number of nodes created
+        self.relations_created = 0 # Total number of relations created
+
     # Send all pending inserts to Redis
     def send_buffer(self):
         # Do nothing if we have no entities
@@ -68,7 +74,9 @@ def send_buffer(self):
         self.initial_query = False

         result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
-        print(result)
+        stats = result.split(', '.encode())
+        self.nodes_created += int(stats[0].split(' '.encode())[0])
+        self.relations_created += int(stats[1].split(' '.encode())[0])

         self.clear_buffer()

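send_buffer() no longer prints the raw GRAPH.BULK reply; it parses it to keep running totals. The parsing assumes a byte-string reply shaped like "N nodes created, M relations created" (inferred from the split logic above, not confirmed elsewhere in this diff). A standalone sketch of that parsing with a made-up reply value:

reply = '2 nodes created, 3 relations created'.encode()  # assumed reply shape, example numbers

stats = reply.split(', '.encode())
nodes_created = int(stats[0].split(' '.encode())[0])       # 2
relations_created = int(stats[1].split(' '.encode())[0])   # 3
print(nodes_created, relations_created)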
@@ -83,13 +91,17 @@ def clear_buffer(self):
         del self.labels[:]
         del self.reltypes[:]

+    def report_completion(self, runtime):
+        print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
+              % (self.graphname, self.nodes_created, self.relations_created, runtime))
+
 # Superclass for label and relation CSV files
 class EntityFile(object):
     def __init__(self, filename):
         # The label or relation type string is the basename of the file
-        self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode("ascii")
+        self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode('utf-8')
         # Input file handling
-        self.infile = open(filename, 'rt')
+        self.infile = io.open(filename, 'rt', encoding='utf-8')
         # Initialize CSV reader that ignores leading whitespace in each field
         # and does not modify input quote characters
         self.reader = csv.reader(self.infile, skipinitialspace=True, quoting=csv.QUOTE_NONE)
@@ -100,6 +112,17 @@ def __init__(self, filename):
         self.packed_header = ""
         self.binary_entities = []
         self.binary_size = 0 # size of binary token
+        self.count_entities() # number of entities/row in file.
+
+    # Count number of rows in file.
+    def count_entities(self):
+        self.entities_count = 0
+        self.entities_count = sum(1 for line in self.infile)
+        # discard header row
+        self.entities_count -= 1
+        # seek back
+        self.infile.seek(0)
+        return self.entities_count

     # Simple input validations for each row of a CSV file
     def validate_row(self, expected_col_count, row):
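count_entities() counts data rows by iterating the already-open file, subtracting the header, and seeking back to the start so the csv reader still sees every row; the resulting entities_count later gives the progress bars their length. The same idea in isolation (hypothetical file name):

import io

with io.open('people.csv', 'rt', encoding='utf-8') as infile:
    entities_count = sum(1 for line in infile) - 1  # discard the header row
    infile.seek(0)                                  # rewind before handing the file to csv.reader
    print(entities_count)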
@@ -119,9 +142,10 @@ def pack_header(self, header):
         # String format
         fmt = "=%dsI" % (len(self.entity_str) + 1) # Unaligned native, entity_string, count of properties
         args = [self.entity_str, prop_count]
-        for prop in header[self.prop_offset:]:
+        for p in header[self.prop_offset:]:
+            prop = p.encode('utf-8')
             fmt += "%ds" % (len(prop) + 1) # encode string with a null terminator
-            args += [str.encode(prop)]
+            args.append(prop)
         return struct.pack(fmt, *args)

     # Convert a list of properties into a binary string
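pack_header() now UTF-8-encodes each property name before measuring and packing it, so the header layout is the null-terminated entity name, a native-endian uint32 property count, and one null-terminated name per property. A rough worked example with invented values (not taken from the commit):

import struct

entity_str = u'Person'.encode('utf-8')
props = [u'name', u'age']

fmt = "=%dsI" % (len(entity_str) + 1)   # name plus null terminator, then the property count
args = [entity_str, len(props)]
for p in props:
    prop = p.encode('utf-8')
    fmt += "%ds" % (len(prop) + 1)      # null-terminated property name
    args.append(prop)

packed = struct.pack(fmt, *args)
# b'Person\x00\x02\x00\x00\x00name\x00age\x00' on a little-endian machine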
@@ -160,31 +184,35 @@ def process_entities(self, expected_col_count):
         global TOP_NODE_ID
         global QUERY_BUF

-        for row in self.reader:
-            self.validate_row(expected_col_count, row)
-            # Add identifier->ID pair to dictionary if we are building relations
-            if NODE_DICT is not None:
-                if row[0] in NODE_DICT:
-                    print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
-                          % (row[0], self.infile.name, self.reader.line_num))
-                    exit(1)
-                NODE_DICT[row[0]] = TOP_NODE_ID
-                TOP_NODE_ID += 1
-            row_binary = self.pack_props(row)
-            row_binary_len = len(row_binary)
-            # If the addition of this entity will make the binary token grow too large,
-            # send the buffer now.
-            if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                QUERY_BUF.labels.append(self.to_binary())
-                QUERY_BUF.send_buffer()
-                self.reset_partial_binary()
-                # Push the label onto the query buffer again, as there are more entities to process.
-                QUERY_BUF.labels.append(self.to_binary())
-
-            QUERY_BUF.node_count += 1
-            self.binary_size += row_binary_len
-            self.binary_entities.append(row_binary)
-        QUERY_BUF.labels.append(self.to_binary())
+        entities_created = 0
+        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
+            for row in reader:
+                self.validate_row(expected_col_count, row)
+                # Add identifier->ID pair to dictionary if we are building relations
+                if NODE_DICT is not None:
+                    if row[0] in NODE_DICT:
+                        print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
+                              % (row[0], self.infile.name, self.reader.line_num))
+                        exit(1)
+                    NODE_DICT[row[0]] = TOP_NODE_ID
+                    TOP_NODE_ID += 1
+                row_binary = self.pack_props(row)
+                row_binary_len = len(row_binary)
+                # If the addition of this entity will make the binary token grow too large,
+                # send the buffer now.
+                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
+                    QUERY_BUF.labels.append(self.to_binary())
+                    QUERY_BUF.send_buffer()
+                    self.reset_partial_binary()
+                    # Push the label onto the query buffer again, as there are more entities to process.
+                    QUERY_BUF.labels.append(self.to_binary())
+
+                QUERY_BUF.node_count += 1
+                entities_created += 1
+                self.binary_size += row_binary_len
+                self.binary_entities.append(row_binary)
+        QUERY_BUF.labels.append(self.to_binary())
+        print("%d nodes created with label '%s'" % (entities_created, self.entity_str))

 # Handler class for processing relation csv files.
 class RelationType(EntityFile):
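The node loop is now wrapped in click.progressbar. Because a csv.reader has no len(), the bar needs an explicit length=, which is exactly what count_entities() supplies, and label= names the bar after the entity. A minimal standalone sketch of the same pattern (data invented):

import click

rows = [['alice'], ['bob'], ['carol']]  # stand-in for a csv.reader

with click.progressbar(rows, length=len(rows), label='Person') as bar:
    for row in bar:
        pass  # process the row; the bar advances once per iteration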
@@ -211,31 +239,34 @@ def process_header(self):
         return expected_col_count

     def process_entities(self, expected_col_count):
-        for row in self.reader:
-            self.validate_row(expected_col_count, row)
-
-            try:
-                src = NODE_DICT[row[0]]
-                dest = NODE_DICT[row[1]]
-            except KeyError as e:
-                print("Relationship specified a non-existent identifier.")
-                raise e
-            fmt = "=QQ" # 8-byte unsigned ints for src and dest
-            row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
-            row_binary_len = len(row_binary)
-            # If the addition of this entity will make the binary token grow too large,
-            # send the buffer now.
-            if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                QUERY_BUF.reltypes.append(self.to_binary())
-                QUERY_BUF.send_buffer()
-                self.reset_partial_binary()
-                # Push the reltype onto the query buffer again, as there are more entities to process.
-                QUERY_BUF.reltypes.append(self.to_binary())
-
-            QUERY_BUF.relation_count += 1
-            self.binary_size += row_binary_len
-            self.binary_entities.append(row_binary)
-        QUERY_BUF.reltypes.append(self.to_binary())
+        entities_created = 0
+        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
+            for row in reader:
+                self.validate_row(expected_col_count, row)
+                try:
+                    src = NODE_DICT[row[0]]
+                    dest = NODE_DICT[row[1]]
+                except KeyError as e:
+                    print("Relationship specified a non-existent identifier.")
+                    raise e
+                fmt = "=QQ" # 8-byte unsigned ints for src and dest
+                row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
+                row_binary_len = len(row_binary)
+                # If the addition of this entity will make the binary token grow too large,
+                # send the buffer now.
+                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
+                    QUERY_BUF.reltypes.append(self.to_binary())
+                    QUERY_BUF.send_buffer()
+                    self.reset_partial_binary()
+                    # Push the reltype onto the query buffer again, as there are more entities to process.
+                    QUERY_BUF.reltypes.append(self.to_binary())
+
+                QUERY_BUF.relation_count += 1
+                entities_created += 1
+                self.binary_size += row_binary_len
+                self.binary_entities.append(row_binary)
+        QUERY_BUF.reltypes.append(self.to_binary())
+        print("%d relations created for type '%s'" % (entities_created, self.entity_str))

 # Convert a single CSV property field into a binary stream.
 # Supported property types are string, numeric, boolean, and NULL.
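Each relation row is packed as two 8-byte unsigned integers, the source and destination node IDs looked up in NODE_DICT, followed by the packed properties. Just the ID-packing step, with invented IDs:

import struct

src, dest = 0, 7                            # node IDs assigned while loading nodes
edge_header = struct.pack("=QQ", src, dest)
print(len(edge_header))                     # 16 bytes: two native-endian unsigned 64-bit integers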
@@ -261,8 +292,9 @@ def prop_to_binary(prop_str):

     # If we've reached this point, the property is a string
     # Encoding len+1 adds a null terminator to the string
-    format_str += "%ds" % (len(prop_str) + 1)
-    return struct.pack(format_str, Type.STRING, str.encode(prop_str))
+    encoded_str = prop_str.encode('utf-8')
+    format_str += "%ds" % (len(encoded_str) + 1)
+    return struct.pack(format_str, Type.STRING, encoded_str)

 # For each node input file, validate contents and convert to binary format.
 # If any buffer limits have been reached, flush all enqueued inserts to Redis.
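prop_to_binary() now encodes the property before measuring it, which matters because UTF-8 text can occupy more bytes than it has characters; sizing the struct by character count would truncate non-ASCII values. For example:

prop_str = u'café'
encoded_str = prop_str.encode('utf-8')
print(len(prop_str), len(encoded_str))  # 4 characters but 5 bytes, since 'é' encodes to two bytes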
@@ -293,7 +325,7 @@ def process_entity_csvs(cls, csvs):
 @click.option('--relations', '-r', multiple=True, help='Path to relation csv file')
 # Buffer size restrictions
 @click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
-@click.option('--max-buffer-size', '-b', default=4096, help='max buffer size in megabytes (default 4096)')
+@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
 @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')

 def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size):
@@ -303,9 +335,9 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     global QUERY_BUF

     TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
-
     CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size)

+    start_time = timer()
     # Attempt to connect to Redis server
     try:
         client = redis.StrictRedis(host=host, port=port, password=password)
@@ -339,5 +371,8 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     # Send all remaining tokens to Redis
     QUERY_BUF.send_buffer()

+    end_time = timer()
+    QUERY_BUF.report_completion(end_time - start_time)
+
 if __name__ == '__main__':
     bulk_insert()
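The runtime passed to report_completion() comes from timeit.default_timer, which returns a float number of seconds from a platform-appropriate high-resolution clock. In isolation:

from timeit import default_timer as timer

start_time = timer()
# ... load nodes and relations ...
end_time = timer()
print("%f seconds" % (end_time - start_time))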

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
 redis==2.10.6
 click>=6.7
+backports.csv
