@@ -1,8 +1,11 @@
 import csv
 import os
+import io
 import struct
+from timeit import default_timer as timer
 import redis
 import click
+from backports import csv
 
 # Global variables
 CONFIGS = None  # thresholds for batching Redis queries
@@ -55,6 +58,9 @@ def __init__(self, graphname, client):
         self.labels = []  # List containing all pending Label objects
         self.reltypes = []  # List containing all pending RelationType objects
 
+        self.nodes_created = 0  # Total number of nodes created
+        self.relations_created = 0  # Total number of relations created
+
     # Send all pending inserts to Redis
     def send_buffer(self):
         # Do nothing if we have no entities
@@ -68,7 +74,9 @@ def send_buffer(self):
             self.initial_query = False
 
         result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
-        print(result)
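+        # Parse the reply, assumed to be a summary of the form "<N> nodes created, <M> relations created, ..."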
+        stats = result.split(', '.encode())
+        self.nodes_created += int(stats[0].split(' '.encode())[0])
+        self.relations_created += int(stats[1].split(' '.encode())[0])
 
         self.clear_buffer()
 
@@ -83,13 +91,17 @@ def clear_buffer(self):
         del self.labels[:]
         del self.reltypes[:]
 
+    def report_completion(self, runtime):
+        print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
+              % (self.graphname, self.nodes_created, self.relations_created, runtime))
+
 # Superclass for label and relation CSV files
 class EntityFile(object):
     def __init__(self, filename):
         # The label or relation type string is the basename of the file
-        self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode("ascii")
+        self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode('utf-8')
         # Input file handling
-        self.infile = open(filename, 'rt')
+        self.infile = io.open(filename, 'rt', encoding='utf-8')
         # Initialize CSV reader that ignores leading whitespace in each field
         # and does not modify input quote characters
         self.reader = csv.reader(self.infile, skipinitialspace=True, quoting=csv.QUOTE_NONE)
@@ -100,6 +112,17 @@ def __init__(self, filename):
         self.packed_header = ""
         self.binary_entities = []
         self.binary_size = 0  # size of binary token
+        self.count_entities()  # number of entities/rows in the file
+
+    # Count the number of rows in the file.
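+    # (The pre-computed count is later handed to click.progressbar() as its length in process_entities().)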
+    def count_entities(self):
+        self.entities_count = 0
+        self.entities_count = sum(1 for line in self.infile)
+        # discard header row
+        self.entities_count -= 1
+        # seek back
+        self.infile.seek(0)
+        return self.entities_count
 
     # Simple input validations for each row of a CSV file
     def validate_row(self, expected_col_count, row):
@@ -119,9 +142,10 @@ def pack_header(self, header):
         # String format
         fmt = "=%dsI" % (len(self.entity_str) + 1)  # Unaligned native, entity_string, count of properties
         args = [self.entity_str, prop_count]
-        for prop in header[self.prop_offset:]:
+        for p in header[self.prop_offset:]:
+            prop = p.encode('utf-8')
             fmt += "%ds" % (len(prop) + 1)  # encode string with a null terminator
-            args += [str.encode(prop)]
+            args.append(prop)
         return struct.pack(fmt, *args)
 
     # Convert a list of properties into a binary string
@@ -160,31 +184,35 @@ def process_entities(self, expected_col_count):
         global TOP_NODE_ID
         global QUERY_BUF
 
-        for row in self.reader:
-            self.validate_row(expected_col_count, row)
-            # Add identifier->ID pair to dictionary if we are building relations
-            if NODE_DICT is not None:
-                if row[0] in NODE_DICT:
-                    print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
-                          % (row[0], self.infile.name, self.reader.line_num))
-                    exit(1)
-                NODE_DICT[row[0]] = TOP_NODE_ID
-                TOP_NODE_ID += 1
-            row_binary = self.pack_props(row)
-            row_binary_len = len(row_binary)
-            # If the addition of this entity will make the binary token grow too large,
-            # send the buffer now.
-            if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                QUERY_BUF.labels.append(self.to_binary())
-                QUERY_BUF.send_buffer()
-                self.reset_partial_binary()
-                # Push the label onto the query buffer again, as there are more entities to process.
-                QUERY_BUF.labels.append(self.to_binary())
-
-            QUERY_BUF.node_count += 1
-            self.binary_size += row_binary_len
-            self.binary_entities.append(row_binary)
-        QUERY_BUF.labels.append(self.to_binary())
+        entities_created = 0
+        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
+            for row in reader:
+                self.validate_row(expected_col_count, row)
+                # Add identifier->ID pair to dictionary if we are building relations
+                if NODE_DICT is not None:
+                    if row[0] in NODE_DICT:
+                        print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
+                              % (row[0], self.infile.name, self.reader.line_num))
+                        exit(1)
+                    NODE_DICT[row[0]] = TOP_NODE_ID
+                    TOP_NODE_ID += 1
+                row_binary = self.pack_props(row)
+                row_binary_len = len(row_binary)
+                # If the addition of this entity will make the binary token grow too large,
+                # send the buffer now.
+                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
+                    QUERY_BUF.labels.append(self.to_binary())
+                    QUERY_BUF.send_buffer()
+                    self.reset_partial_binary()
+                    # Push the label onto the query buffer again, as there are more entities to process.
+                    QUERY_BUF.labels.append(self.to_binary())
+
+                QUERY_BUF.node_count += 1
+                entities_created += 1
+                self.binary_size += row_binary_len
+                self.binary_entities.append(row_binary)
+        QUERY_BUF.labels.append(self.to_binary())
+        print("%d nodes created with label '%s'" % (entities_created, self.entity_str))
 
 # Handler class for processing relation csv files.
 class RelationType(EntityFile):
@@ -211,31 +239,34 @@ def process_header(self):
         return expected_col_count
 
     def process_entities(self, expected_col_count):
-        for row in self.reader:
-            self.validate_row(expected_col_count, row)
-
-            try:
-                src = NODE_DICT[row[0]]
-                dest = NODE_DICT[row[1]]
-            except KeyError as e:
-                print("Relationship specified a non-existent identifier.")
-                raise e
-            fmt = "=QQ"  # 8-byte unsigned ints for src and dest
-            row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
-            row_binary_len = len(row_binary)
-            # If the addition of this entity will make the binary token grow too large,
-            # send the buffer now.
-            if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                QUERY_BUF.reltypes.append(self.to_binary())
-                QUERY_BUF.send_buffer()
-                self.reset_partial_binary()
-                # Push the reltype onto the query buffer again, as there are more entities to process.
-                QUERY_BUF.reltypes.append(self.to_binary())
-
-            QUERY_BUF.relation_count += 1
-            self.binary_size += row_binary_len
-            self.binary_entities.append(row_binary)
-        QUERY_BUF.reltypes.append(self.to_binary())
+        entities_created = 0
+        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
+            for row in reader:
+                self.validate_row(expected_col_count, row)
+                try:
+                    src = NODE_DICT[row[0]]
+                    dest = NODE_DICT[row[1]]
+                except KeyError as e:
+                    print("Relationship specified a non-existent identifier.")
+                    raise e
+                fmt = "=QQ"  # 8-byte unsigned ints for src and dest
+                row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
+                row_binary_len = len(row_binary)
+                # If the addition of this entity will make the binary token grow too large,
+                # send the buffer now.
+                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
+                    QUERY_BUF.reltypes.append(self.to_binary())
+                    QUERY_BUF.send_buffer()
+                    self.reset_partial_binary()
+                    # Push the reltype onto the query buffer again, as there are more entities to process.
+                    QUERY_BUF.reltypes.append(self.to_binary())
+
+                QUERY_BUF.relation_count += 1
+                entities_created += 1
+                self.binary_size += row_binary_len
+                self.binary_entities.append(row_binary)
+        QUERY_BUF.reltypes.append(self.to_binary())
+        print("%d relations created for type '%s'" % (entities_created, self.entity_str))
 
 # Convert a single CSV property field into a binary stream.
 # Supported property types are string, numeric, boolean, and NULL.
@@ -261,8 +292,9 @@ def prop_to_binary(prop_str):
 
     # If we've reached this point, the property is a string
     # Encoding len+1 adds a null terminator to the string
-    format_str += "%ds" % (len(prop_str) + 1)
-    return struct.pack(format_str, Type.STRING, str.encode(prop_str))
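+    # Note: the length must be measured on the encoded bytes, since multi-byte UTF-8 characters
+    # would otherwise under-count the size that struct.pack() needs.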
+    encoded_str = prop_str.encode('utf-8')
+    format_str += "%ds" % (len(encoded_str) + 1)
+    return struct.pack(format_str, Type.STRING, encoded_str)
 
 # For each node input file, validate contents and convert to binary format.
 # If any buffer limits have been reached, flush all enqueued inserts to Redis.
@@ -293,7 +325,7 @@ def process_entity_csvs(cls, csvs):
 @click.option('--relations', '-r', multiple=True, help='Path to relation csv file')
 # Buffer size restrictions
 @click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
-@click.option('--max-buffer-size', '-b', default=4096, help='max buffer size in megabytes (default 4096)')
+@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
 @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
 
 def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size):
@@ -303,9 +335,9 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     global QUERY_BUF
 
     TOP_NODE_ID = 0  # reset global ID variable (in case we are calling bulk_insert from unit tests)
-
     CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size)
 
+    start_time = timer()
     # Attempt to connect to Redis server
     try:
         client = redis.StrictRedis(host=host, port=port, password=password)
@@ -339,5 +371,8 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     # Send all remaining tokens to Redis
     QUERY_BUF.send_buffer()
 
+    end_time = timer()
+    QUERY_BUF.report_completion(end_time - start_time)
+
 if __name__ == '__main__':
     bulk_insert()
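As a rough usage sketch, a run of the loader after this change might look like the following; the script name, graph name, CSV filenames, and the way the graph argument is passed are assumptions, while the -n/-r and size options come from the click declarations above:

    python bulk_insert.py MyGraph -n Person.csv -r KNOWS.csv --max-buffer-size 2048 --max-token-size 500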