Skip to content

Commit 376f34f

Browse files
authored
fix: collected profiles lost in cluster mode (#18680)
* fix: collected profiles lost in cluster mode * revert * fix: collected profiles lost in cluster mode * fix: collected profiles lost in cluster mode * fix: collected pruned partitions lost in cluster mode * test: add test for explain analyze in both standalone and cluster modes * fixup * fixup * fixup * fixup * fixup * fix test * chore: bump sqllogictest to 0.28.4
1 parent 6b116c3 commit 376f34f

File tree

13 files changed

+539
-13
lines changed

13 files changed

+539
-13
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json", "va
585585

586586
# Databend Integration Test
587587
quickcheck = "1.0"
588-
sqllogictest = "0.28.0"
588+
sqllogictest = "0.28.4"
589589

590590
[workspace.lints.rust]
591591
async_fn_in_trait = "allow"

src/query/catalog/src/table_context.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,9 @@ pub trait TableContext: Send + Sync {
466466
fn get_running_query_execution_stats(&self) -> Vec<(String, ExecutorStatsSnapshot)> {
467467
unimplemented!()
468468
}
469+
fn merge_pruned_partitions_stats(&self, _other: &HashMap<u32, PartStatistics>) {
470+
unimplemented!()
471+
}
469472
}
470473

471474
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -539,15 +539,11 @@ impl ExplainInterpreter {
539539

540540
let executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
541541
executor.execute()?;
542-
self.ctx
543-
.add_query_profiles(&executor.get_inner().fetch_profiling(false));
544542
}
545543
false => {
546544
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
547545
executor.start();
548546
while (executor.pull_data()?).is_some() {}
549-
self.ctx
550-
.add_query_profiles(&executor.get_inner().fetch_profiling(false));
551547
}
552548
}
553549
Ok(self

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::time::Duration;
2424
use arrow_flight::flight_service_client::FlightServiceClient;
2525
use arrow_flight::FlightData;
2626
use async_channel::Receiver;
27+
use databend_common_base::base::tokio::sync::oneshot;
2728
use databend_common_base::base::GlobalInstance;
2829
use databend_common_base::runtime::ExecutorStatsSnapshot;
2930
use databend_common_base::runtime::GlobalIORuntime;
@@ -35,6 +36,7 @@ use databend_common_config::GlobalConfig;
3536
use databend_common_exception::ErrorCode;
3637
use databend_common_exception::Result;
3738
use databend_common_grpc::ConnectionFactory;
39+
use databend_common_pipeline_core::always_callback;
3840
use databend_common_pipeline_core::basic_callback;
3941
use databend_common_pipeline_core::ExecutionInfo;
4042
use fastrace::prelude::*;
@@ -941,6 +943,15 @@ impl QueryCoordinator {
941943
}
942944
}
943945

946+
let (finished_profiling_tx, finished_profiling_rx) = oneshot::channel();
947+
if let Some(p) = pipelines.first_mut() {
948+
p.set_on_finished(always_callback(move |info: &ExecutionInfo| {
949+
let profiling = info.profiling.clone();
950+
let _ = finished_profiling_tx.send(profiling);
951+
Ok(())
952+
}));
953+
};
954+
944955
let settings = ExecutorSettings::try_create(info.query_ctx.clone())?;
945956
let executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
946957

@@ -966,6 +977,7 @@ impl QueryCoordinator {
966977
request_server_exchange,
967978
executor.get_inner(),
968979
perf_guard,
980+
finished_profiling_rx,
969981
);
970982

971983
let span = if let Some(parent) = SpanContext::current_local_parent() {

src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ impl BlockMetaTransform<ExchangeDeserializeMeta> for TransformExchangeDeserializ
130130
DataPacket::DataCacheMetrics(_) => unreachable!(),
131131
DataPacket::FragmentData(v) => Ok(vec![self.recv_data(meta.packet, v)?]),
132132
DataPacket::QueryPerf(_) => unreachable!(),
133+
DataPacket::PartStatistics(_) => unreachable!(),
133134
}
134135
}
135136
}

src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ impl StatisticsReceiver {
187187
ctx.set_nodes_perf(source_target.to_string(), perf);
188188
Ok(false)
189189
}
190+
Ok(Some(DataPacket::PartStatistics(stat))) => {
191+
ctx.merge_pruned_partitions_stats(&stat);
192+
Ok(false)
193+
}
190194
}
191195
}
192196

src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617
use std::time::Duration;
1718

1819
use async_channel::Sender;
20+
use databend_common_base::base::tokio::sync::oneshot;
1921
use databend_common_base::base::tokio::time::sleep;
2022
use databend_common_base::runtime::MemStat;
2123
use databend_common_base::runtime::QueryPerf;
@@ -25,6 +27,7 @@ use databend_common_base::JoinHandle;
2527
use databend_common_catalog::table_context::TableContext;
2628
use databend_common_exception::ErrorCode;
2729
use databend_common_exception::Result;
30+
use databend_common_pipeline_core::PlanProfile;
2831
use databend_common_storage::MutationStatus;
2932
use futures_util::future::Either;
3033
use log::warn;
@@ -49,6 +52,7 @@ impl StatisticsSender {
4952
exchange: FlightExchange,
5053
executor: Arc<PipelineExecutor>,
5154
perf_guard: Option<QueryPerfGuard>,
55+
profile_rx: oneshot::Receiver<HashMap<u32, PlanProfile>>,
5256
) -> Self {
5357
let spawner = ctx.clone();
5458
let tx = exchange.convert_to_sender();
@@ -106,8 +110,8 @@ impl StatisticsSender {
106110
}
107111
}
108112

109-
if let Err(error) = Self::send_profile(&executor, &tx, true).await {
110-
warn!("Profiles send has error, cause: {:?}.", error);
113+
if let Err(error) = Self::send_final_profile(profile_rx, &tx).await {
114+
warn!("Final profiles send has error, cause: {:?}.", error);
111115
}
112116

113117
if let Err(error) = Self::send_copy_status(&ctx, &tx).await {
@@ -125,6 +129,10 @@ impl StatisticsSender {
125129
if let Err(error) = Self::send_perf(&perf_guard, &tx).await {
126130
warn!("Perf send has error, cause: {:?}.", error);
127131
}
132+
133+
if let Err(error) = Self::send_part_statistics(&ctx, &tx).await {
134+
warn!("PartStatistics send has error, cause: {:?}.", error);
135+
}
128136
}
129137
});
130138

@@ -205,6 +213,7 @@ impl StatisticsSender {
205213
Ok(())
206214
}
207215

216+
#[async_backtrace::framed]
208217
async fn send_profile(
209218
executor: &PipelineExecutor,
210219
tx: &FlightSender,
@@ -220,6 +229,36 @@ impl StatisticsSender {
220229
Ok(())
221230
}
222231

232+
#[async_backtrace::framed]
233+
async fn send_part_statistics(ctx: &Arc<QueryContext>, tx: &FlightSender) -> Result<()> {
234+
let part_stats = ctx.get_pruned_partitions_stats();
235+
236+
if !part_stats.is_empty() {
237+
let data_packet = DataPacket::PartStatistics(part_stats);
238+
tx.send(data_packet).await?;
239+
}
240+
241+
Ok(())
242+
}
243+
244+
#[async_backtrace::framed]
245+
async fn send_final_profile(
246+
mut rx: oneshot::Receiver<HashMap<u32, PlanProfile>>,
247+
tx: &FlightSender,
248+
) -> Result<()> {
249+
// The plans_profile comes from the executor's on_finish callback.
250+
// We use try_recv() instead of blocking recv() because the execution order
251+
// guarantees that on_finish is called before the statistics sender shuts down.
252+
if let Ok(plans_profile) = rx.try_recv() {
253+
if !plans_profile.is_empty() {
254+
let data_packet = DataPacket::QueryProfiles(plans_profile);
255+
tx.send(data_packet).await?;
256+
}
257+
}
258+
259+
Ok(())
260+
}
261+
223262
#[async_backtrace::framed]
224263
async fn send_scan_cache_metrics(
225264
ctx: &Arc<QueryContext>,
@@ -272,8 +311,4 @@ impl StatisticsSender {
272311
}
273312
progress_info
274313
}
275-
276-
// fn fetch_profiling(ctx: &Arc<QueryContext>) -> Result<Vec<PlanProfile>> {
277-
// // ctx.get_exchange_manager()
278-
// }
279314
}

src/query/service/src/servers/flight/v1/packets/packet_data.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use byteorder::BigEndian;
2323
use byteorder::ReadBytesExt;
2424
use byteorder::WriteBytesExt;
2525
use bytes::Bytes;
26+
use databend_common_catalog::plan::PartStatistics;
2627
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetricValues;
2728
use databend_common_exception::ErrorCode;
2829
use databend_common_exception::Result;
@@ -64,6 +65,7 @@ pub enum DataPacket {
6465
CopyStatus(CopyStatus),
6566
MutationStatus(MutationStatus),
6667
DataCacheMetrics(DataCacheMetricValues),
68+
PartStatistics(HashMap<u32, PartStatistics>),
6769
QueryPerf(String),
6870
}
6971

@@ -83,6 +85,7 @@ impl DataPacket {
8385
DataPacket::QueryProfiles(_) => 0,
8486
DataPacket::DataCacheMetrics(_) => 0,
8587
DataPacket::QueryPerf(_) => 0,
88+
DataPacket::PartStatistics(_) => 0,
8689
}
8790
}
8891
}
@@ -149,6 +152,12 @@ impl TryFrom<DataPacket> for FlightData {
149152
data_header: Default::default(),
150153
flight_descriptor: None,
151154
},
155+
DataPacket::PartStatistics(stat) => FlightData {
156+
app_metadata: vec![0x10].into(),
157+
data_body: serde_json::to_vec(&stat)?.into(),
158+
data_header: Default::default(),
159+
flight_descriptor: None,
160+
},
152161
})
153162
}
154163
}
@@ -215,6 +224,11 @@ impl TryFrom<FlightData> for DataPacket {
215224
.map_err(|_| ErrorCode::BadBytes("Invalid UTF-8 in query performance data."))?;
216225
Ok(DataPacket::QueryPerf(query_perf))
217226
}
227+
0x10 => {
228+
let stat =
229+
serde_json::from_slice::<HashMap<u32, PartStatistics>>(&flight_data.data_body)?;
230+
Ok(DataPacket::PartStatistics(stat))
231+
}
218232
_ => Err(ErrorCode::BadBytes("Unknown flight data packet type.")),
219233
}
220234
}

src/query/service/src/sessions/query_ctx.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2036,6 +2036,10 @@ impl TableContext for QueryContext {
20362036
self.shared.set_pruned_partitions_stats(plan_id, stats);
20372037
}
20382038

2039+
fn merge_pruned_partitions_stats(&self, other: &HashMap<u32, PartStatistics>) {
2040+
self.shared.merge_pruned_partitions_stats(other);
2041+
}
2042+
20392043
fn get_next_broadcast_id(&self) -> u32 {
20402044
self.shared
20412045
.next_broadcast_id

0 commit comments

Comments
 (0)