|
1 | 1 | #include "vector_data_generator.h" |
2 | 2 |
|
| 3 | +#include <ydb/library/formats/arrow/csv/converter/csv_arrow.h> |
3 | 4 | #include <ydb/library/yql/udfs/common/knn/knn-serializer-shared.h> |
4 | 5 |
|
| 6 | +#include <ydb/public/api/protos/ydb_formats.pb.h> |
| 7 | + |
5 | 8 | #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h> |
6 | 9 | #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_nested.h> |
7 | 10 | #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h> |
@@ -44,14 +47,50 @@ class TTransformingDataGenerator final: public IBulkDataGenerator { |
44 | 47 | return std::make_pair(schema, recordBatch); |
45 | 48 | } |
46 | 49 |
|
47 | | - static std::shared_ptr<arrow::Table> Deserialize(TDataPortion::TCsv* data) { |
| 50 | + std::shared_ptr<arrow::Table> Deserialize(TDataPortion::TCsv* data) { |
| 51 | + Ydb::Formats::CsvSettings csvSettings; |
| 52 | + if (Y_UNLIKELY(!csvSettings.ParseFromString(data->FormatString))) { |
| 53 | + ythrow yexception() << "Unable to parse CsvSettings"; |
| 54 | + } |
| 55 | + |
| 56 | + arrow::csv::ReadOptions readOptions = arrow::csv::ReadOptions::Defaults(); |
| 57 | + readOptions.skip_rows = csvSettings.skip_rows(); |
| 58 | + if (data->Data.size() > NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) { |
| 59 | + ui32 blockSize = NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE; |
| 60 | + blockSize *= data->Data.size() / blockSize + 1; |
| 61 | + readOptions.block_size = blockSize; |
| 62 | + } |
| 63 | + |
| 64 | + arrow::csv::ParseOptions parseOptions = arrow::csv::ParseOptions::Defaults(); |
| 65 | + const auto& quoting = csvSettings.quoting(); |
| 66 | + if (Y_UNLIKELY(quoting.quote_char().length() > 1)) { |
| 67 | + ythrow yexception() << "Cannot read CSV: Wrong quote char '" << quoting.quote_char() << "'"; |
| 68 | + } |
| 69 | + const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front(); |
| 70 | + parseOptions.quoting = false; |
| 71 | + parseOptions.quote_char = qchar; |
| 72 | + parseOptions.double_quote = !quoting.double_quote_disabled(); |
| 73 | + if (csvSettings.delimiter()) { |
| 74 | + if (Y_UNLIKELY(csvSettings.delimiter().size() != 1)) { |
| 75 | + ythrow yexception() << "Cannot read CSV: Invalid delimitr in csv: " << csvSettings.delimiter(); |
| 76 | + } |
| 77 | + parseOptions.delimiter = csvSettings.delimiter().front(); |
| 78 | + } |
| 79 | + |
| 80 | + arrow::csv::ConvertOptions convertOptions = arrow::csv::ConvertOptions::Defaults(); |
| 81 | + if (csvSettings.null_value()) { |
| 82 | + convertOptions.null_values = { std::string(csvSettings.null_value().data(), csvSettings.null_value().size()) }; |
| 83 | + convertOptions.strings_can_be_null = true; |
| 84 | + convertOptions.quoted_strings_can_be_null = false; |
| 85 | + } |
| 86 | + |
48 | 87 | auto bufferReader = std::make_shared<arrow::io::BufferReader>(arrow::util::string_view(data->Data.data(), data->Data.size())); |
49 | 88 | auto csvReader = arrow::csv::TableReader::Make( |
50 | 89 | arrow::io::default_io_context(), |
51 | 90 | bufferReader, |
52 | | - arrow::csv::ReadOptions::Defaults(), |
53 | | - arrow::csv::ParseOptions::Defaults(), |
54 | | - arrow::csv::ConvertOptions::Defaults() |
| 91 | + readOptions, |
| 92 | + parseOptions, |
| 93 | + convertOptions |
55 | 94 | ).ValueOrDie(); |
56 | 95 |
|
57 | 96 | return csvReader->Read().ValueOrDie(); |
|
0 commit comments