11//! A general purpose `Batcher` implementation based on radix sort.
22
3+ use std:: collections:: VecDeque ;
4+
35use timely:: communication:: message:: RefOrMut ;
46use timely:: progress:: frontier:: Antichain ;
57
@@ -120,60 +122,6 @@ where
120122}
121123
122124
123- use std:: slice:: { from_raw_parts} ;
124-
125- pub struct VecQueue < T > {
126- list : Vec < T > ,
127- head : usize ,
128- tail : usize ,
129- }
130-
131- impl < T > VecQueue < T > {
132- #[ inline]
133- pub fn new ( ) -> Self { VecQueue :: from ( Vec :: new ( ) ) }
134- #[ inline]
135- pub fn pop ( & mut self ) -> T {
136- debug_assert ! ( self . head < self . tail) ;
137- self . head += 1 ;
138- unsafe { :: std:: ptr:: read ( self . list . as_mut_ptr ( ) . offset ( ( self . head as isize ) - 1 ) ) }
139- }
140- #[ inline]
141- pub fn peek ( & self ) -> & T {
142- debug_assert ! ( self . head < self . tail) ;
143- unsafe { self . list . get_unchecked ( self . head ) }
144- }
145- #[ inline]
146- pub fn _peek_tail ( & self ) -> & T {
147- debug_assert ! ( self . head < self . tail) ;
148- unsafe { self . list . get_unchecked ( self . tail -1 ) }
149- }
150- #[ inline]
151- pub fn _slice ( & self ) -> & [ T ] {
152- debug_assert ! ( self . head < self . tail) ;
153- unsafe { from_raw_parts ( self . list . get_unchecked ( self . head ) , self . tail - self . head ) }
154- }
155- #[ inline]
156- pub fn from ( mut list : Vec < T > ) -> Self {
157- let tail = list. len ( ) ;
158- unsafe { list. set_len ( 0 ) ; }
159- VecQueue {
160- list : list,
161- head : 0 ,
162- tail : tail,
163- }
164- }
165- // could leak, if self.head != self.tail.
166- #[ inline]
167- pub fn done ( self ) -> Vec < T > {
168- debug_assert ! ( self . head == self . tail) ;
169- self . list
170- }
171- #[ inline]
172- pub fn len ( & self ) -> usize { self . tail - self . head }
173- #[ inline]
174- pub fn is_empty ( & self ) -> bool { self . head == self . tail }
175- }
176-
177125#[ inline]
178126unsafe fn push_unchecked < T > ( vec : & mut Vec < T > , element : T ) {
179127 debug_assert ! ( vec. len( ) < vec. capacity( ) ) ;
@@ -277,28 +225,28 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
277225 let mut output = Vec :: with_capacity ( list1. len ( ) + list2. len ( ) ) ;
278226 let mut result = self . empty ( ) ;
279227
280- let mut list1 = VecQueue :: from ( list1) ;
281- let mut list2 = VecQueue :: from ( list2) ;
228+ let mut list1 = list1. into_iter ( ) ;
229+ let mut list2 = list2. into_iter ( ) ;
282230
283- let mut head1 = if !list1 . is_empty ( ) { VecQueue :: from ( list1. pop ( ) ) } else { VecQueue :: new ( ) } ;
284- let mut head2 = if !list2 . is_empty ( ) { VecQueue :: from ( list2. pop ( ) ) } else { VecQueue :: new ( ) } ;
231+ let mut head1 = VecDeque :: from ( list1. next ( ) . unwrap_or_default ( ) ) ;
232+ let mut head2 = VecDeque :: from ( list2. next ( ) . unwrap_or_default ( ) ) ;
285233
286234 // while we have valid data in each input, merge.
287235 while !head1. is_empty ( ) && !head2. is_empty ( ) {
288236
289237 while ( result. capacity ( ) - result. len ( ) ) > 0 && head1. len ( ) > 0 && head2. len ( ) > 0 {
290238
291239 let cmp = {
292- let x = head1. peek ( ) ;
293- let y = head2. peek ( ) ;
240+ let x = head1. front ( ) . unwrap ( ) ;
241+ let y = head2. front ( ) . unwrap ( ) ;
294242 ( & x. 0 , & x. 1 ) . cmp ( & ( & y. 0 , & y. 1 ) )
295243 } ;
296244 match cmp {
297- Ordering :: Less => { unsafe { push_unchecked ( & mut result, head1. pop ( ) ) ; } }
298- Ordering :: Greater => { unsafe { push_unchecked ( & mut result, head2. pop ( ) ) ; } }
245+ Ordering :: Less => { unsafe { push_unchecked ( & mut result, head1. pop_front ( ) . unwrap ( ) ) ; } }
246+ Ordering :: Greater => { unsafe { push_unchecked ( & mut result, head2. pop_front ( ) . unwrap ( ) ) ; } }
299247 Ordering :: Equal => {
300- let ( data1, time1, mut diff1) = head1. pop ( ) ;
301- let ( _data2, _time2, diff2) = head2. pop ( ) ;
248+ let ( data1, time1, mut diff1) = head1. pop_front ( ) . unwrap ( ) ;
249+ let ( _data2, _time2, diff2) = head2. pop_front ( ) . unwrap ( ) ;
302250 diff1. plus_equals ( & diff2) ;
303251 if !diff1. is_zero ( ) {
304252 unsafe { push_unchecked ( & mut result, ( data1, time1, diff1) ) ; }
@@ -313,14 +261,14 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
313261 }
314262
315263 if head1. is_empty ( ) {
316- let done1 = head1 . done ( ) ;
264+ let done1 = Vec :: from ( head1 ) ;
317265 if done1. capacity ( ) == Self :: buffer_size ( ) { self . stash . push ( done1) ; }
318- head1 = if !list1 . is_empty ( ) { VecQueue :: from ( list1. pop ( ) ) } else { VecQueue :: new ( ) } ;
266+ head1 = VecDeque :: from ( list1. next ( ) . unwrap_or_default ( ) ) ;
319267 }
320268 if head2. is_empty ( ) {
321- let done2 = head2 . done ( ) ;
269+ let done2 = Vec :: from ( head2 ) ;
322270 if done2. capacity ( ) == Self :: buffer_size ( ) { self . stash . push ( done2) ; }
323- head2 = if !list2 . is_empty ( ) { VecQueue :: from ( list2. pop ( ) ) } else { VecQueue :: new ( ) } ;
271+ head2 = VecDeque :: from ( list2. next ( ) . unwrap_or_default ( ) ) ;
324272 }
325273 }
326274
@@ -329,21 +277,17 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
329277
330278 if !head1. is_empty ( ) {
331279 let mut result = self . empty ( ) ;
332- for _ in 0 .. head1. len ( ) { result. push ( head1 . pop ( ) ) ; }
280+ for item1 in head1 { result. push ( item1 ) ; }
333281 output. push ( result) ;
334282 }
335- while !list1. is_empty ( ) {
336- output. push ( list1. pop ( ) ) ;
337- }
283+ output. extend ( list1) ;
338284
339285 if !head2. is_empty ( ) {
340286 let mut result = self . empty ( ) ;
341- for _ in 0 .. head2. len ( ) { result. push ( head2 . pop ( ) ) ; }
287+ for item2 in head2 { result. push ( item2 ) ; }
342288 output. push ( result) ;
343289 }
344- while !list2. is_empty ( ) {
345- output. push ( list2. pop ( ) ) ;
346- }
290+ output. extend ( list2) ;
347291
348292 output
349293 }
0 commit comments