@@ -2871,7 +2871,7 @@ std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
28712871std::pair<std::vector<std::vector<int64_t >>, std::vector<std::vector<uint64_t >>>
28722872Executor::getRowCountAndOffsetForAllFrags (
28732873 const RelAlgExecutionUnit& ra_exe_unit,
2874- const CartesianProduct< std::vector<std::vector<size_t > >>& frag_ids_crossjoin,
2874+ const std::vector<std::vector<size_t >>& frag_ids_crossjoin,
28752875 const std::vector<InputDescriptor>& input_descs,
28762876 const std::map<TableRef, const TableFragments*>& all_tables_fragments) {
28772877 std::vector<std::vector<int64_t >> all_num_rows;
@@ -2947,6 +2947,8 @@ bool Executor::needLinearizeAllFragments(
29472947 const auto & fragments = selected_fragments[nest_level].fragment_ids ;
29482948 auto need_linearize =
29492949 inner_col_desc.type ()->isArray () || inner_col_desc.type ()->isString ();
2950+ LOG (INFO) << inner_col_desc.type ()->isArray () << " || "
2951+ << inner_col_desc.type ()->isString () << " ) && " << fragments.size () << " > 1" ;
29502952 return need_linearize && fragments.size () > 1 ;
29512953}
29522954
@@ -2984,6 +2986,9 @@ FetchResult Executor::fetchChunks(
29842986 std::vector<std::vector<const int8_t *>> all_frag_col_buffers;
29852987 std::vector<std::vector<int64_t >> all_num_rows;
29862988 std::vector<std::vector<uint64_t >> all_frag_offsets;
2989+
2990+ // in MT case we want to preserve "the order of insertion" into all_frag_col_buffers
2991+ std::vector<std::vector<size_t >> selected_frag_ids_vec;
29872992 if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
29882993 std::mutex all_frag;
29892994 std::atomic<bool > empty_frags{false };
@@ -2993,7 +2998,6 @@ FetchResult Executor::fetchChunks(
29932998 frag_ids_crossjoin.begin (),
29942999 frag_ids_crossjoin.end (),
29953000 [&](const std::vector<size_t >& selected_frag_ids) {
2996- // for (const auto& selected_frag_ids : frag_ids_crossjoin) {
29973001 std::vector<const int8_t *> frag_col_buffers (
29983002 plan_state_->global_to_local_col_ids_ .size ());
29993003 for (const auto & col_id : col_global_ids) {
@@ -3041,16 +3045,15 @@ FetchResult Executor::fetchChunks(
30413045 chunk_iterators,
30423046 for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
30433047 for_lazy_fetch ? 0 : device_id,
3044- device_allocator,
3045- thread_idx);
3048+ device_allocator);
30463049 } else {
30473050 frag_col_buffers[it->second ] =
30483051 column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
30493052 all_tables_fragments,
30503053 memory_level_for_column,
30513054 device_id,
30523055 device_allocator,
3053- thread_idx);
3056+ /* thread_idx= */ 0 );
30543057 }
30553058 } else {
30563059 frag_col_buffers[it->second ] =
@@ -3065,10 +3068,11 @@ FetchResult Executor::fetchChunks(
30653068 }
30663069 }
30673070 all_frag.lock ();
3071+ selected_frag_ids_vec.push_back (selected_frag_ids);
30683072 all_frag_col_buffers.push_back (frag_col_buffers);
30693073 all_frag.unlock ();
3070- });
3071- });
3074+ });
3075+ });
30723076 if (empty_frags) {
30733077 return {};
30743078 }
@@ -3120,8 +3124,7 @@ FetchResult Executor::fetchChunks(
31203124 chunk_iterators,
31213125 for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
31223126 for_lazy_fetch ? 0 : device_id,
3123- device_allocator,
3124- thread_idx);
3127+ device_allocator);
31253128 } else {
31263129 frag_col_buffers[it->second ] =
31273130 column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
@@ -3143,11 +3146,12 @@ FetchResult Executor::fetchChunks(
31433146 device_allocator);
31443147 }
31453148 }
3149+ selected_frag_ids_vec.push_back (selected_frag_ids);
31463150 all_frag_col_buffers.push_back (frag_col_buffers);
31473151 }
31483152 }
31493153 std::tie (all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags (
3150- ra_exe_unit, frag_ids_crossjoin , ra_exe_unit.input_descs , all_tables_fragments);
3154+ ra_exe_unit, selected_frag_ids_vec , ra_exe_unit.input_descs , all_tables_fragments);
31513155 return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
31523156}
31533157
@@ -3171,6 +3175,7 @@ FetchResult Executor::fetchUnionChunks(
31713175 std::vector<std::vector<const int8_t *>> all_frag_col_buffers;
31723176 std::vector<std::vector<int64_t >> all_num_rows;
31733177 std::vector<std::vector<uint64_t >> all_frag_offsets;
3178+ std::vector<std::vector<size_t >> selected_frag_ids_vec;
31743179
31753180 CHECK (!selected_fragments.empty ());
31763181 CHECK_LE (2u , ra_exe_unit.input_descs .size ());
@@ -3269,12 +3274,16 @@ FetchResult Executor::fetchUnionChunks(
32693274 device_allocator);
32703275 }
32713276 }
3277+ selected_frag_ids_vec.push_back (selected_frag_ids);
32723278 all_frag_col_buffers.push_back (frag_col_buffers);
32733279 }
32743280 std::vector<std::vector<int64_t >> num_rows;
32753281 std::vector<std::vector<uint64_t >> frag_offsets;
3276- std::tie (num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags (
3277- ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs , all_tables_fragments);
3282+ std::tie (num_rows, frag_offsets) =
3283+ getRowCountAndOffsetForAllFrags (ra_exe_unit,
3284+ selected_frag_ids_vec,
3285+ ra_exe_unit.input_descs ,
3286+ all_tables_fragments);
32783287 all_num_rows.insert (all_num_rows.end (), num_rows.begin (), num_rows.end ());
32793288 all_frag_offsets.insert (
32803289 all_frag_offsets.end (), frag_offsets.begin (), frag_offsets.end ());
0 commit comments