Skip to content

Commit 5494a1b

Browse files
dahbka-lisCopilot
andauthored
Support convertations PG types from YQL to Apache Arrow (#28647)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 8de3d06 commit 5494a1b

File tree

6 files changed

+185
-11
lines changed

6 files changed

+185
-11
lines changed

ydb/core/kqp/common/result_set_format/kqp_formats_arrow.cpp

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <yql/essentials/minikql/mkql_type_helper.h>
66
#include <yql/essentials/minikql/mkql_type_ops.h>
7+
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
78
#include <yql/essentials/public/udf/arrow/block_type_helper.h>
89
#include <yql/essentials/types/binary_json/read.h>
910
#include <yql/essentials/types/binary_json/write.h>
@@ -567,6 +568,21 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, cons
567568
}
568569
}
569570

571+
void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NMiniKQL::TPgType* pgType) {
572+
YQL_ENSURE(builder->type()->id() == arrow::Type::STRING, "Unexpected builder type");
573+
auto stringBuilder = reinterpret_cast<arrow::StringBuilder*>(builder);
574+
575+
if (!value) {
576+
auto status = stringBuilder->AppendNull();
577+
YQL_ENSURE(status.ok(), "Failed to append null pg value: " << status.ToString());
578+
return;
579+
}
580+
581+
auto textValue = NYql::NCommon::PgValueToNativeText(value, pgType->GetTypeId());
582+
auto status = stringBuilder->Append(textValue.data(), textValue.size());
583+
YQL_ENSURE(status.ok(), "Failed to append pg value: " << status.ToString());
584+
}
585+
570586
} // namespace
571587

572588
bool NeedWrapByExternalOptional(const NMiniKQL::TType* type) {
@@ -576,7 +592,8 @@ bool NeedWrapByExternalOptional(const NMiniKQL::TType* type) {
576592
case NMiniKQL::TType::EKind::EmptyList:
577593
case NMiniKQL::TType::EKind::EmptyDict:
578594
case NMiniKQL::TType::EKind::Optional:
579-
case NMiniKQL::TType::EKind::Variant: {
595+
case NMiniKQL::TType::EKind::Variant:
596+
case NMiniKQL::TType::EKind::Pg: {
580597
return true;
581598
}
582599

@@ -597,7 +614,6 @@ bool NeedWrapByExternalOptional(const NMiniKQL::TType* type) {
597614
case NMiniKQL::TType::EKind::Flow:
598615
case NMiniKQL::TType::EKind::ReservedKind:
599616
case NMiniKQL::TType::EKind::Block:
600-
case NMiniKQL::TType::EKind::Pg:
601617
case NMiniKQL::TType::EKind::Multi:
602618
case NMiniKQL::TType::EKind::Linear: {
603619
YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
@@ -651,6 +667,10 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType* type) {
651667
return GetArrowType(static_cast<const NMiniKQL::TTaggedType*>(type)->GetBaseType());
652668
}
653669

670+
case NMiniKQL::TType::EKind::Pg: {
671+
return arrow::utf8();
672+
}
673+
654674
case NMiniKQL::TType::EKind::Type:
655675
case NMiniKQL::TType::EKind::Stream:
656676
case NMiniKQL::TType::EKind::Callable:
@@ -659,7 +679,6 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType* type) {
659679
case NMiniKQL::TType::EKind::Flow:
660680
case NMiniKQL::TType::EKind::ReservedKind:
661681
case NMiniKQL::TType::EKind::Block:
662-
case NMiniKQL::TType::EKind::Pg:
663682
case NMiniKQL::TType::EKind::Multi:
664683
case NMiniKQL::TType::EKind::Linear: {
665684
YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
@@ -674,7 +693,8 @@ bool IsArrowCompatible(const NKikimr::NMiniKQL::TType* type) {
674693
case NMiniKQL::TType::EKind::Void:
675694
case NMiniKQL::TType::EKind::EmptyList:
676695
case NMiniKQL::TType::EKind::EmptyDict:
677-
case NMiniKQL::TType::EKind::Data: {
696+
case NMiniKQL::TType::EKind::Data:
697+
case NMiniKQL::TType::EKind::Pg: {
678698
return true;
679699
}
680700

@@ -739,7 +759,6 @@ bool IsArrowCompatible(const NKikimr::NMiniKQL::TType* type) {
739759
case NMiniKQL::TType::EKind::Flow:
740760
case NMiniKQL::TType::EKind::ReservedKind:
741761
case NMiniKQL::TType::EKind::Block:
742-
case NMiniKQL::TType::EKind::Pg:
743762
case NMiniKQL::TType::EKind::Multi:
744763
case NMiniKQL::TType::EKind::Linear: {
745764
return false;
@@ -807,6 +826,11 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, cons
807826
break;
808827
}
809828

829+
case NMiniKQL::TType::EKind::Pg: {
830+
AppendElement(value, builder, static_cast<const NMiniKQL::TPgType*>(type));
831+
break;
832+
}
833+
810834
case NMiniKQL::TType::EKind::Type:
811835
case NMiniKQL::TType::EKind::Stream:
812836
case NMiniKQL::TType::EKind::Callable:
@@ -815,7 +839,6 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, cons
815839
case NMiniKQL::TType::EKind::Flow:
816840
case NMiniKQL::TType::EKind::ReservedKind:
817841
case NMiniKQL::TType::EKind::Block:
818-
case NMiniKQL::TType::EKind::Pg:
819842
case NMiniKQL::TType::EKind::Multi:
820843
case NMiniKQL::TType::EKind::Linear: {
821844
YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());

ydb/core/kqp/common/result_set_format/kqp_formats_arrow.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ bool SwitchMiniKQLDataTypeToArrowType(NUdf::EDataSlot typeId, TFunc&& callback)
117117
* @param type The MiniKQL type to check
118118
* @return true if the type needs external Optional wrapping, false otherwise
119119
*
120-
* @note Types that need wrapping: Void, Null, Variant, Optional, EmptyList, EmptyDict
120+
* @note Types that need wrapping: Void, Null, Variant, Optional, EmptyList, EmptyDict, Pg
121121
*/
122122
bool NeedWrapByExternalOptional(const NMiniKQL::TType* type);
123123

@@ -136,6 +136,7 @@ bool NeedWrapByExternalOptional(const NMiniKQL::TType* type);
136136
* - Variant: converted to arrow::DenseUnionType
137137
* - Optional: nested optionals are flattened and represented via struct wrapping
138138
* - Tagged: converted to inner type
139+
* - Pg: converted to arrow::StringType
139140
*
140141
* @param type The MiniKQL type to convert
141142
* @return Shared pointer to corresponding Arrow DataType, or arrow::NullType if unsupported
@@ -152,7 +153,7 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType* type);
152153
* @param type The MiniKQL type to validate
153154
* @return true if the type can be converted to Arrow format, false otherwise
154155
*
155-
* @note Incompatible types: Type, Stream, Callable, Any, Resource, Flow, Block, Pg, Multi, Linear
156+
* @note Incompatible types: Type, Stream, Callable, Any, Resource, Flow, Block, Multi, Linear
156157
*/
157158
bool IsArrowCompatible(const NMiniKQL::TType* type);
158159

ydb/core/kqp/common/result_set_format/ut/kqp_formats_arrow_ut.cpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010

1111
#include <yql/essentials/minikql/computation/mkql_value_builder.h>
1212
#include <yql/essentials/minikql/mkql_string_util.h>
13+
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
14+
#include <yql/essentials/parser/pg_wrapper/interface/compare.h>
15+
#include <yql/essentials/parser/pg_wrapper/postgresql/src/backend/catalog/pg_type_d.h>
1316
#include <yql/essentials/public/udf/arrow/defs.h>
1417
#include <yql/essentials/types/binary_json/read.h>
1518
#include <yql/essentials/types/binary_json/write.h>
@@ -22,6 +25,7 @@ using namespace NYql;
2225

2326
inline static constexpr size_t TEST_ARRAY_DATATYPE_SIZE = 1 << 16;
2427
inline static constexpr size_t TEST_ARRAY_NESTED_SIZE = 1 << 8;
28+
inline static constexpr size_t TEST_ARRAY_PG_SIZE = TEST_ARRAY_DATATYPE_SIZE;
2529
inline static constexpr ui8 DECIMAL_PRECISION = 35;
2630
inline static constexpr ui8 DECIMAL_SCALE = 10;
2731
inline static constexpr ui32 VARIANT_NESTED_SIZE = 260;
@@ -800,6 +804,18 @@ struct TTestContext {
800804
return values;
801805
}
802806

807+
TType* GetOptionalPgValueType(ui32 pgTypeId) {
808+
return TOptionalType::Create(GetPgType(pgTypeId), TypeEnv);
809+
}
810+
811+
TUnboxedValueVector CreateOptionalsPgValue(ui32 quantity, ui32 pgTypeId) {
812+
auto values = CreatePgValues(quantity, pgTypeId);
813+
for (size_t i = 0; i < values.size(); ++i) {
814+
values[i] = (i % 2 == 0) ? values[i].MakeOptional() : NUdf::TUnboxedValuePod();
815+
}
816+
return values;
817+
}
818+
803819
TType* GetOptionalOptionalValueType() {
804820
return TOptionalType::Create(GetOptionalDataValueType(), TypeEnv);
805821
}
@@ -1151,6 +1167,37 @@ struct TTestContext {
11511167
}
11521168
return values;
11531169
}
1170+
1171+
TType* GetPgType(ui32 typeId) {
1172+
return TPgType::Create(typeId, TypeEnv);
1173+
}
1174+
1175+
TUnboxedValueVector CreatePgValues(ui32 quantity, ui32 typeId) {
1176+
TUnboxedValueVector values;
1177+
for (ui64 value = 0; value < quantity; ++value) {
1178+
if (value % 4 == 3) {
1179+
values.emplace_back(NUdf::TUnboxedValuePod());
1180+
continue;
1181+
}
1182+
1183+
std::string stringValue;
1184+
switch (typeId) {
1185+
case BOOLOID:
1186+
stringValue = std::to_string(value % 2 == 0);
1187+
break;
1188+
case INT8OID:
1189+
stringValue = std::to_string(value);
1190+
break;
1191+
case TEXTOID:
1192+
stringValue = "text" + std::to_string(value);
1193+
break;
1194+
default:
1195+
UNIT_ASSERT_C(false, "You need to add a new case for type " << typeId);
1196+
}
1197+
values.emplace_back(NYql::NCommon::PgValueFromNativeText(stringValue, typeId));
1198+
}
1199+
return values;
1200+
}
11541201
};
11551202

11561203
void AssertUnboxedValuesAreEqual(NUdf::TUnboxedValue& left, NUdf::TUnboxedValue& right, TType* type) {
@@ -1298,6 +1345,13 @@ void AssertUnboxedValuesAreEqual(NUdf::TUnboxedValue& left, NUdf::TUnboxedValue&
12981345
break;
12991346
}
13001347

1348+
case TType::EKind::Pg: {
1349+
auto pgType = static_cast<const TPgType*>(type);
1350+
auto equate = MakePgEquate(pgType);
1351+
UNIT_ASSERT(equate->Equals(left, right));
1352+
break;
1353+
}
1354+
13011355
default: {
13021356
UNIT_ASSERT_C(false, TStringBuilder() << "Unsupported type: " << type->GetKindAsStr());
13031357
}
@@ -1420,6 +1474,48 @@ void TestSingularTypeConversion() {
14201474
}
14211475
}
14221476

1477+
template <ui32 PgTypeId>
1478+
void TestPgTypeConversion() {
1479+
TTestContext context;
1480+
1481+
auto pgType = context.GetPgType(PgTypeId);
1482+
auto values = context.CreatePgValues(TEST_ARRAY_PG_SIZE, PgTypeId);
1483+
1484+
UNIT_ASSERT(IsArrowCompatible(pgType));
1485+
1486+
auto array = MakeArrowArray(values, pgType);
1487+
UNIT_ASSERT_C(array->ValidateFull().ok(), array->ValidateFull().ToString());
1488+
UNIT_ASSERT_VALUES_EQUAL(array->length(), values.size());
1489+
1490+
UNIT_ASSERT(array->type_id() == arrow::Type::STRING);
1491+
auto stringArray = static_pointer_cast<arrow::StringArray>(array);
1492+
UNIT_ASSERT_VALUES_EQUAL(stringArray->length(), values.size());
1493+
1494+
if (stringArray->length() > 1) {
1495+
switch (PgTypeId) {
1496+
case BOOLOID:
1497+
UNIT_ASSERT_VALUES_EQUAL(stringArray->GetString(0), "t");
1498+
UNIT_ASSERT_VALUES_EQUAL(stringArray->GetString(1), "f");
1499+
break;
1500+
case INT8OID:
1501+
UNIT_ASSERT_VALUES_EQUAL(stringArray->GetString(0), "0");
1502+
UNIT_ASSERT_VALUES_EQUAL(stringArray->GetString(1), "1");
1503+
break;
1504+
case TEXTOID:
1505+
UNIT_ASSERT_VALUES_EQUAL(stringArray->GetString(0), "text0");
1506+
UNIT_ASSERT_VALUES_EQUAL(stringArray->GetString(1), "text1");
1507+
break;
1508+
default:
1509+
UNIT_ASSERT_C(false, "You need to add a new case for type " << PgTypeId);
1510+
}
1511+
}
1512+
1513+
for (size_t i = 0; i < values.size(); ++i) {
1514+
auto arrowValue = ExtractUnboxedValue(array, i, pgType, context.HolderFactory);
1515+
AssertUnboxedValuesAreEqual(arrowValue, values[i], pgType);
1516+
}
1517+
}
1518+
14231519
} // namespace
14241520

14251521
Y_UNIT_TEST_SUITE(KqpFormats_Arrow_Conversion) {
@@ -2332,6 +2428,29 @@ Y_UNIT_TEST_SUITE(KqpFormats_Arrow_Conversion) {
23322428
}
23332429
}
23342430

2431+
Y_UNIT_TEST(NestedType_Optional_PgValue) {
2432+
TTestContext context;
2433+
2434+
auto optionalType = context.GetOptionalPgValueType(INT8OID);
2435+
auto values = context.CreateOptionalsPgValue(TEST_ARRAY_NESTED_SIZE, INT8OID);
2436+
2437+
UNIT_ASSERT(IsArrowCompatible(optionalType));
2438+
2439+
auto array = MakeArrowArray(values, optionalType);
2440+
UNIT_ASSERT_C(array->ValidateFull().ok(), array->ValidateFull().ToString());
2441+
UNIT_ASSERT_VALUES_EQUAL(array->length(), values.size());
2442+
UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT);
2443+
2444+
auto structArray = static_pointer_cast<arrow::StructArray>(array);
2445+
UNIT_ASSERT_VALUES_EQUAL(structArray->num_fields(), 1);
2446+
UNIT_ASSERT(structArray->field(0)->type_id() == arrow::Type::STRING);
2447+
2448+
for (size_t i = 0; i < values.size(); ++i) {
2449+
auto arrowValue = ExtractUnboxedValue(array, i, optionalType, context.HolderFactory);
2450+
AssertUnboxedValuesAreEqual(arrowValue, values[i], optionalType);
2451+
}
2452+
}
2453+
23352454
Y_UNIT_TEST(NestedType_Optional_OptionalValue) {
23362455
TTestContext context;
23372456

@@ -2788,6 +2907,20 @@ Y_UNIT_TEST_SUITE(KqpFormats_Arrow_Conversion) {
27882907
AssertUnboxedValuesAreEqual(arrowValue, values[i], taggedType);
27892908
}
27902909
}
2910+
2911+
// Pg types
2912+
// They are converted using NYql::NCommon::PgValueToNativeText, so testing all types is not required
2913+
Y_UNIT_TEST(PgType_Bool) {
2914+
TestPgTypeConversion<BOOLOID>();
2915+
}
2916+
2917+
Y_UNIT_TEST(PgType_Int8) {
2918+
TestPgTypeConversion<INT8OID>();
2919+
}
2920+
2921+
Y_UNIT_TEST(PgType_Text) {
2922+
TestPgTypeConversion<TEXTOID>();
2923+
}
27912924
}
27922925

27932926
} // namespace NKikimr::NKqp::NFormats

ydb/core/kqp/common/result_set_format/ut/kqp_formats_ut_helpers.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <yql/essentials/minikql/mkql_string_util.h>
66
#include <yql/essentials/minikql/mkql_type_helper.h>
7+
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
78
#include <yql/essentials/types/dynumber/dynumber.h>
89
#include <yql/essentials/types/binary_json/write.h>
910
#include <yql/essentials/utils/yql_panic.h>
@@ -334,6 +335,18 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array>& arr
334335
return holderFactory.CreateVariantHolder(value.Release(), variantIndex);
335336
}
336337

338+
NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array>& array, ui64 row, const NMiniKQL::TPgType* pgType) {
339+
YQL_ENSURE(array->type_id() == arrow::Type::STRING, "Unexpected array type");
340+
auto stringArray = static_pointer_cast<arrow::StringArray>(array);
341+
342+
if (stringArray->IsNull(row)) {
343+
return NUdf::TUnboxedValuePod();
344+
}
345+
346+
auto data = stringArray->GetView(row);
347+
return NYql::NCommon::PgValueFromNativeText(NUdf::TStringRef(data.data(), data.size()), pgType->GetTypeId());
348+
}
349+
337350
} // namespace
338351

339352
std::unique_ptr<arrow::ArrayBuilder> MakeArrowBuilder(const NMiniKQL::TType* type) {
@@ -404,6 +417,10 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array>& arr
404417
return ExtractUnboxedValue(array, row, static_cast<const NMiniKQL::TTaggedType*>(itemType)->GetBaseType(), holderFactory);
405418
}
406419

420+
case NMiniKQL::TType::EKind::Pg: {
421+
return ExtractUnboxedValue(array, row, static_cast<const NMiniKQL::TPgType*>(itemType));
422+
}
423+
407424
case NMiniKQL::TType::EKind::Type:
408425
case NMiniKQL::TType::EKind::Stream:
409426
case NMiniKQL::TType::EKind::Callable:
@@ -412,7 +429,6 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array>& arr
412429
case NMiniKQL::TType::EKind::Flow:
413430
case NMiniKQL::TType::EKind::ReservedKind:
414431
case NMiniKQL::TType::EKind::Block:
415-
case NMiniKQL::TType::EKind::Pg:
416432
case NMiniKQL::TType::EKind::Multi:
417433
case NMiniKQL::TType::EKind::Linear: {
418434
YQL_ENSURE(false, "Unsupported type: " << itemType->GetKindAsStr());

ydb/core/kqp/common/result_set_format/ut/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ YQL_LAST_ABI_VERSION()
1414
PEERDIR(
1515
library/cpp/testing/unittest
1616
yql/essentials/public/udf/service/exception_policy
17-
yql/essentials/sql/pg_dummy
17+
yql/essentials/parser/pg_wrapper
1818
)
1919

2020
END()

ydb/core/kqp/ut/arrow/ya.make

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ SRCS(
1414
PEERDIR(
1515
ydb/core/kqp
1616
ydb/core/kqp/ut/common
17-
yql/essentials/sql/pg_dummy
1817
ydb/public/sdk/cpp/src/client/arrow
18+
yql/essentials/sql/pg
19+
yql/essentials/parser/pg_wrapper
1920
)
2021

2122
YQL_LAST_ABI_VERSION()

0 commit comments

Comments
 (0)