Skip to content

Commit dc8f3c3

Browse files
committed
Copy arrow helpers for building ResultSet to KQP (ydb-platform#25531)
1 parent 2cb2ac2 commit dc8f3c3

35 files changed

+3587
-1544
lines changed

ydb/core/formats/arrow/arrow_batch_builder.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include <ydb/core/formats/arrow/arrow_helpers_minikql.h>
44
#include <ydb/core/formats/arrow/switch/switch_type.h>
55
#include <ydb/core/kqp/common/kqp_types.h>
6-
#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
6+
#include <ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h>
77

88
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
99
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
@@ -94,7 +94,7 @@ arrow::Status AppendCell(arrow::RecordBatchBuilder& builder, const TCell& cell,
9494

9595
arrow::Status AppendValue(arrow::RecordBatchBuilder& builder, const NUdf::TUnboxedValue& value, ui32 colNum, const NKikimr::NMiniKQL::TType* type) {
9696
try {
97-
NYql::NArrow::AppendElement(value, builder.GetField(colNum), type);
97+
NKqp::NFormats::AppendElement(value, builder.GetField(colNum), type);
9898
} catch (const std::exception& e) {
9999
return arrow::Status::FromArgs(arrow::StatusCode::Invalid, e.what());
100100
}
@@ -248,7 +248,7 @@ arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NSc
248248
return arrow::Status::OK();
249249
}
250250

251-
arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>> yqlColumns) {
251+
arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& yqlColumns) {
252252
YqlSchema = yqlColumns;
253253
auto schema = MakeArrowSchema(yqlColumns, NotNullColumns);
254254
if (!schema.ok()) {

ydb/core/formats/arrow/arrow_batch_builder.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
193193
}
194194

195195
arrow::Status Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
196-
arrow::Status Start(const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>> columns);
196+
arrow::Status Start(const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& columns);
197197

198198
std::shared_ptr<arrow::RecordBatch> FlushBatch(bool reinitialize, bool flushEmpty = false);
199199
std::shared_ptr<arrow::RecordBatch> GetBatch() const { return Batch; }
@@ -206,7 +206,7 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
206206
return YdbSchema;
207207
}
208208

209-
const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>> GetYqlSchema() const {
209+
const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& GetYqlSchema() const {
210210
return YqlSchema;
211211
}
212212

ydb/core/formats/arrow/arrow_helpers_minikql.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#include "arrow_helpers_minikql.h"
22

3-
#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
3+
#include <ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h>
44
#include <util/string/join.h>
55

66
namespace NKikimr::NArrow {
@@ -15,7 +15,7 @@ arrow::Result<arrow::FieldVector> MakeArrowFields(
1515
std::shared_ptr<arrow::DataType> arrowType;
1616

1717
try {
18-
arrowType = NYql::NArrow::GetArrowType(mkqlType);
18+
arrowType = NKqp::NFormats::GetArrowType(mkqlType);
1919
} catch (const yexception& e) {
2020
errors.emplace_back(colName + " error: " + e.what());
2121
}

ydb/core/formats/arrow/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ PEERDIR(
2424
ydb/library/services
2525
yql/essentials/core/arrow_kernels/request
2626
yql/essentials/minikql
27-
ydb/library/yql/dq/runtime
27+
ydb/core/kqp/common/result_set_format
2828
)
2929

3030
YQL_LAST_ABI_VERSION()

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
275275
Ydb::Query::SchemaInclusionMode schemaInclusionMode = req->schema_inclusion_mode();
276276
Ydb::ResultSet::Format resultSetFormat = req->result_set_format();
277277

278-
std::optional<NKqp::TArrowFormatSettings> arrowFormatSettings;
278+
std::optional<NKqp::NFormats::TArrowFormatSettings> arrowFormatSettings;
279279
if (req->has_arrow_format_settings()) {
280-
arrowFormatSettings = NKqp::TArrowFormatSettings::ImportFromProto(req->arrow_format_settings());
280+
arrowFormatSettings = NKqp::NFormats::TArrowFormatSettings::ImportFromProto(req->arrow_format_settings());
281281
}
282282

283283
AuditContextAppend(Request_.get(), *req);

ydb/core/kqp/common/events/query.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
#include <ydb/core/resource_pools/resource_pool_settings.h>
44
#include <ydb/core/protos/kqp.pb.h>
5-
#include <ydb/core/kqp/common/kqp_result_set_format_settings.h>
5+
#include <ydb/core/kqp/common/result_set_format/kqp_result_set_format_settings.h>
66
#include <ydb/core/kqp/common/kqp_user_request_context.h>
77
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
88
#include <ydb/core/grpc_services/base/iface.h>
@@ -83,7 +83,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
8383
const ::Ydb::Operations::OperationParams* operationParams,
8484
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
8585
const TString& poolId = "",
86-
std::optional<NKqp::TArrowFormatSettings> arrowFormatSettings = std::nullopt);
86+
std::optional<NFormats::TArrowFormatSettings> arrowFormatSettings = std::nullopt);
8787

8888
TEvQueryRequest() {
8989
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
@@ -391,7 +391,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
391391
return ArrowFormatSettings.has_value();
392392
}
393393

394-
std::optional<NKqp::TArrowFormatSettings> GetArrowFormatSettings() const {
394+
std::optional<NFormats::TArrowFormatSettings> GetArrowFormatSettings() const {
395395
return ArrowFormatSettings;
396396
}
397397

@@ -457,7 +457,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
457457
TIntrusivePtr<TUserRequestContext> UserRequestContext;
458458
TDuration ProgressStatsPeriod;
459459
std::optional<NResourcePool::TPoolSettings> PoolConfig;
460-
std::optional<NKqp::TArrowFormatSettings> ArrowFormatSettings;
460+
std::optional<NFormats::TArrowFormatSettings> ArrowFormatSettings;
461461
bool SaveQueryPhysicalGraph = false; // Used only in execute script queries
462462
std::shared_ptr<const NKikimrKqp::TQueryPhysicalGraph> QueryPhysicalGraph;
463463
i64 Generation = 0;

ydb/core/kqp/common/kqp_event_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
2020
const ::Ydb::Operations::OperationParams* operationParams,
2121
const TQueryRequestSettings& querySettings,
2222
const TString& poolId,
23-
std::optional<NKqp::TArrowFormatSettings> arrowFormatSettings)
23+
std::optional<NKqp::NFormats::TArrowFormatSettings> arrowFormatSettings)
2424
: RequestCtx(ctx)
2525
, RequestActorId(requestActorId)
2626
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))

0 commit comments

Comments
 (0)