@@ -2,17 +2,21 @@ use std::sync::Arc;
22
33use arrow:: {
44 array:: {
5- ArrayRef , ArrowPrimitiveType , BooleanBuilder , FixedSizeBinaryBuilder , LargeBinaryBuilder ,
6- ListBuilder , PrimitiveBuilder , RecordBatch , StringBuilder , TimestampMicrosecondBuilder ,
5+ ArrayRef , ArrowPrimitiveType , BooleanBuilder , Decimal128Array , FixedSizeBinaryBuilder ,
6+ LargeBinaryBuilder , ListBuilder , PrimitiveBuilder , RecordBatch , StringBuilder ,
7+ TimestampMicrosecondBuilder ,
78 } ,
89 datatypes:: {
9- DataType , Date32Type , FieldRef , Float32Type , Float64Type , Int32Type , Int64Type , Schema ,
10- Time64MicrosecondType , TimeUnit , TimestampMicrosecondType ,
10+ DataType , Date32Type , Field , FieldRef , Float32Type , Float64Type , Int16Type , Int32Type ,
11+ Int64Type , Schema , Time64MicrosecondType , TimeUnit , TimestampMicrosecondType , UInt32Type ,
1112 } ,
1213 error:: ArrowError ,
1314} ;
1415use chrono:: { NaiveDate , NaiveTime } ;
15- use etl:: types:: { ArrayCell , Cell , DATE_FORMAT , TIME_FORMAT , TIMESTAMP_FORMAT , TableRow } ;
16+ use etl:: types:: {
17+ ArrayCell , Cell , DATE_FORMAT , TIME_FORMAT , TIMESTAMP_FORMAT , TableRow ,
18+ TableSchema as PgTableSchema , Type as PgType ,
19+ } ;
1620
1721pub const UNIX_EPOCH : NaiveDate =
1822 NaiveDate :: from_ymd_opt ( 1970 , 1 , 1 ) . expect ( "unix epoch is a valid date" ) ;
@@ -21,6 +25,30 @@ const MIDNIGHT: NaiveTime = NaiveTime::from_hms_opt(0, 0, 0).expect("midnight is
2125
2226const UUID_BYTE_WIDTH : i32 = 16 ;
2327
/// Extracts the declared precision of a Postgres `numeric` column from its `atttypmod`.
///
/// Postgres encodes the modifier as `((precision << 16) | scale) + VARHDRSZ`, so the
/// precision lives in the upper 16 bits once the 4-byte header offset is removed.
///
/// Returns `38` (the largest precision an Arrow `Decimal128` can hold) when the modifier
/// does not carry a constraint (`-1`, or any value below the header offset) and when the
/// declared precision exceeds 38. The cap is applied *before* narrowing to `u8`, so large
/// declared precisions such as 256 or 1000 cannot wrap around to a small number.
///
/// Based on: https://stackoverflow.com/questions/72725508/how-to-calculate-numeric-precision-and-other-vals-from-atttypmod
fn extract_numeric_precision(atttypmod: i32) -> u8 {
    /// Offset Postgres adds to every packed numeric type modifier.
    const VARHDRSZ: i32 = 4;
    /// Maximum precision supported by Arrow's `Decimal128`.
    const MAX_DECIMAL128_PRECISION: u8 = 38;

    if atttypmod < VARHDRSZ {
        // No limit specified, use maximum precision.
        return MAX_DECIMAL128_PRECISION;
    }

    let declared = ((atttypmod - VARHDRSZ) >> 16) & 0xffff;

    // Anything that does not fit in a `u8` is necessarily above the cap.
    u8::try_from(declared).map_or(MAX_DECIMAL128_PRECISION, |precision| {
        precision.min(MAX_DECIMAL128_PRECISION)
    })
}
39+
/// Extracts the declared scale of a Postgres `numeric` column from its `atttypmod`.
///
/// Postgres encodes the modifier as `((precision << 16) | (scale & 0x7ff)) + VARHDRSZ`.
/// The scale is an 11-bit two's-complement field (negative scales are allowed since
/// PostgreSQL 15), so it is sign-extended here rather than read as an unsigned value.
/// Non-negative scales produced by older servers decode identically.
///
/// Returns `18` when the modifier does not carry a constraint (`-1`, or any value below
/// the header offset). Otherwise the decoded scale is clamped to `i8::MIN..=38` *before*
/// narrowing, so a large declared scale (for example 200) can no longer wrap around to a
/// negative number.
///
/// Based on: https://stackoverflow.com/questions/72725508/how-to-calculate-numeric-precision-and-other-vals-from-atttypmod
fn extract_numeric_scale(atttypmod: i32) -> i8 {
    /// Offset Postgres adds to every packed numeric type modifier.
    const VARHDRSZ: i32 = 4;
    /// Scale used when the column has no declared precision/scale.
    const DEFAULT_SCALE: i8 = 18;
    /// Upper bound applied to the declared scale.
    const MAX_SCALE: i32 = 38;
    /// The scale occupies the low 11 bits of the modifier.
    const SCALE_MASK: i32 = 0x7ff;
    /// Sign bit of the 11-bit scale field.
    const SCALE_SIGN_BIT: i32 = 0x400;

    if atttypmod < VARHDRSZ {
        // No limit specified, use reasonable default scale.
        return DEFAULT_SCALE;
    }

    // Sign-extend the 11-bit field: xor flips the sign bit, the subtraction restores it
    // with the correct weight.
    let declared = (((atttypmod - VARHDRSZ) & SCALE_MASK) ^ SCALE_SIGN_BIT) - SCALE_SIGN_BIT;
    let bounded = declared.clamp(i32::from(i8::MIN), MAX_SCALE);

    i8::try_from(bounded).expect("scale was clamped into the i8 range")
}
51+
2452/// Converts a slice of [`TableRow`]s to an Arrow [`RecordBatch`].
2553///
2654/// This function transforms tabular data from the ETL pipeline's internal format
@@ -56,22 +84,31 @@ pub fn rows_to_record_batch(rows: &[TableRow], schema: Schema) -> Result<RecordB
5684fn build_array_for_field ( rows : & [ TableRow ] , field_idx : usize , data_type : & DataType ) -> ArrayRef {
5785 match data_type {
5886 DataType :: Boolean => build_boolean_array ( rows, field_idx) ,
87+ DataType :: Int16 => build_primitive_array :: < Int16Type , _ > ( rows, field_idx, cell_to_i16) ,
5988 DataType :: Int32 => build_primitive_array :: < Int32Type , _ > ( rows, field_idx, cell_to_i32) ,
6089 DataType :: Int64 => build_primitive_array :: < Int64Type , _ > ( rows, field_idx, cell_to_i64) ,
90+ DataType :: UInt32 => build_primitive_array :: < UInt32Type , _ > ( rows, field_idx, cell_to_u32) ,
6191 DataType :: Float32 => build_primitive_array :: < Float32Type , _ > ( rows, field_idx, cell_to_f32) ,
6292 DataType :: Float64 => build_primitive_array :: < Float64Type , _ > ( rows, field_idx, cell_to_f64) ,
6393 DataType :: Utf8 => build_string_array ( rows, field_idx) ,
94+ DataType :: Binary => build_binary_array ( rows, field_idx) ,
6495 DataType :: LargeBinary => build_binary_array ( rows, field_idx) ,
6596 DataType :: Date32 => build_primitive_array :: < Date32Type , _ > ( rows, field_idx, cell_to_date32) ,
6697 DataType :: Time64 ( TimeUnit :: Microsecond ) => {
6798 build_primitive_array :: < Time64MicrosecondType , _ > ( rows, field_idx, cell_to_time64)
6899 }
100+ DataType :: Time64 ( TimeUnit :: Nanosecond ) => {
101+ build_primitive_array :: < Time64MicrosecondType , _ > ( rows, field_idx, cell_to_time64)
102+ }
69103 DataType :: Timestamp ( TimeUnit :: Microsecond , Some ( tz) ) => {
70104 build_timestamptz_array ( rows, field_idx, tz)
71105 }
72106 DataType :: Timestamp ( TimeUnit :: Microsecond , None ) => {
73107 build_primitive_array :: < TimestampMicrosecondType , _ > ( rows, field_idx, cell_to_timestamp)
74108 }
109+ DataType :: Decimal128 ( precision, scale) => {
110+ build_decimal128_array ( rows, field_idx, * precision, * scale)
111+ }
75112 DataType :: FixedSizeBinary ( UUID_BYTE_WIDTH ) => build_uuid_array ( rows, field_idx) ,
76113 DataType :: List ( field) => build_list_array ( rows, field_idx, field. clone ( ) ) ,
77114 _ => build_string_array ( rows, field_idx) ,
@@ -123,6 +160,22 @@ impl_array_builder!(build_boolean_array, BooleanBuilder, cell_to_bool);
123160impl_array_builder ! ( build_string_array, StringBuilder , cell_to_string) ;
124161impl_array_builder ! ( build_binary_array, LargeBinaryBuilder , cell_to_bytes) ;
125162
163+ /// Builds a decimal128 array from [`TableRow`]s for a specific field.
164+ fn build_decimal128_array (
165+ rows : & [ TableRow ] ,
166+ field_idx : usize ,
167+ precision : u8 ,
168+ scale : i8 ,
169+ ) -> ArrayRef {
170+ let values: Vec < Option < i128 > > = rows
171+ . iter ( )
172+ . map ( |row| cell_to_decimal128 ( & row. values [ field_idx] , precision, scale) )
173+ . collect ( ) ;
174+
175+ let decimal_type = DataType :: Decimal128 ( precision, scale) ;
176+ Arc :: new ( Decimal128Array :: from ( values) . with_data_type ( decimal_type) )
177+ }
178+
126179/// Builds a timezone-aware timestamp array from [`TableRow`]s.
127180///
128181/// This function creates an Arrow timestamp array with microsecond precision
@@ -213,6 +266,22 @@ fn cell_to_i64(cell: &Cell) -> Option<i64> {
213266 }
214267}
215268
269+ /// Converts a [`Cell`] to a 16-bit signed integer.
270+ fn cell_to_i16 ( cell : & Cell ) -> Option < i16 > {
271+ match cell {
272+ Cell :: I16 ( v) => Some ( * v) ,
273+ _ => None ,
274+ }
275+ }
276+
277+ /// Converts a [`Cell`] to a 32-bit unsigned integer.
278+ fn cell_to_u32 ( cell : & Cell ) -> Option < u32 > {
279+ match cell {
280+ Cell :: U32 ( v) => Some ( * v) ,
281+ _ => None ,
282+ }
283+ }
284+
216285/// Converts a [`Cell`] to a 32-bit floating-point number.
217286///
218287/// Extracts 32-bit float values from [`Cell::F32`] variants, returning
@@ -235,6 +304,23 @@ fn cell_to_f64(cell: &Cell) -> Option<f64> {
235304 }
236305}
237306
307+ /// Converts a [`Cell`] to a decimal128 value.
308+ fn cell_to_decimal128 ( cell : & Cell , _precision : u8 , scale : i8 ) -> Option < i128 > {
309+ match cell {
310+ Cell :: Numeric ( n) => {
311+ // This is a simplified conversion - ideally we'd preserve the exact decimal representation
312+ if let Ok ( string_val) = n. to_string ( ) . parse :: < f64 > ( ) {
313+ // Scale up by the scale factor and convert to i128
314+ let scaled = ( string_val * 10_f64 . powi ( scale as i32 ) ) as i128 ;
315+ Some ( scaled)
316+ } else {
317+ None
318+ }
319+ }
320+ _ => None ,
321+ }
322+ }
323+
238324/// Converts a [`Cell`] to a byte vector.
239325///
240326/// Extracts binary data from [`Cell::Bytes`] variants by cloning the
@@ -375,20 +461,27 @@ fn cell_to_array_cell(cell: &Cell) -> Option<&ArrayCell> {
375461fn build_list_array ( rows : & [ TableRow ] , field_idx : usize , field : FieldRef ) -> ArrayRef {
376462 match field. data_type ( ) {
377463 DataType :: Boolean => build_boolean_list_array ( rows, field_idx, field) ,
464+ DataType :: Int16 => build_int16_list_array ( rows, field_idx, field) ,
378465 DataType :: Int32 => build_int32_list_array ( rows, field_idx, field) ,
379466 DataType :: Int64 => build_int64_list_array ( rows, field_idx, field) ,
467+ DataType :: UInt32 => build_uint32_list_array ( rows, field_idx, field) ,
380468 DataType :: Float32 => build_float32_list_array ( rows, field_idx, field) ,
381469 DataType :: Float64 => build_float64_list_array ( rows, field_idx, field) ,
382470 DataType :: Utf8 => build_string_list_array ( rows, field_idx, field) ,
471+ DataType :: Binary => build_binary_list_array ( rows, field_idx, field) ,
383472 DataType :: LargeBinary => build_binary_list_array ( rows, field_idx, field) ,
384473 DataType :: Date32 => build_date32_list_array ( rows, field_idx, field) ,
385474 DataType :: Time64 ( TimeUnit :: Microsecond ) => build_time64_list_array ( rows, field_idx, field) ,
475+ DataType :: Time64 ( TimeUnit :: Nanosecond ) => build_time64_list_array ( rows, field_idx, field) ,
386476 DataType :: Timestamp ( TimeUnit :: Microsecond , None ) => {
387477 build_timestamp_list_array ( rows, field_idx, field)
388478 }
389479 DataType :: Timestamp ( TimeUnit :: Microsecond , Some ( _) ) => {
390480 build_timestamptz_list_array ( rows, field_idx, field)
391481 }
482+ DataType :: Decimal128 ( precision, scale) => {
483+ build_decimal128_list_array ( rows, field_idx, field. clone ( ) , * precision, * scale)
484+ }
392485 DataType :: FixedSizeBinary ( UUID_BYTE_WIDTH ) => build_uuid_list_array ( rows, field_idx, field) ,
393486 // For unsupported element types, fall back to string representation
394487 _ => build_list_array_for_strings ( rows, field_idx, field) ,
@@ -421,6 +514,32 @@ fn build_boolean_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef
421514 Arc :: new ( list_builder. finish ( ) )
422515}
423516
517+ /// Builds a list array for 16-bit integer elements.
518+ fn build_int16_list_array ( rows : & [ TableRow ] , field_idx : usize , field : FieldRef ) -> ArrayRef {
519+ let mut list_builder =
520+ ListBuilder :: new ( PrimitiveBuilder :: < Int16Type > :: new ( ) ) . with_field ( field. clone ( ) ) ;
521+
522+ for row in rows {
523+ if let Some ( array_cell) = cell_to_array_cell ( & row. values [ field_idx] ) {
524+ match array_cell {
525+ ArrayCell :: I16 ( vec) => {
526+ for item in vec {
527+ list_builder. values ( ) . append_option ( * item) ;
528+ }
529+ list_builder. append ( true ) ;
530+ }
531+ _ => {
532+ return build_list_array_for_strings ( rows, field_idx, field) ;
533+ }
534+ }
535+ } else {
536+ list_builder. append_null ( ) ;
537+ }
538+ }
539+
540+ Arc :: new ( list_builder. finish ( ) )
541+ }
542+
424543/// Builds a list array for 32-bit integer elements.
425544fn build_int32_list_array ( rows : & [ TableRow ] , field_idx : usize , field : FieldRef ) -> ArrayRef {
426545 let mut list_builder =
@@ -485,6 +604,32 @@ fn build_int64_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef)
485604 Arc :: new ( list_builder. finish ( ) )
486605}
487606
607+ /// Builds a list array for 32-bit unsigned integer elements.
608+ fn build_uint32_list_array ( rows : & [ TableRow ] , field_idx : usize , field : FieldRef ) -> ArrayRef {
609+ let mut list_builder =
610+ ListBuilder :: new ( PrimitiveBuilder :: < UInt32Type > :: new ( ) ) . with_field ( field. clone ( ) ) ;
611+
612+ for row in rows {
613+ if let Some ( array_cell) = cell_to_array_cell ( & row. values [ field_idx] ) {
614+ match array_cell {
615+ ArrayCell :: U32 ( vec) => {
616+ for item in vec {
617+ list_builder. values ( ) . append_option ( * item) ;
618+ }
619+ list_builder. append ( true ) ;
620+ }
621+ _ => {
622+ return build_list_array_for_strings ( rows, field_idx, field) ;
623+ }
624+ }
625+ } else {
626+ list_builder. append_null ( ) ;
627+ }
628+ }
629+
630+ Arc :: new ( list_builder. finish ( ) )
631+ }
632+
488633/// Builds a list array for 32-bit float elements.
489634fn build_float32_list_array ( rows : & [ TableRow ] , field_idx : usize , field : FieldRef ) -> ArrayRef {
490635 let mut list_builder =
@@ -763,6 +908,19 @@ fn build_uuid_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -
763908 Arc :: new ( list_builder. finish ( ) )
764909}
765910
911+ /// Builds a list array for Decimal128 elements.
912+ fn build_decimal128_list_array (
913+ rows : & [ TableRow ] ,
914+ field_idx : usize ,
915+ field : FieldRef ,
916+ _precision : u8 ,
917+ _scale : i8 ,
918+ ) -> ArrayRef {
919+ // For now, fall back to string representation for decimal arrays
920+ // This is a simplified implementation that avoids complex Arrow data type manipulation
921+ build_list_array_for_strings ( rows, field_idx, field)
922+ }
923+
766924/// Builds a list array for string elements.
767925///
768926/// This function creates an Arrow list array with string elements by processing
0 commit comments