Skip to content

Commit 2b969b6

Browse files
authored
block reader supports simple OR conditions on the PK column. (#22911)
PR Summary - Added OR-aware parsing in BasePKFilter: the parser now distributes AND over OR, builds Disjuncts, and invalidates the whole OR if any branch can’t be mapped to a PK predicate. This enables expressions such as (pk = 1 AND pk BETWEEN 5 AND 9) OR pk IN (20, 30) to produce multiple atomic filters. - ConstructBlockPKFilter understands those disjuncts, builds per-atom search functions, and unions their offsets so a single filter can cover mixed operators (e.g. pk prefix_eq 'ab' OR pk >= 1024). Mem filters remain single-op. - Strengthened unit tests (TestConstructBasePKFilterWithOr, TestConstructBlockPKFilterWithOr) to cover composite OR shapes and datatype coverage; vectors are freed properly after each test. - Updated BVT SQL suites (block_or_single_pk.sql, block_or_composite_pk.sql, block_or_no_pk.sql) to insert 8K rows with fault injection hooks, add rich OR queries (<10 rows per result, direct row outputs), and cover (a AND b) OR c plus long OR chains across all supported PK types. Examples 1. Query SELECT ... WHERE pk = 5 OR pk BETWEEN 100 AND 120 OR pk IN (512, 1024) now yields a BlockPKFilter whose SortedSearchFunc merges binary search offsets from all three predicates, ensuring every block overlap is found. 2. Complex condition (pk >= 10 AND pk <= 20) OR pk prefix_eq 'ab' OR pk IN ('z1','z2') now expands into three disjuncts; blocks matching any disjunct are scanned, while MemPKFilter correctly deems multi-op filters invalid. Approved by: @heni02, @XuPeng-SH
1 parent 444e0e8 commit 2b969b6

File tree

10 files changed

+1825
-423
lines changed

10 files changed

+1825
-423
lines changed

pkg/vm/engine/readutil/filter_test.go

Lines changed: 487 additions & 7 deletions
Large diffs are not rendered by default.

pkg/vm/engine/readutil/pk_filter.go

Lines changed: 487 additions & 399 deletions
Large diffs are not rendered by default.

pkg/vm/engine/readutil/pk_filter_base.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type BasePKFilter struct {
3838
UB []byte
3939
Vec *vector.Vector
4040
Oid types.T
41+
// Disjuncts holds OR-ed atomic filters; when non-empty, Op/LB/UB/Vec are ignored.
42+
Disjuncts []BasePKFilter
4143
}
4244

4345
func (b *BasePKFilter) String() string {
@@ -137,47 +139,72 @@ func ConstructBasePKFilter(
137139
return ret, nil
138140

139141
case "or":
140-
var filters []BasePKFilter
142+
var (
143+
filters1 []BasePKFilter
144+
filters2 []BasePKFilter
145+
146+
cannotMerge bool
147+
hasUnsupported bool
148+
)
149+
141150
for idx := range exprImpl.F.Args {
142151
ff, err := ConstructBasePKFilter(exprImpl.F.Args[idx], tblDef, mp)
143152
if err != nil {
144153
return BasePKFilter{}, err
145154
}
146155
if !ff.Valid {
147-
return BasePKFilter{}, err
156+
hasUnsupported = true
157+
continue
148158
}
149159

150-
filters = append(filters, ff)
160+
filters1 = append(filters1, toDisjuncts(ff)...)
161+
filters2 = append(filters2, ff)
151162
}
152163

153-
if len(filters) == 0 {
164+
if hasUnsupported {
154165
return BasePKFilter{}, nil
155166
}
156167

157-
for idx := 0; idx < len(filters)-1; {
158-
f1 := &filters[idx]
159-
f2 := &filters[idx+1]
168+
if len(filters1) == 0 {
169+
return BasePKFilter{}, nil
170+
}
171+
172+
if len(filters1) == 1 {
173+
return filters1[0], nil
174+
}
175+
176+
for idx := 0; idx < len(filters2)-1; {
177+
f1 := &filters2[idx]
178+
f2 := &filters2[idx+1]
160179
ff, err := mergeFilters(f1, f2, function.OR, mp)
161180
if err != nil {
162181
return BasePKFilter{}, nil
163182
}
164183

165184
if !ff.Valid {
166-
return BasePKFilter{}, nil
185+
//return BasePKFilter{}, nil
186+
cannotMerge = true
187+
break
167188
}
168189

169190
idx++
170-
filters[idx] = ff
191+
filters2[idx] = ff
171192
}
172193

173-
for idx := 0; idx < len(filters)-1; idx++ {
174-
if filters[idx].Vec != nil {
175-
filters[idx].Vec.Free(mp)
194+
if !cannotMerge {
195+
for idx := 0; idx < len(filters2)-1; idx++ {
196+
if filters2[idx].Vec != nil {
197+
filters2[idx].Vec.Free(mp)
198+
}
176199
}
200+
201+
ret := filters2[len(filters2)-1]
202+
return ret, nil
177203
}
178204

179-
ret := filters[len(filters)-1]
180-
return ret, nil
205+
filter.Valid = true
206+
filter.Disjuncts = filters1
207+
return filter, nil
181208

182209
case ">=":
183210
//a >= ?
@@ -307,3 +334,10 @@ func ConstructBasePKFilter(
307334

308335
return
309336
}
337+
338+
func toDisjuncts(f BasePKFilter) []BasePKFilter {
339+
if len(f.Disjuncts) > 0 {
340+
return f.Disjuncts
341+
}
342+
return []BasePKFilter{f}
343+
}

pkg/vm/engine/readutil/pk_filter_mem.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ package readutil
1717
import (
1818
"bytes"
1919
"fmt"
20-
"github.com/matrixorigin/matrixone/pkg/logutil"
21-
"github.com/matrixorigin/matrixone/pkg/objectio"
22-
"go.uber.org/zap"
2320

2421
"github.com/matrixorigin/matrixone/pkg/container/types"
2522
"github.com/matrixorigin/matrixone/pkg/container/vector"
2623
"github.com/matrixorigin/matrixone/pkg/fileservice"
24+
"github.com/matrixorigin/matrixone/pkg/logutil"
25+
"github.com/matrixorigin/matrixone/pkg/objectio"
2726
"github.com/matrixorigin/matrixone/pkg/pb/plan"
2827
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
2928
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
3029
"github.com/matrixorigin/matrixone/pkg/vm/engine"
30+
"go.uber.org/zap"
3131
)
3232

3333
type MemPKFilter struct {
@@ -70,6 +70,11 @@ func NewMemPKFilter(
7070
return
7171
}
7272

73+
// Currently only support single atomic filter in memory path.
74+
if len(basePKFilter.Disjuncts) > 0 {
75+
return
76+
}
77+
7378
var lbVal, ubVal any
7479
var packed [][]byte
7580
var packer *types.Packer
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
drop database if exists block_or_cpk;
2+
create database block_or_cpk;
3+
use block_or_cpk;
4+
select enable_fault_injection();
5+
enable_fault_injection()
6+
true
7+
create table cpk_int(a int, b int, c int, primary key(a, b));
8+
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_int');
9+
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_cpk.cpk_int)
10+
true
11+
insert into cpk_int select g.result, g.result * 2, g.result * 3 from generate_series(1, 8192) g;
12+
select a, b from cpk_int where a = 10 or a in (20, 30, 40) order by a, b;
13+
a b
14+
10 20
15+
20 40
16+
30 60
17+
40 80
18+
select a, b from cpk_int where a between 100 and 102 or a >= 8190 order by a, b;
19+
a b
20+
100 200
21+
101 202
22+
102 204
23+
8190 16380
24+
8191 16382
25+
8192 16384
26+
select a, b from cpk_int where a < 4 or a > 8189 order by a, b;
27+
a b
28+
1 2
29+
2 4
30+
3 6
31+
8190 16380
32+
8191 16382
33+
8192 16384
34+
select a, b from cpk_int where a <= 5 or a >= 8191 order by a, b;
35+
a b
36+
1 2
37+
2 4
38+
3 6
39+
4 8
40+
5 10
41+
8191 16382
42+
8192 16384
43+
select a, b from cpk_int where (a = 11 and a = 12) or a = 13 order by a, b;
44+
a b
45+
13 26
46+
select a, b from cpk_int where a < 4 or a between 256 and 258 or a in (500, 600) or a >= 8191 order by a, b;
47+
a b
48+
1 2
49+
2 4
50+
3 6
51+
256 512
52+
257 514
53+
258 516
54+
500 1000
55+
600 1200
56+
8191 16382
57+
8192 16384
58+
drop table cpk_int;
59+
create table cpk_varchar(a varchar(64), b varchar(64), c int, primary key(a, b));
60+
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_varchar');
61+
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_cpk.cpk_varchar)
62+
true
63+
insert into cpk_varchar select cast(g.result as varchar), cast(g.result * 2 as varchar), g.result * 3 from generate_series(1, 8192) g;
64+
select a, b from cpk_varchar where a = '10' or a in ('20', '30', '40') order by cast(a as int), cast(b as int);
65+
a b
66+
10 20
67+
20 40
68+
30 60
69+
40 80
70+
select a, b from cpk_varchar where a between '8000' and '8003' or a between '900' and '903' order by cast(a as int), cast(b as int);
71+
a b
72+
900 1800
73+
901 1802
74+
902 1804
75+
903 1806
76+
8000 16000
77+
8001 16002
78+
8002 16004
79+
8003 16006
80+
select a, b from cpk_varchar where a in ('5', '15', '25') or a = '512' order by cast(a as int), cast(b as int);
81+
a b
82+
5 10
83+
15 30
84+
25 50
85+
512 1024
86+
select a, b from cpk_varchar where (a = '200' and a = '201') or a = '300' order by cast(a as int), cast(b as int);
87+
a b
88+
300 600
89+
select a, b from cpk_varchar where a between '2500' and '2502' or a between '6000' and '6001' or a in ('700', '800') or a between '950' and '952' order by cast(a as int), cast(b as int);
90+
a b
91+
700 1400
92+
800 1600
93+
950 1900
94+
951 1902
95+
952 1904
96+
2500 5000
97+
2501 5002
98+
2502 5004
99+
6000 12000
100+
6001 12002
101+
drop table cpk_varchar;
102+
drop database block_or_cpk;
103+
select disable_fault_injection();
104+
disable_fault_injection()
105+
true
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
drop database if exists block_or_cpk;
2+
create database block_or_cpk;
3+
use block_or_cpk;
4+
5+
select enable_fault_injection();
6+
7+
-- composite pk (int, int) - value range filters
8+
create table cpk_int(a int, b int, c int, primary key(a, b));
9+
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_int');
10+
insert into cpk_int select g.result, g.result * 2, g.result * 3 from generate_series(1, 8192) g;
11+
select a, b from cpk_int where a = 10 or a in (20, 30, 40) order by a, b;
12+
select a, b from cpk_int where a between 100 and 102 or a >= 8190 order by a, b;
13+
select a, b from cpk_int where a < 4 or a > 8189 order by a, b;
14+
select a, b from cpk_int where a <= 5 or a >= 8191 order by a, b;
15+
select a, b from cpk_int where (a = 11 and a = 12) or a = 13 order by a, b;
16+
select a, b from cpk_int where a < 4 or a between 256 and 258 or a in (500, 600) or a >= 8191 order by a, b;
17+
drop table cpk_int;
18+
19+
-- composite pk (varchar, varchar) - simple comparisons
20+
create table cpk_varchar(a varchar(64), b varchar(64), c int, primary key(a, b));
21+
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_varchar');
22+
insert into cpk_varchar select cast(g.result as varchar), cast(g.result * 2 as varchar), g.result * 3 from generate_series(1, 8192) g;
23+
select a, b from cpk_varchar where a = '10' or a in ('20', '30', '40') order by cast(a as int), cast(b as int);
24+
select a, b from cpk_varchar where a between '8000' and '8003' or a between '900' and '903' order by cast(a as int), cast(b as int);
25+
select a, b from cpk_varchar where a in ('5', '15', '25') or a = '512' order by cast(a as int), cast(b as int);
26+
select a, b from cpk_varchar where (a = '200' and a = '201') or a = '300' order by cast(a as int), cast(b as int);
27+
select a, b from cpk_varchar where a between '2500' and '2502' or a between '6000' and '6001' or a in ('700', '800') or a between '950' and '952' order by cast(a as int), cast(b as int);
28+
drop table cpk_varchar;
29+
30+
drop database block_or_cpk;
31+
32+
select disable_fault_injection();
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
drop database if exists block_or_npk;
2+
create database block_or_npk;
3+
use block_or_npk;
4+
select enable_fault_injection();
5+
enable_fault_injection()
6+
true
7+
create table npk_int(a int, b int);
8+
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_npk.npk_int');
9+
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_npk.npk_int)
10+
true
11+
insert into npk_int select g.result, g.result * 10 from generate_series(1, 8192) g;
12+
select a from npk_int where a = 10 or a in (20, 30, 40) order by a;
13+
a
14+
10
15+
20
16+
30
17+
40
18+
select a from npk_int where a between 100 and 102 or a >= 8190 order by a;
19+
a
20+
100
21+
101
22+
102
23+
8190
24+
8191
25+
8192
26+
select a from npk_int where a < 4 or a > 8189 order by a;
27+
a
28+
1
29+
2
30+
3
31+
8190
32+
8191
33+
8192
34+
select a from npk_int where a <= 5 or a >= 8191 order by a;
35+
a
36+
1
37+
2
38+
3
39+
4
40+
5
41+
8191
42+
8192
43+
select a from npk_int where (a = 11 and a = 12) or a = 13 order by a;
44+
a
45+
13
46+
select a from npk_int where a < 4 or a between 256 and 258 or a in (500, 600) or a >= 8191 order by a;
47+
a
48+
1
49+
2
50+
3
51+
256
52+
257
53+
258
54+
500
55+
600
56+
8191
57+
8192
58+
drop table npk_int;
59+
create table npk_varchar(a varchar(64), b int);
60+
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_npk.npk_varchar');
61+
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_npk.npk_varchar)
62+
true
63+
insert into npk_varchar select cast(g.result as varchar), g.result from generate_series(1, 8192) g;
64+
select a from npk_varchar where a = '10' or a in ('20', '30', '40') order by cast(a as int);
65+
a
66+
10
67+
20
68+
30
69+
40
70+
select a from npk_varchar where a between '8000' and '8003' or a between '900' and '903' order by cast(a as int);
71+
a
72+
900
73+
901
74+
902
75+
903
76+
8000
77+
8001
78+
8002
79+
8003
80+
select a from npk_varchar where a in ('5', '15', '25') or a = '512' order by cast(a as int);
81+
a
82+
5
83+
15
84+
25
85+
512
86+
select a from npk_varchar where (a = '200' and a = '201') or a = '300' order by cast(a as int);
87+
a
88+
300
89+
select a from npk_varchar where a between '2500' and '2502' or a between '6000' and '6001' or a in ('700', '800') or a between '950' and '952' order by cast(a as int);
90+
a
91+
700
92+
800
93+
950
94+
951
95+
952
96+
2500
97+
2501
98+
2502
99+
6000
100+
6001
101+
drop table npk_varchar;
102+
drop database block_or_npk;
103+
select disable_fault_injection();
104+
disable_fault_injection()
105+
true

0 commit comments

Comments
 (0)