@@ -16,27 +16,50 @@ pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
1616 mut logic : impl FnMut ( & Key , & Val1 , & Val2 ) -> Result ,
1717) {
1818 let mut results = Vec :: new ( ) ;
19+ let push_result = |k : & Key , v1 : & Val1 , v2 : & Val2 | results. push ( logic ( k, v1, v2) ) ;
1920
20- let recent1 = input1. recent ( ) ;
21- let recent2 = input2. recent ( ) ;
22-
23- {
24- // scoped to let `closure` drop borrow of `results`.
21+ join_delta ( input1, input2, push_result) ;
2522
26- let mut closure = |k : & Key , v1 : & Val1 , v2 : & Val2 | results. push ( logic ( k, v1, v2) ) ;
23+ output. insert ( Relation :: from_vec ( results) ) ;
24+ }
2725
28- for batch2 in input2. stable ( ) . iter ( ) {
29- join_helper ( & recent1, & batch2, & mut closure) ;
26+ pub ( crate ) fn join_and_filter_into < ' me , Key : Ord , Val1 : Ord , Val2 : Ord , Result : Ord > (
27+ input1 : & Variable < ( Key , Val1 ) > ,
28+ input2 : impl JoinInput < ' me , ( Key , Val2 ) > ,
29+ output : & Variable < Result > ,
30+ mut logic : impl FnMut ( & Key , & Val1 , & Val2 ) -> Option < Result > ,
31+ ) {
32+ let mut results = Vec :: new ( ) ;
33+ let push_result = |k : & Key , v1 : & Val1 , v2 : & Val2 | {
34+ if let Some ( result) = logic ( k, v1, v2) {
35+ results. push ( result) ;
3036 }
37+ } ;
3138
32- for batch1 in input1. stable ( ) . iter ( ) {
33- join_helper ( & batch1, & recent2, & mut closure) ;
34- }
39+ join_delta ( input1, input2, push_result) ;
3540
36- join_helper ( & recent1, & recent2, & mut closure) ;
41+ output. insert ( Relation :: from_vec ( results) ) ;
42+ }
43+
44+ /// Joins the `recent` tuples of each input with the `stable` tuples of the other, then the
45+ /// `recent` tuples of *both* inputs.
46+ fn join_delta < ' me , Key : Ord , Val1 : Ord , Val2 : Ord > (
47+ input1 : & Variable < ( Key , Val1 ) > ,
48+ input2 : impl JoinInput < ' me , ( Key , Val2 ) > ,
49+ mut result : impl FnMut ( & Key , & Val1 , & Val2 ) ,
50+ ) {
51+ let recent1 = input1. recent ( ) ;
52+ let recent2 = input2. recent ( ) ;
53+
54+ for batch2 in input2. stable ( ) . iter ( ) {
55+ join_helper ( & recent1, & batch2, & mut result) ;
3756 }
3857
39- output. insert ( Relation :: from_vec ( results) ) ;
58+ for batch1 in input1. stable ( ) . iter ( ) {
59+ join_helper ( & batch1, & recent2, & mut result) ;
60+ }
61+
62+ join_helper ( & recent1, & recent2, & mut result) ;
4063}
4164
4265/// Join, but for two relations.
0 commit comments