Skip to content

Commit 69269a0

Browse files
authored
feat: support DataType TimestampTz (#18892)
* feat: support DataType `TimestampTimezone` * chore: add `to_timestamp_tz` from timestamp * chore: add interval function for timestamp_tz * chore: add meta test of TimestampTz * chore: fix bug on codex * chore: codefmt * chore: fix e2e test * chore: remove timestamp_tz on old datatype
1 parent 64fe1d3 commit 69269a0

File tree

84 files changed

+2249
-530
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+2249
-530
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/column/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ either = { workspace = true }
2929
ethnum = { workspace = true }
3030
foreign_vec = { workspace = true }
3131
hex = { workspace = true }
32+
jiff = { workspace = true }
3233
log = { workspace = true }
3334
match-template = { workspace = true }
3435
num-traits = { workspace = true }

src/common/column/src/types/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ pub enum PrimitiveType {
8787
DaysMs,
8888
/// months_days_micros(i32, i32, i64)
8989
MonthDayMicros,
90+
/// timestamp_tz(i32, i64)
91+
TimestampTz,
9092
}
9193

9294
mod private {
@@ -115,5 +117,6 @@ mod private {
115117
impl Sealed for OrderedFloat<f64> {}
116118
impl Sealed for super::days_ms {}
117119
impl Sealed for super::months_days_micros {}
120+
impl Sealed for super::timestamp_tz {}
118121
impl Sealed for View {}
119122
}

src/common/column/src/types/native.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
use std::cmp::Ordering;
1717
use std::convert::TryFrom;
18+
use std::fmt::Debug;
19+
use std::fmt::Display;
20+
use std::fmt::Formatter;
1821
use std::hash::Hash;
1922
use std::hash::Hasher;
2023
use std::ops::Add;
@@ -27,12 +30,17 @@ use borsh::BorshSerialize;
2730
use bytemuck::Pod;
2831
use bytemuck::Zeroable;
2932
use databend_common_base::base::OrderedFloat;
33+
use jiff::fmt::strtime;
34+
use jiff::tz;
35+
use jiff::Timestamp;
3036
use log::error;
3137
use serde_derive::Deserialize;
3238
use serde_derive::Serialize;
3339

3440
use super::PrimitiveType;
3541

42+
/// `strftime`-style pattern used by the `Display` impl of `timestamp_tz` when rendering a value.
pub const TIMESTAMP_TIMEZONE_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f %z";
43+
3644
/// Sealed trait implemented by all physical types that can be allocated,
3745
/// serialized and deserialized by this crate.
3846
/// All O(N) allocations in this crate are done for this trait alone.
@@ -71,6 +79,10 @@ pub trait NativeType:
7179

7280
/// From bytes in big endian
7381
fn from_be_bytes(bytes: Self::Bytes) -> Self;
82+
83+
/// Size of this native type in bytes, i.e. `std::mem::size_of::<Self>()`.
fn size_of() -> usize {
84+
std::mem::size_of::<Self>()
85+
}
7486
}
7587

7688
macro_rules! native_type {
@@ -428,6 +440,138 @@ impl Neg for months_days_micros {
428440
}
429441
}
430442

443+
/// The in-memory representation of a timestamp paired with a time-zone offset,
/// packed into a single `i128`.
///
/// `timestamp_tz::new` stores the `i64` timestamp in the low 64 bits and shifts the
/// `i32` offset into the bits above them. The accessors treat the timestamp as
/// microseconds and the offset as seconds.
///
/// `PartialEq`, `Ord` and `Hash` are implemented by hand on `total_micros()`
/// rather than derived from the raw `i128`.
444+
#[derive(
445+
Debug,
446+
Copy,
447+
Clone,
448+
Default,
449+
Eq,
450+
Zeroable,
451+
Pod,
452+
Serialize,
453+
Deserialize,
454+
BorshSerialize,
455+
BorshDeserialize,
456+
)]
457+
#[allow(non_camel_case_types)]
458+
#[repr(C)]
459+
pub struct timestamp_tz(pub i128);
460+
461+
impl Hash for timestamp_tz {
462+
fn hash<H: Hasher>(&self, state: &mut H) {
463+
self.total_micros().hash(state)
464+
}
465+
}
466+
impl PartialEq for timestamp_tz {
467+
fn eq(&self, other: &Self) -> bool {
468+
self.total_micros() == other.total_micros()
469+
}
470+
}
471+
impl PartialOrd for timestamp_tz {
472+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
473+
Some(self.cmp(other))
474+
}
475+
}
476+
477+
impl Ord for timestamp_tz {
478+
fn cmp(&self, other: &Self) -> Ordering {
479+
let total_micros = self.total_micros();
480+
let other_micros = other.total_micros();
481+
total_micros.cmp(&other_micros)
482+
}
483+
}
484+
485+
impl timestamp_tz {
486+
pub const MICROS_PER_SECOND: i64 = 1_000_000;
487+
488+
pub fn new(timestamp: i64, offset: i32) -> Self {
489+
let ts = timestamp as u64 as i128; // <- 中间加一次 u64 屏蔽符号位
490+
let off = (offset as i128) << 64;
491+
Self(off | ts)
492+
}
493+
494+
#[inline]
495+
pub fn timestamp(&self) -> i64 {
496+
self.0 as u64 as i64
497+
}
498+
499+
#[inline]
500+
pub fn seconds_offset(&self) -> i32 {
501+
(self.0 >> 64) as i32
502+
}
503+
504+
#[inline]
505+
pub fn micros_offset(&self) -> Option<i64> {
506+
(self.seconds_offset() as i64).checked_mul(Self::MICROS_PER_SECOND)
507+
}
508+
509+
#[inline]
510+
pub fn hours_offset(&self) -> i8 {
511+
(self.seconds_offset() / 3600) as i8
512+
}
513+
514+
#[inline]
515+
pub fn total_micros(&self) -> i64 {
516+
self.try_total_micros().unwrap_or_else(|| {
517+
error!(
518+
"interval is out of range: timestamp={}, offset={}",
519+
self.timestamp(),
520+
self.seconds_offset()
521+
);
522+
0
523+
})
524+
}
525+
526+
#[inline]
527+
pub fn try_total_micros(&self) -> Option<i64> {
528+
let offset_micros = self.micros_offset()?;
529+
self.timestamp().checked_sub(offset_micros)
530+
}
531+
}
532+
533+
impl Display for timestamp_tz {
534+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
535+
let timestamp = Timestamp::from_microsecond(self.timestamp()).unwrap();
536+
537+
let offset = tz::Offset::from_seconds(self.seconds_offset()).unwrap();
538+
let string = strtime::format(
539+
TIMESTAMP_TIMEZONE_FORMAT,
540+
&timestamp.to_zoned(offset.to_time_zone()),
541+
)
542+
.unwrap();
543+
write!(f, "{}", string)
544+
}
545+
}
546+
547+
impl NativeType for timestamp_tz {
548+
const PRIMITIVE: PrimitiveType = PrimitiveType::TimestampTz;
549+
type Bytes = [u8; 16];
550+
#[inline]
551+
fn to_le_bytes(&self) -> Self::Bytes {
552+
self.0.to_le_bytes()
553+
}
554+
555+
#[inline]
556+
fn to_be_bytes(&self) -> Self::Bytes {
557+
self.0.to_be_bytes()
558+
}
559+
560+
#[inline]
561+
fn from_le_bytes(bytes: Self::Bytes) -> Self {
562+
let mut buf16 = [0u8; 16];
563+
buf16.copy_from_slice(&bytes);
564+
Self(i128::from_le_bytes(buf16))
565+
}
566+
567+
#[inline]
568+
fn from_be_bytes(bytes: Self::Bytes) -> Self {
569+
let mut buf16 = [0u8; 16];
570+
buf16.copy_from_slice(&bytes);
571+
Self(i128::from_be_bytes(buf16))
572+
}
573+
}
574+
431575
/// Type representation of the Float16 physical type
432576
#[derive(Copy, Clone, Default, Zeroable, Pod)]
433577
#[allow(non_camel_case_types)]

src/common/exception/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ geozero = { workspace = true }
2020
gimli = { workspace = true }
2121
http = { workspace = true }
2222
hyper = { workspace = true }
23+
jiff = { workspace = true }
2324
libc = { workspace = true }
2425
object = { workspace = true }
2526
once_cell = { workspace = true }

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ build_exceptions! {
177177
StagePermissionDenied(2506),
178178
}
179179

180-
// Data Format and Parsing Errors [1046, 1057, 1060, 1064, 1072, 1074-1081, 1090, 1201-1202, 2507-2509]
180+
// Data Format and Parsing Errors [1046, 1057, 1060, 1064, 1072, 1074-1081, 1090, 1201-1202, 2507-2509, 2513]
181181
build_exceptions! {
182182
/// Bad bytes
183183
BadBytes(1046),
@@ -215,6 +215,8 @@ build_exceptions! {
215215
IllegalFileFormat(2508),
216216
/// File format already exists
217217
FileFormatAlreadyExists(2509),
218+
/// Jiff error
219+
JiffError(2513),
218220
}
219221

220222
// Cluster and Resource Management Errors [1035, 1045, 1082, 1101, 2401-2410, 4013]

src/common/exception/src/exception_into.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,12 @@ impl From<ParseError> for ErrorCode {
228228
}
229229
}
230230

231+
impl From<jiff::Error> for ErrorCode {
232+
fn from(error: jiff::Error) -> Self {
233+
ErrorCode::JiffError(error.to_string())
234+
}
235+
}
236+
231237
impl From<GeozeroError> for ErrorCode {
232238
fn from(value: GeozeroError) -> Self {
233239
ErrorCode::GeometryError(value.to_string())

src/common/native/src/read/array/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,8 @@ mod fixed_list;
3535
pub use fixed_list::*;
3636
mod interval;
3737
mod map;
38+
mod timestamp_tz;
39+
3840
pub use interval::*;
3941
pub use map::*;
42+
pub use timestamp_tz::*;
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io::BufReader;
16+
use std::io::Cursor;
17+
18+
use databend_common_column::buffer::Buffer;
19+
use databend_common_column::error::Result;
20+
use databend_common_column::types::timestamp_tz;
21+
use databend_common_expression::types::timestamp_tz::TimestampTzType;
22+
use databend_common_expression::types::ArgType;
23+
use databend_common_expression::Column;
24+
use databend_common_expression::TableDataType;
25+
26+
use crate::compression::integer::decompress_integer;
27+
use crate::nested::InitNested;
28+
use crate::nested::NestedState;
29+
use crate::read::read_basic::read_nested;
30+
use crate::read::NativeReadBuf;
31+
use crate::read::PageIterator;
32+
use crate::PageMeta;
33+
34+
/// Iterator adapter that turns raw pages from `iter` into `(NestedState, Column)`
/// pairs holding `timestamp_tz` values.
#[derive(Debug)]
35+
pub struct TimestampTzNestedIter<I>
36+
where I: Iterator<Item = Result<(u64, Vec<u8>)>> + PageIterator + Send + Sync
37+
{
38+
/// Page source; each item is `(num_values, page bytes)`.
iter: I,
39+
/// Checked with `is_nullable()` to decide whether the column is wrapped as nullable.
data_type: TableDataType,
40+
/// Nesting description handed to `read_nested` for every page.
init: Vec<InitNested>,
41+
/// Scratch buffer passed to `decompress_integer` and kept between pages.
scratch: Vec<u8>,
42+
}
43+
44+
impl<I> TimestampTzNestedIter<I>
45+
where I: Iterator<Item = Result<(u64, Vec<u8>)>> + PageIterator + Send + Sync
46+
{
47+
pub fn new(iter: I, data_type: TableDataType, init: Vec<InitNested>) -> Self {
48+
Self {
49+
iter,
50+
data_type,
51+
init,
52+
scratch: vec![],
53+
}
54+
}
55+
}
56+
57+
impl<I> TimestampTzNestedIter<I>
58+
where I: Iterator<Item = Result<(u64, Vec<u8>)>> + PageIterator + Send + Sync
59+
{
60+
fn deserialize(&mut self, num_values: u64, buffer: Vec<u8>) -> Result<(NestedState, Column)> {
61+
let mut reader = BufReader::with_capacity(buffer.len(), Cursor::new(buffer));
62+
let (nested, validity) = read_nested(&mut reader, &self.init, num_values as usize)?;
63+
let length = num_values as usize;
64+
65+
let mut values = Vec::with_capacity(length);
66+
decompress_integer(&mut reader, length, &mut values, &mut self.scratch)?;
67+
assert_eq!(values.len(), length);
68+
69+
let mut buffer = reader.into_inner().into_inner();
70+
self.iter.swap_buffer(&mut buffer);
71+
72+
let column: Buffer<i128> = values.into();
73+
let column: Buffer<timestamp_tz> = unsafe { std::mem::transmute(column) };
74+
let mut col = TimestampTzType::upcast_column(column);
75+
if self.data_type.is_nullable() {
76+
col = col.wrap_nullable(validity);
77+
}
78+
Ok((nested, col))
79+
}
80+
}
81+
82+
impl<I> Iterator for TimestampTzNestedIter<I>
83+
where I: Iterator<Item = Result<(u64, Vec<u8>)>> + PageIterator + Send + Sync
84+
{
85+
type Item = Result<(NestedState, Column)>;
86+
87+
fn next(&mut self) -> Option<Self::Item> {
88+
match self.iter.next() {
89+
Some(Ok((num_values, buffer))) => Some(self.deserialize(num_values, buffer)),
90+
Some(Err(err)) => Some(Result::Err(err)),
91+
None => None,
92+
}
93+
}
94+
95+
fn nth(&mut self, n: usize) -> Option<Self::Item> {
96+
match self.iter.nth(n) {
97+
Some(Ok((num_values, buffer))) => Some(self.deserialize(num_values, buffer)),
98+
Some(Err(err)) => Some(Result::Err(err)),
99+
None => None,
100+
}
101+
}
102+
}
103+
104+
pub fn read_nested_timestamp_tz<R: NativeReadBuf>(
105+
reader: &mut R,
106+
data_type: TableDataType,
107+
init: Vec<InitNested>,
108+
page_metas: Vec<PageMeta>,
109+
) -> Result<Vec<(NestedState, Column)>> {
110+
let mut scratch = vec![];
111+
let mut results = Vec::with_capacity(page_metas.len());
112+
for page_meta in page_metas {
113+
let num_values = page_meta.num_values as usize;
114+
let (nested, validity) = read_nested(reader, &init, num_values)?;
115+
116+
let mut values = Vec::with_capacity(num_values);
117+
decompress_integer(reader, num_values, &mut values, &mut scratch)?;
118+
119+
let column: Buffer<i128> = values.into();
120+
let column: Buffer<timestamp_tz> = unsafe { std::mem::transmute(column) };
121+
let mut col = TimestampTzType::upcast_column(column);
122+
if data_type.is_nullable() {
123+
col = col.wrap_nullable(validity);
124+
}
125+
results.push((nested, col));
126+
}
127+
Ok(results)
128+
}

0 commit comments

Comments
 (0)