@@ -60,40 +60,83 @@ def __init__(self, iterable=None, buffer_size=4):
6060 self ._is_view = True
6161 return
6262
63- # Add elements of the iterable.
63+ try :
64+ # If possible try pre-allocating memory.
65+ if len (iterable ) > 0 :
66+ first_element = np .asarray (iterable [0 ])
67+ n_elements = np .sum ([len (iterable [i ])
68+ for i in range (len (iterable ))])
69+ new_shape = (n_elements ,) + first_element .shape [1 :]
70+ self ._data = np .empty (new_shape , dtype = first_element .dtype )
71+ except TypeError :
72+ pass
73+
74+ # Initialize the `ArraySequence` object from iterable's item.
75+ coroutine = self ._extend_using_coroutine ()
76+ coroutine .send (None ) # Run until the first yield.
77+
78+ for e in iterable :
79+ coroutine .send (e )
80+
81+ coroutine .close () # Terminate coroutine.
82+
83+ def _extend_using_coroutine (self , buffer_size = 4 ):
84+ """ Creates a coroutine allowing to append elements.
85+
86+ Parameters
87+ ----------
88+ buffer_size : float, optional
89+ Size (in Mb) for memory pre-allocation.
90+
91+ Returns
92+ -------
93+ coroutine
94+ Coroutine object which expects the values to be appended to this
95+ array sequence.
96+
97+ Notes
98+ -----
99+ This method is essential for
100+ :func:`create_arraysequences_from_generator` as it allows for an
101+ efficient way of creating multiple array sequences in a hyperthreaded
102+ fashion and still benefit from the memory buffering. Without this
103+ method the alternative would be to use :meth:`append` which does
104+ not have such buffering mechanism and thus is at least one order of
105+ magnitude slower.
106+ """
64107 offsets = []
65108 lengths = []
66- # Initialize the `ArraySequence` object from iterable's item.
67- offset = 0
68- for i , e in enumerate (iterable ):
69- e = np .asarray (e )
70- if i == 0 :
71- try :
72- n_elements = np .sum ([len (iterable [i ])
73- for i in range (len (iterable ))])
74- new_shape = (n_elements ,) + e .shape [1 :]
75- except TypeError :
76- # Can't get the number of elements in iterable. So,
77- # we use a memory buffer while building the ArraySequence.
109+
110+ offset = 0 if len (self ) == 0 else self ._offsets [- 1 ] + self ._lengths [- 1 ]
111+ try :
112+ first_element = True
113+ while True :
114+ e = (yield )
115+ e = np .asarray (e )
116+ if first_element :
117+ first_element = False
78118 n_rows_buffer = int (buffer_size * 1024 ** 2 // e .nbytes )
79119 new_shape = (n_rows_buffer ,) + e .shape [1 :]
120+ if len (self ) == 0 :
121+ self ._data = np .empty (new_shape , dtype = e .dtype )
80122
81- self ._data = np .empty (new_shape , dtype = e .dtype )
123+ end = offset + len (e )
124+ if end > len (self ._data ):
125+ # Resize needed, adding `len(e)` items plus some buffer.
126+ nb_points = len (self ._data )
127+ nb_points += len (e ) + n_rows_buffer
128+ self ._data .resize ((nb_points ,) + self .common_shape )
82129
83- end = offset + len (e )
84- if end > len (self ._data ):
85- # Resize needed, adding `len(e)` items plus some buffer.
86- nb_points = len (self ._data )
87- nb_points += len (e ) + n_rows_buffer
88- self ._data .resize ((nb_points ,) + self .common_shape )
130+ offsets .append (offset )
131+ lengths .append (len (e ))
132+ self ._data [offset :offset + len (e )] = e
133+ offset += len (e )
89134
90- offsets .append (offset )
91- lengths .append (len (e ))
92- self ._data [offset :offset + len (e )] = e
93- offset += len (e )
135+ except GeneratorExit :
136+ pass
94137
95- self ._offsets = np .asarray ( offsets )
96- self ._lengths = np .asarray ( lengths )
138+ self ._offsets = np .concatenate ([ self . _offsets , offsets ], axis = 0 )
139+ self ._lengths = np .concatenate ([ self . _lengths , lengths ], axis = 0 )
97140
98141 # Clear unused memory.
99142 self ._data .resize ((offset ,) + self .common_shape )
@@ -266,13 +309,6 @@ def __getitem__(self, idx):
266309 seq ._is_view = True
267310 return seq
268311
269- # for name, slice_ in data_per_point_slice.items():
270- # seq = ArraySequence()
271- # seq._data = scalars._data[:, slice_]
272- # seq._offsets = scalars._offsets
273- # seq._lengths = scalars._lengths
274- # tractogram.data_per_point[name] = seq
275-
276312 raise TypeError ("Index must be either an int, a slice, a list of int"
277313 " or a ndarray of bool! Not " + str (type (idx )))
278314
@@ -320,10 +356,27 @@ def load(cls, filename):
320356
def create_arraysequences_from_generator(gen, n):
    """ Creates :class:`ArraySequence` objects from a generator yielding tuples

    Parameters
    ----------
    gen : generator
        Generator yielding a size `n` tuple containing the values to put in the
        array sequences. Each value may be any array-like object accepted by
        :func:`numpy.asarray`.
    n : int
        Number of :class:`ArraySequence` objects to create.

    Returns
    -------
    seqs : list of :class:`ArraySequence` objects
        The `n` array sequences; the i-th one receives the i-th value of every
        tuple yielded by `gen`.

    Notes
    -----
    Values holding no data (``nbytes == 0``) are skipped, so the returned
    sequences do not necessarily all contain the same number of elements.
    """
    seqs = [ArraySequence() for _ in range(n)]

    # One appending coroutine per sequence; sending `None` runs each of them
    # up to its first `yield` so it is ready to receive values.
    coroutines = [seq._extend_using_coroutine() for seq in seqs]
    for coroutine in coroutines:
        coroutine.send(None)

    for data in gen:
        for i, coroutine in enumerate(coroutines):
            # Normalize first: plain lists/tuples have no `nbytes` attribute.
            value = np.asarray(data[i])
            if value.nbytes > 0:
                coroutine.send(value)

    # Closing a coroutine makes it leave its receiving loop and finish
    # updating the sequence it is attached to.
    for coroutine in coroutines:
        coroutine.close()

    return seqs
0 commit comments