Skip to content

Commit f879767

Browse files
authored
feat: impl Date & Timestamp on RANGE BETWEEN (#18696)
* feat: impl Date & Timestamp on `RANGE BETWEEN` * test: add signed int check test case * chore: codefmt
1 parent 376f34f commit f879767

File tree

5 files changed

+446
-194
lines changed

5 files changed

+446
-194
lines changed

src/query/expression/src/values.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,23 @@ impl ScalarRef<'_> {
655655
}
656656
}
657657

658+
pub fn get_i64(&self) -> Option<i64> {
659+
match self {
660+
ScalarRef::Number(n) => match n {
661+
NumberScalar::Int8(x) => Some(*x as _),
662+
NumberScalar::Int16(x) => Some(*x as _),
663+
NumberScalar::Int32(x) => Some(*x as _),
664+
NumberScalar::Int64(x) => Some(*x as _),
665+
NumberScalar::UInt8(x) => Some(*x as _),
666+
NumberScalar::UInt16(x) => Some(*x as _),
667+
NumberScalar::UInt32(x) => Some(*x as _),
668+
NumberScalar::UInt64(x) => i64::try_from(*x).ok(),
669+
_ => None,
670+
},
671+
_ => None,
672+
}
673+
}
674+
658675
pub fn memory_size(&self) -> usize {
659676
match self {
660677
ScalarRef::Null | ScalarRef::EmptyArray | ScalarRef::EmptyMap => 0,

src/query/service/src/physical_plans/physical_window.rs

Lines changed: 78 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_common_expression::type_check;
2222
use databend_common_expression::type_check::common_super_type;
2323
use databend_common_expression::types::DataType;
2424
use databend_common_expression::types::NumberDataType;
25-
use databend_common_expression::with_number_mapped_type;
2625
use databend_common_expression::Constant;
2726
use databend_common_expression::ConstantFolder;
2827
use databend_common_expression::DataField;
@@ -196,7 +195,7 @@ impl IPhysicalPlan for Window {
196195
let transform = if self.window_frame.units.is_rows() {
197196
let start_bound = FrameBound::try_from(&self.window_frame.start_bound)?;
198197
let end_bound = FrameBound::try_from(&self.window_frame.end_bound)?;
199-
Box::new(TransformWindow::<u64>::try_create_rows(
198+
Box::new(TransformWindow::try_create_rows(
200199
input,
201200
output,
202201
func.clone(),
@@ -206,36 +205,25 @@ impl IPhysicalPlan for Window {
206205
)?) as Box<dyn Processor>
207206
} else {
208207
if order_by.len() == 1 {
209-
// If the length of order_by is 1, there may be a RANGE frame.
210-
let data_type = input_schema
211-
.field(order_by[0].offset)
212-
.data_type()
213-
.remove_nullable();
214-
with_number_mapped_type!(|NUM_TYPE| match data_type {
215-
DataType::Number(NumberDataType::NUM_TYPE) => {
216-
let start_bound = FrameBound::try_from(&self.window_frame.start_bound)?;
217-
let end_bound = FrameBound::try_from(&self.window_frame.end_bound)?;
218-
return Ok(ProcessorPtr::create(Box::new(
219-
TransformWindow::<NUM_TYPE>::try_create_range(
220-
input,
221-
output,
222-
func.clone(),
223-
partition_by.clone(),
224-
order_by.clone(),
225-
(start_bound, end_bound),
226-
)?,
227-
)
228-
as Box<dyn Processor>));
229-
}
230-
_ => {}
231-
})
208+
let start_bound = FrameBound::try_from(&self.window_frame.start_bound)?;
209+
let end_bound = FrameBound::try_from(&self.window_frame.end_bound)?;
210+
return Ok(ProcessorPtr::create(
211+
Box::new(TransformWindow::try_create_range(
212+
input,
213+
output,
214+
func.clone(),
215+
partition_by.clone(),
216+
order_by.clone(),
217+
(start_bound, end_bound),
218+
)?) as Box<dyn Processor>,
219+
));
232220
}
233221

234222
// There is no offset in the RANGE frame. (just CURRENT ROW or UNBOUNDED)
235223
// So we can use any number type to create the transform.
236224
let start_bound = FrameBound::try_from(&self.window_frame.start_bound)?;
237225
let end_bound = FrameBound::try_from(&self.window_frame.end_bound)?;
238-
Box::new(TransformWindow::<u8>::try_create_range(
226+
Box::new(TransformWindow::try_create_range(
239227
input,
240228
output,
241229
func.clone(),
@@ -380,49 +368,74 @@ impl PhysicalPlanBuilder {
380368
};
381369

382370
let mut common_ty = order_by.type_check(&*input_schema)?.data_type().clone();
383-
for scalar in start.iter_mut().chain(end.iter_mut()) {
384-
let ty = scalar.as_ref().infer_data_type();
385-
common_ty = common_super_type(
386-
common_ty.clone(),
387-
ty.clone(),
388-
&BUILTIN_FUNCTIONS.default_cast_rules,
389-
)
390-
.ok_or_else(|| {
391-
ErrorCode::IllegalDataType(format!(
392-
"Cannot find common type for {:?} and {:?}",
393-
&common_ty, &ty
394-
))
395-
})?;
396-
}
371+
if common_ty.remove_nullable().is_timestamp() {
372+
for scalar in start.iter_mut().chain(end.iter_mut()) {
373+
let scalar_ty = scalar.as_ref().infer_data_type();
374+
if !scalar_ty.is_interval() {
375+
return Err(ErrorCode::IllegalDataType(format!("when the type of the order by in window func is Timestamp, Preceding and Following can only be INTERVAL types, but get {}", scalar_ty)));
376+
}
377+
}
378+
} else if common_ty.remove_nullable().is_date() {
379+
let mut last_ty = None;
380+
for scalar in start.iter_mut().chain(end.iter_mut()) {
381+
let scalar_ty = scalar.as_ref().infer_data_type();
382+
if !scalar_ty.is_interval() && !scalar_ty.is_unsigned_numeric() {
383+
return Err(ErrorCode::IllegalDataType(format!("when the type of the order by in window func is Date, Preceding and Following can only be INTERVAL or Unsigned Integer types, but get {}", scalar_ty)));
384+
}
385+
if last_ty.as_ref().is_none_or(|ty| ty == &scalar_ty) {
386+
last_ty = Some(scalar_ty);
387+
continue;
388+
}
389+
return Err(ErrorCode::IllegalDataType(format!("when the type of the order by in window func is Date, Preceding and Following can only be of the same type, but get {} and {}", last_ty.unwrap(), scalar_ty)));
390+
}
391+
} else {
392+
for scalar in start.iter_mut().chain(end.iter_mut()) {
393+
let ty = scalar.as_ref().infer_data_type();
394+
common_ty = common_super_type(
395+
common_ty.clone(),
396+
ty.clone(),
397+
&BUILTIN_FUNCTIONS.default_cast_rules,
398+
)
399+
.ok_or_else(|| {
400+
ErrorCode::IllegalDataType(format!(
401+
"Cannot find common type for {:?} and {:?}",
402+
&common_ty, &ty
403+
))
404+
})?;
405+
}
406+
*order_by = wrap_cast(order_by, &common_ty);
397407

398-
*order_by = wrap_cast(order_by, &common_ty);
399-
for scalar in start.iter_mut().chain(end.iter_mut()) {
400-
let raw_expr = RawExpr::<usize>::Cast {
401-
span: w.span,
402-
is_try: false,
403-
expr: Box::new(RawExpr::Constant {
408+
for scalar in start.iter_mut().chain(end.iter_mut()) {
409+
let raw_expr = RawExpr::<usize>::Cast {
404410
span: w.span,
405-
scalar: scalar.clone(),
406-
data_type: None,
407-
}),
408-
dest_type: common_ty.clone(),
409-
};
410-
let expr = type_check::check(&raw_expr, &BUILTIN_FUNCTIONS)?;
411-
let (expr, _) =
412-
ConstantFolder::fold(&expr, &FunctionContext::default(), &BUILTIN_FUNCTIONS);
413-
if let Expr::Constant(Constant {
414-
scalar: new_scalar, ..
415-
}) = expr
416-
{
417-
if new_scalar.is_positive() {
418-
**scalar = new_scalar;
419-
continue;
411+
is_try: false,
412+
expr: Box::new(RawExpr::Constant {
413+
span: w.span,
414+
scalar: scalar.clone(),
415+
data_type: None,
416+
}),
417+
dest_type: common_ty.clone(),
418+
};
419+
let expr = type_check::check(&raw_expr, &BUILTIN_FUNCTIONS)?;
420+
let (expr, _) = ConstantFolder::fold(
421+
&expr,
422+
&FunctionContext::default(),
423+
&BUILTIN_FUNCTIONS,
424+
);
425+
if let Expr::Constant(Constant {
426+
scalar: new_scalar, ..
427+
}) = expr
428+
{
429+
if new_scalar.is_positive() {
430+
**scalar = new_scalar;
431+
continue;
432+
}
420433
}
434+
return Err(ErrorCode::SemanticError(
435+
"Only positive numbers are allowed in RANGE offset".to_string(),
436+
)
437+
.set_span(w.span));
421438
}
422-
return Err(ErrorCode::SemanticError(
423-
"Only positive numbers are allowed in RANGE offset".to_string(),
424-
)
425-
.set_span(w.span));
426439
}
427440
}
428441

src/query/service/src/pipelines/processors/transforms/window/frame_bound.rs

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,63 +16,38 @@ use std::cmp::Ordering;
1616

1717
use databend_common_exception::ErrorCode;
1818
use databend_common_exception::Result;
19-
use databend_common_expression::types::Number;
2019
use databend_common_expression::Scalar;
2120
use databend_common_sql::plans::WindowFuncFrameBound;
2221

2322
#[derive(Debug, PartialEq)]
24-
pub enum FrameBound<T: Number> {
23+
pub enum FrameBound {
2524
CurrentRow,
26-
Preceding(Option<T>),
27-
Following(Option<T>),
25+
Preceding(Option<Scalar>),
26+
Following(Option<Scalar>),
2827
}
2928

30-
impl<T: Number> FrameBound<T> {
31-
pub fn get_inner(&self) -> Option<T> {
29+
impl FrameBound {
30+
pub fn get_inner(&self) -> Option<&Scalar> {
3231
match self {
33-
FrameBound::Preceding(Some(v)) => Some(*v),
34-
FrameBound::Following(Some(v)) => Some(*v),
32+
FrameBound::Preceding(Some(v)) => Some(v),
33+
FrameBound::Following(Some(v)) => Some(v),
3534
_ => None,
3635
}
3736
}
3837
}
3938

40-
impl<T: Number> TryFrom<&WindowFuncFrameBound> for FrameBound<T> {
39+
impl TryFrom<&WindowFuncFrameBound> for FrameBound {
4140
type Error = ErrorCode;
4241
fn try_from(value: &WindowFuncFrameBound) -> Result<Self> {
4342
match value {
4443
WindowFuncFrameBound::CurrentRow => Ok(FrameBound::CurrentRow),
45-
WindowFuncFrameBound::Preceding(v) => Ok(FrameBound::Preceding(
46-
v.as_ref()
47-
.map(|v| {
48-
if let Scalar::Number(scalar) = v {
49-
T::try_downcast_scalar(scalar).ok_or_else(|| {
50-
ErrorCode::Internal(format!("number, but got {:?}", v))
51-
})
52-
} else {
53-
Err(ErrorCode::Internal(format!("number, but got {:?}", v)))
54-
}
55-
})
56-
.transpose()?,
57-
)),
58-
WindowFuncFrameBound::Following(v) => Ok(FrameBound::Following(
59-
v.as_ref()
60-
.map(|v| {
61-
if let Scalar::Number(scalar) = v {
62-
T::try_downcast_scalar(scalar).ok_or_else(|| {
63-
ErrorCode::Internal(format!("number, but got {:?}", v))
64-
})
65-
} else {
66-
Err(ErrorCode::Internal(format!("number, but got {:?}", v)))
67-
}
68-
})
69-
.transpose()?,
70-
)),
44+
WindowFuncFrameBound::Preceding(v) => Ok(FrameBound::Preceding(v.clone())),
45+
WindowFuncFrameBound::Following(v) => Ok(FrameBound::Following(v.clone())),
7146
}
7247
}
7348
}
7449

75-
impl<T: Number> PartialOrd for FrameBound<T> {
50+
impl PartialOrd for FrameBound {
7651
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
7752
match (self, other) {
7853
(FrameBound::CurrentRow, FrameBound::CurrentRow) => Some(Ordering::Equal),

0 commit comments

Comments
 (0)