From 716c43f6ed84c24a0ed04c03f88ff3f3e076bf2b Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 26 Jun 2025 17:12:05 +0200 Subject: [PATCH 1/8] Reorganize object store metrics --- quickwit/quickwit-storage/src/metrics.rs | 84 ++---- .../src/object_storage/azure_blob_storage.rs | 28 +- .../src/object_storage/error.rs | 7 +- .../src/object_storage/metrics_wrappers.rs | 264 ++++++++++++++++++ .../src/object_storage/mod.rs | 2 + .../object_storage/s3_compatible_storage.rs | 50 +--- 6 files changed, 318 insertions(+), 117 deletions(-) create mode 100644 quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 43ef588e192..ff62b373a4a 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -16,7 +16,7 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, + GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, new_gauge, new_histogram_vec, }; @@ -30,19 +30,13 @@ pub struct StorageMetrics { pub searcher_split_cache: CacheMetrics, pub get_slice_timeout_successes: [IntCounter; 3], pub get_slice_timeout_all_timeouts: IntCounter, - pub object_storage_get_total: IntCounter, - pub object_storage_get_errors_total: IntCounterVec<1>, + pub object_storage_requests_total: IntCounterVec<2>, + pub object_storage_request_duration: HistogramVec<2>, pub object_storage_get_slice_in_flight_count: IntGauge, pub object_storage_get_slice_in_flight_num_bytes: IntGauge, - pub object_storage_put_total: IntCounter, - pub object_storage_put_parts: IntCounter, pub object_storage_download_num_bytes: IntCounter, + pub object_storage_download_errors: IntCounterVec<1>, pub object_storage_upload_num_bytes: IntCounter, - - pub object_storage_delete_requests_total: IntCounter, - pub 
object_storage_bulk_delete_requests_total: IntCounter, - pub object_storage_delete_request_duration: Histogram, - pub object_storage_bulk_delete_request_duration: Histogram, } impl Default for StorageMetrics { @@ -63,31 +57,6 @@ impl Default for StorageMetrics { let get_slice_timeout_all_timeouts = get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]); - let object_storage_requests_total = new_counter_vec( - "object_storage_requests_total", - "Total number of object storage requests performed.", - "storage", - &[], - ["action"], - ); - let object_storage_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_object"]); - let object_storage_bulk_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_objects"]); - - let object_storage_request_duration = new_histogram_vec( - "object_storage_request_duration_seconds", - "Duration of object storage requests in seconds.", - "storage", - &[], - ["action"], - vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], - ); - let object_storage_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_object"]); - let object_storage_bulk_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_objects"]); - StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), fd_cache_metrics: CacheMetrics::for_component("fd"), @@ -97,19 +66,23 @@ impl Default for StorageMetrics { split_footer_cache: CacheMetrics::for_component("splitfooter"), get_slice_timeout_successes, get_slice_timeout_all_timeouts, - object_storage_get_total: new_counter( - "object_storage_gets_total", - "Number of objects fetched. Might be lower than get_slice_timeout_outcome if \ - queries are debounced.", + object_storage_requests_total: new_counter_vec( + "object_storage_requests_total", + "Number of requests to the object store, by action and status. 
Requests are \ + recorded when the response headers are returned, download failures will not \ + appear as errors.", "storage", &[], + ["action", "status"], ), - object_storage_get_errors_total: new_counter_vec::<1>( - "object_storage_get_errors_total", - "Number of GetObject errors.", + object_storage_request_duration: new_histogram_vec( + "object_storage_request_duration", + "Durations until the response headers are returned from the object store, by \ + action and status. This does not measure the download time.", "storage", &[], - ["code"], + ["action", "status"], + vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], ), object_storage_get_slice_in_flight_count: new_gauge( "object_storage_get_slice_in_flight_count", @@ -124,35 +97,26 @@ impl Default for StorageMetrics { "storage", &[], ), - object_storage_put_total: new_counter( - "object_storage_puts_total", - "Number of objects uploaded. May differ from object_storage_requests_parts due to \ - multipart upload.", - "storage", - &[], - ), - object_storage_put_parts: new_counter( - "object_storage_puts_parts", - "Number of object parts uploaded.", - "", - &[], - ), object_storage_download_num_bytes: new_counter( "object_storage_download_num_bytes", "Amount of data downloaded from an object storage.", "storage", &[], ), + object_storage_download_errors: new_counter_vec( + "object_storage_download_errors", + "Number of download requests that received successfull response headers but \ + failed during download.", + "storage", + &[], + ["status"], + ), object_storage_upload_num_bytes: new_counter( "object_storage_upload_num_bytes", "Amount of data uploaded to an object storage.", "storage", &[], ), - object_storage_delete_requests_total, - object_storage_bulk_delete_requests_total, - object_storage_delete_request_duration, - object_storage_bulk_delete_request_duration, } } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs 
index b21776fa69f..726c35be37f 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -45,10 +45,11 @@ use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; use crate::metrics::object_storage_get_slice_in_flight_guards; +use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage, - StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, + StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, }; /// Azure object storage resolver. @@ -225,7 +226,6 @@ impl AzureBlobStorage { name: &'a str, payload: Box, ) -> StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS .object_storage_upload_num_bytes .inc_by(payload.len()); @@ -237,6 +237,7 @@ impl AzureBlobStorage { .put_block_blob(data) .hash(hash) .into_future() + .with_count_metric("put_block_blob") .await?; Result::<(), AzureErrorWrapper>::Ok(()) }) @@ -261,7 +262,6 @@ impl AzureBlobStorage { .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS .object_storage_upload_num_bytes .inc_by(range.end - range.start); @@ -276,6 +276,7 @@ impl AzureBlobStorage { .put_block(block_id.clone(), data) .hash(hash) .into_future() + .with_count_metric("put_block") .await?; Result::<_, AzureErrorWrapper>::Ok(block_id) }) @@ -299,6 +300,7 @@ impl AzureBlobStorage { blob_client .put_block_list(block_list) .into_future() + .with_count_metric("put_block_list") .await .map_err(AzureErrorWrapper::from)?; @@ -327,7 +329,6 @@ impl Storage for 
AzureBlobStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let name = self.blob_name(path); let total_len = payload.len(); let part_num_bytes = self.multipart_policy.part_num_bytes(total_len); @@ -345,7 +346,7 @@ impl Storage for AzureBlobStorage { let name = self.blob_name(path); let mut output_stream = self.container_client.blob_client(name).get().into_stream(); - while let Some(chunk_result) = output_stream.next().await { + while let Some(chunk_result) = output_stream.next().with_count_metric("get_blob").await { let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?; let chunk_response_body_stream = chunk_response .data @@ -353,10 +354,7 @@ impl Storage for AzureBlobStorage { .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } output.flush().await?; Ok(()) @@ -369,6 +367,7 @@ impl Storage for AzureBlobStorage { .blob_client(blob_name) .delete() .into_future() + .with_count_metric("delete_blob") .await .map_err(|err| AzureErrorWrapper::from(err).into()); ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?; @@ -513,7 +512,7 @@ async fn extract_range_data_and_hash( .await? 
.into_async_read(); let mut buf: Vec = Vec::with_capacity(range.count()); - tokio::io::copy(&mut reader, &mut buf).await?; + tokio::io::copy_buf(&mut reader, &mut buf).await?; let data = Bytes::from(buf); let hash = md5::compute(&data[..]); Ok((data, hash)) @@ -544,7 +543,7 @@ async fn download_all( output: &mut Vec, ) -> Result<(), AzureErrorWrapper> { output.clear(); - while let Some(chunk_result) = chunk_stream.next().await { + while let Some(chunk_result) = chunk_stream.next().with_count_metric("get_blob").await { let chunk_response = chunk_result?; let chunk_response_body_stream = chunk_response .data @@ -552,10 +551,7 @@ async fn download_all( .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - crate::STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } // When calling `get_all`, the Vec capacity is not properly set. output.shrink_to_fit(); diff --git a/quickwit/quickwit-storage/src/object_storage/error.rs b/quickwit/quickwit-storage/src/object_storage/error.rs index 5f60fe1f944..8a7efc13332 100644 --- a/quickwit/quickwit-storage/src/object_storage/error.rs +++ b/quickwit/quickwit-storage/src/object_storage/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError}; +use aws_sdk_s3::error::{DisplayErrorContext, SdkError}; use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError; use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError; use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError; @@ -62,11 +62,6 @@ pub trait ToStorageErrorKind { impl ToStorageErrorKind for GetObjectError { fn to_storage_error_kind(&self) -> StorageErrorKind { - let error_code = self.code().unwrap_or("unknown"); - crate::STORAGE_METRICS - .object_storage_get_errors_total - .with_label_values([error_code]) - .inc(); match self { GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service, GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound, diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs new file mode 100644 index 00000000000..2347ceee9c7 --- /dev/null +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -0,0 +1,264 @@ +use std::borrow::Cow; +use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; +use std::time::Instant; + +use pin_project::{pin_project, pinned_drop}; +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use crate::STORAGE_METRICS; + +pub enum Status { + Pending, + Done, + Ready(String), +} + +/// Converts an object store client SDK Result<> to the [Status] that should be +/// recorded in the metrics. +/// +/// The `Marker` type is necessary to avoid conflicting implementations of the +/// trait. +pub trait AsStatus { + fn as_status(&self) -> Status; +} + +/// Wrapper around object store requests to record metrics, including cancellation. 
+#[pin_project(PinnedDrop)] +pub struct RequestMetricsWrapper +where + F: Future, + F::Output: AsStatus, +{ + #[pin] + tracked: F, + action: &'static str, + start: Option, + uploaded_bytes: Option, + status: Status, + _marker: PhantomData, +} + +#[pinned_drop] +impl PinnedDrop for RequestMetricsWrapper +where + F: Future, + F::Output: AsStatus, +{ + fn drop(self: Pin<&mut Self>) { + let status = match &self.status { + Status::Pending => "cancelled", + Status::Done => return, + Status::Ready(s) => s.as_str(), + }; + let label_values = [self.action, status]; + STORAGE_METRICS + .object_storage_requests_total + .with_label_values(label_values) + .inc(); + if let Some(start) = self.start { + STORAGE_METRICS + .object_storage_request_duration + .with_label_values(label_values) + .observe(start.elapsed().as_secs_f64()); + } + if let Some(bytes) = self.uploaded_bytes { + STORAGE_METRICS + .object_storage_upload_num_bytes + .inc_by(bytes); + } + } +} + +impl Future for RequestMetricsWrapper +where + F: Future, + F::Output: AsStatus, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = response.as_status(); + + Poll::Ready(response) + } +} + +pub trait S3MetricsWrapperExt +where + F: Future, + F::Output: AsStatus, +{ + fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper; + + fn with_count_and_duration_metrics( + self, + action: &'static str, + ) -> RequestMetricsWrapper; + + fn with_count_and_upload_metrics( + self, + action: &'static str, + bytes: u64, + ) -> RequestMetricsWrapper; +} + +impl S3MetricsWrapperExt for F +where + F: Future, + F::Output: AsStatus, +{ + fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: Status::Pending, + start: None, + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn 
with_count_and_duration_metrics( + self, + action: &'static str, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: Status::Pending, + start: Some(Instant::now()), + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_upload_metrics( + self, + action: &'static str, + bytes: u64, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: Status::Pending, + start: None, + uploaded_bytes: Some(bytes), + _marker: PhantomData, + } + } +} + +pub struct S3Marker; + +impl AsStatus for Result +where E: aws_sdk_s3::error::ProvideErrorMetadata +{ + fn as_status(&self) -> Status { + let status_str = match self { + Ok(_) => "success".to_string(), + Err(e) => e.meta().code().unwrap_or("unknown").to_string(), + }; + Status::Ready(status_str) + } +} + +pub struct AzureMarker; + +impl AsStatus for Result { + fn as_status(&self) -> Status { + let Err(err) = self else { + return Status::Ready("success".to_string()); + }; + let err_status_str = match err.kind() { + azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(), + azure_storage::ErrorKind::Credential => "credential".to_string(), + azure_storage::ErrorKind::Io => "io".to_string(), + azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(), + _ => "unknown".to_string(), + }; + Status::Ready(err_status_str) + } +} + +// The Azure SDK get_blob request returns Option because it chunks +// the download into a stream of get requests. +impl AsStatus for Option> { + fn as_status(&self) -> Status { + match self { + None => Status::Done, + Some(res) => res.as_status(), + } + } +} + +/// Track io errors during downloads. +/// +/// Downloads are a bit different from other requests because the request might +/// fail while getting the bytes from the response body, long after getting a +/// successful response header. 
+#[pin_project(PinnedDrop)] +struct DownloadMetricsWrapper +where F: Future> +{ + #[pin] + tracked: F, + result: Option>, +} + +#[pinned_drop] +impl PinnedDrop for DownloadMetricsWrapper +where F: Future> +{ + fn drop(self: Pin<&mut Self>) { + let status = match &self.result { + None => Cow::Borrowed("cancelled"), + Some(Err(e)) => Cow::Owned(format!("{e:?}")), + Some(Ok(downloaded_bytes)) => { + STORAGE_METRICS + .object_storage_download_num_bytes + .inc_by(*downloaded_bytes); + return; + } + }; + STORAGE_METRICS + .object_storage_download_errors + .with_label_values([status.as_ref()]) + .inc(); + } +} + +impl Future for DownloadMetricsWrapper +where F: Future> +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.result = match &response { + Ok(s) => Some(Ok(*s)), + Err(e) => Some(Err(e.kind())), + }; + Poll::Ready(response) + } +} + +pub async fn copy_with_download_metrics<'a, R, W>( + reader: &'a mut R, + writer: &'a mut W, +) -> io::Result +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + DownloadMetricsWrapper { + tracked: tokio::io::copy_buf(reader, writer), + result: None, + } + .await +} diff --git a/quickwit/quickwit-storage/src/object_storage/mod.rs b/quickwit/quickwit-storage/src/object_storage/mod.rs index e914c107291..cee3bacd338 100644 --- a/quickwit/quickwit-storage/src/object_storage/mod.rs +++ b/quickwit/quickwit-storage/src/object_storage/mod.rs @@ -14,6 +14,8 @@ mod error; +mod metrics_wrappers; + mod s3_compatible_storage; pub use self::s3_compatible_storage::S3CompatibleObjectStorage; pub use self::s3_compatible_storage_resolver::S3CompatibleObjectStorageFactory; diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 6a7105fb8f1..361c8b33c52 100644 --- 
a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -46,10 +46,11 @@ use tracing::{info, instrument, warn}; use crate::metrics::object_storage_get_slice_in_flight_guards; use crate::object_storage::MultiPartPolicy; +use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, OwnedBytes, STORAGE_METRICS, Storage, StorageError, - StorageErrorKind, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, + StorageResolverError, StorageResult, }; /// Semaphore to limit the number of concurrent requests to the object store. Some object stores @@ -286,11 +287,6 @@ impl S3CompatibleObjectStorage { .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(len); - self.s3_client .put_object() .bucket(bucket) @@ -298,6 +294,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64) .send() + .with_count_and_upload_metrics("put_object", len) .await .map_err(|sdk_error| { if sdk_error.is_retryable() { @@ -332,6 +329,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key) .send() + .with_count_metric("create_multipart_upload") .await }) .await? 
@@ -421,11 +419,6 @@ impl S3CompatibleObjectStorage { .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(part.len()); - let upload_part_output = self .s3_client .upload_part() @@ -437,6 +430,7 @@ impl S3CompatibleObjectStorage { .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() + .with_count_and_upload_metrics("upload_part", part.len()) .await .map_err(|s3_err| { if s3_err.is_retryable() { @@ -516,6 +510,7 @@ impl S3CompatibleObjectStorage { .multipart_upload(completed_upload.clone()) .upload_id(upload_id) .send() + .with_count_metric("complete_multipart_upload") .await }) .await?; @@ -530,6 +525,7 @@ impl S3CompatibleObjectStorage { .key(key) .upload_id(upload_id) .send() + .with_count_metric("abort_multipart_upload") .await }) .await?; @@ -544,8 +540,6 @@ impl S3CompatibleObjectStorage { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); - crate::STORAGE_METRICS.object_storage_get_total.inc(); - let get_object_output = self .s3_client .get_object() @@ -553,6 +547,7 @@ impl S3CompatibleObjectStorage { .key(key) .set_range(range_str) .send() + .with_count_metric("get_object") .await?; Ok(get_object_output) } @@ -640,17 +635,12 @@ impl S3CompatibleObjectStorage { for (path_chunk, delete) in &mut delete_requests_it { let delete_objects_res: StorageResult = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_bulk_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_bulk_delete_request_duration - .start_timer(); self.s3_client .delete_objects() .bucket(self.bucket.clone()) .delete(delete.clone()) .send() + .with_count_and_duration_metrics("delete_objects") .await }) .await @@ -716,10 +706,7 @@ impl S3CompatibleObjectStorage { async fn download_all(byte_stream: 
ByteStream, output: &mut Vec) -> io::Result<()> { output.clear(); let mut body_stream_reader = BufReader::new(byte_stream.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; // When calling `get_all`, the Vec capacity is not properly set. output.shrink_to_fit(); Ok(()) @@ -735,6 +722,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .max_keys(1) .send() + .with_count_metric("list_objects_v2") .await?; Ok(()) } @@ -744,7 +732,6 @@ impl Storage for S3CompatibleObjectStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let _permit = REQUEST_SEMAPHORE.acquire().await; let key = self.key(path); let total_len = payload.len(); @@ -763,10 +750,7 @@ impl Storage for S3CompatibleObjectStorage { let get_object_output = aws_retry(&self.retry_params, || self.get_object(path, None)).await?; let mut body_read = BufReader::new(get_object_output.body.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_read, output).await?; output.flush().await?; Ok(()) } @@ -776,17 +760,12 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let delete_res = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_delete_request_duration - .start_timer(); self.s3_client .delete_object() .bucket(&bucket) .key(&key) .send() + .with_count_and_duration_metrics("delete_object") .await }) .await; @@ -867,6 +846,7 @@ impl Storage for S3CompatibleObjectStorage { 
.bucket(&bucket) .key(&key) .send() + .with_count_metric("head_object") .await }) .await?; From 88528bfefe2293075e77ff3e749b97bd8a71fdac Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 2 Jul 2025 12:08:24 +0200 Subject: [PATCH 2/8] Fix feature flags and license headers --- .../src/object_storage/metrics_wrappers.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs index 2347ceee9c7..0371756091b 100644 --- a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -1,3 +1,17 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::borrow::Cow; use std::io; use std::marker::PhantomData; @@ -12,6 +26,7 @@ use crate::STORAGE_METRICS; pub enum Status { Pending, + #[allow(dead_code)] Done, Ready(String), } @@ -167,8 +182,10 @@ where E: aws_sdk_s3::error::ProvideErrorMetadata } } +#[cfg(feature = "azure")] pub struct AzureMarker; +#[cfg(feature = "azure")] impl AsStatus for Result { fn as_status(&self) -> Status { let Err(err) = self else { @@ -187,6 +204,7 @@ impl AsStatus for Result { // The Azure SDK get_blob request returns Option because it chunks // the download into a stream of get requests. 
+#[cfg(feature = "azure")] impl AsStatus for Option> { fn as_status(&self) -> Status { match self { From cb7eb486b8ef8aeaabd77f8e79697baea21c8d91 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 15 Jul 2025 16:01:26 +0200 Subject: [PATCH 3/8] Improve download metric - Use explicit label values - Track download at the copy level Unified label values for object store actions. --- quickwit/quickwit-storage/src/metrics.rs | 19 +- .../src/object_storage/azure_blob_storage.rs | 35 ++- .../src/object_storage/metrics_wrappers.rs | 268 ++++++++++++++---- .../object_storage/s3_compatible_storage.rs | 24 +- 4 files changed, 252 insertions(+), 94 deletions(-) diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index ff62b373a4a..5ea00df485c 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -34,9 +34,9 @@ pub struct StorageMetrics { pub object_storage_request_duration: HistogramVec<2>, pub object_storage_get_slice_in_flight_count: IntGauge, pub object_storage_get_slice_in_flight_num_bytes: IntGauge, - pub object_storage_download_num_bytes: IntCounter, + pub object_storage_download_num_bytes: IntCounterVec<1>, pub object_storage_download_errors: IntCounterVec<1>, - pub object_storage_upload_num_bytes: IntCounter, + pub object_storage_upload_num_bytes: IntCounterVec<1>, } impl Default for StorageMetrics { @@ -97,25 +97,28 @@ impl Default for StorageMetrics { "storage", &[], ), - object_storage_download_num_bytes: new_counter( + object_storage_download_num_bytes: new_counter_vec( "object_storage_download_num_bytes", - "Amount of data downloaded from an object storage.", + "Amount of data downloaded from object storage.", "storage", &[], + ["status"], ), object_storage_download_errors: new_counter_vec( "object_storage_download_errors", - "Number of download requests that received successfull response headers but \ - failed during download.", + "Number of download requests that 
received successful response headers but failed \ + during download.", "storage", &[], ["status"], ), - object_storage_upload_num_bytes: new_counter( + object_storage_upload_num_bytes: new_counter_vec( "object_storage_upload_num_bytes", - "Amount of data uploaded to an object storage.", + "Amount of data uploaded to object storage. The value recorded for failed and \ + aborted uploads is the full payload size.", "storage", &[], + ["status"], ), } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 726c35be37f..3bb6d9711dd 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -45,7 +45,9 @@ use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; use crate::metrics::object_storage_get_slice_in_flight_guards; -use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; +use crate::object_storage::metrics_wrappers::{ + ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics, +}; use crate::storage::SendableAsync; use crate::{ BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, @@ -226,9 +228,6 @@ impl AzureBlobStorage { name: &'a str, payload: Box, ) -> StorageResult<()> { - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(payload.len()); retry(&self.retry_params, || async { let data = Bytes::from(payload.read_all().await?.to_vec()); let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0); @@ -237,7 +236,7 @@ impl AzureBlobStorage { .put_block_blob(data) .hash(hash) .into_future() - .with_count_metric("put_block_blob") + .with_count_and_upload_metrics(ActionLabel::PutObject, payload.len()) .await?; Result::<(), AzureErrorWrapper>::Ok(()) }) @@ -262,9 +261,6 @@ impl AzureBlobStorage { .map(|(num, range)| { let moved_blob_client = 
blob_client.clone(); let moved_payload = payload.clone(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { let block_id = format!("block:{num}"); @@ -276,7 +272,10 @@ impl AzureBlobStorage { .put_block(block_id.clone(), data) .hash(hash) .into_future() - .with_count_metric("put_block") + .with_count_and_upload_metrics( + ActionLabel::UploadPart, + range.end - range.start, + ) .await?; Result::<_, AzureErrorWrapper>::Ok(block_id) }) @@ -300,7 +299,7 @@ impl AzureBlobStorage { blob_client .put_block_list(block_list) .into_future() - .with_count_metric("put_block_list") + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await .map_err(AzureErrorWrapper::from)?; @@ -317,6 +316,7 @@ impl Storage for AzureBlobStorage { .max_results(NonZeroU32::new(1u32).expect("1 is always non-zero.")) .into_stream() .next() + .with_count_metric(ActionLabel::ListObjects) .await { let _ = first_blob_result?; @@ -346,7 +346,11 @@ impl Storage for AzureBlobStorage { let name = self.blob_name(path); let mut output_stream = self.container_client.blob_client(name).get().into_stream(); - while let Some(chunk_result) = output_stream.next().with_count_metric("get_blob").await { + while let Some(chunk_result) = output_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?; let chunk_response_body_stream = chunk_response .data @@ -367,7 +371,7 @@ impl Storage for AzureBlobStorage { .blob_client(blob_name) .delete() .into_future() - .with_count_metric("delete_blob") + .with_count_metric(ActionLabel::DeleteObject) .await .map_err(|err| AzureErrorWrapper::from(err).into()); ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?; @@ -490,6 +494,7 @@ impl Storage for AzureBlobStorage { .blob_client(name) .get_properties() .into_future() + .with_count_metric(ActionLabel::HeadObject) .await; match 
properties_result { Ok(response) => Ok(response.blob.properties.content_length), @@ -543,7 +548,11 @@ async fn download_all( output: &mut Vec, ) -> Result<(), AzureErrorWrapper> { output.clear(); - while let Some(chunk_result) = chunk_stream.next().with_count_metric("get_blob").await { + while let Some(chunk_result) = chunk_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result?; let chunk_response_body_stream = chunk_response .data diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs index 0371756091b..f2d92991984 100644 --- a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; use std::io; use std::marker::PhantomData; use std::pin::Pin; @@ -24,8 +23,39 @@ use tokio::io::{AsyncBufRead, AsyncWrite}; use crate::STORAGE_METRICS; -pub enum Status { +pub enum ActionLabel { + AbortMultipartUpload, + CompleteMultipartUpload, + CreateMultipartUpload, + DeleteObject, + DeleteObjects, + GetObject, + HeadObject, + ListObjects, + PutObject, + UploadPart, +} + +impl ActionLabel { + fn as_str(&self) -> &'static str { + match self { + ActionLabel::AbortMultipartUpload => "abort_multipart_upload", + ActionLabel::CompleteMultipartUpload => "complete_multipart_upload", + ActionLabel::CreateMultipartUpload => "create_multipart_upload", + ActionLabel::DeleteObject => "delete_object", + ActionLabel::DeleteObjects => "delete_objects", + ActionLabel::GetObject => "get_object", + ActionLabel::HeadObject => "head_object", + ActionLabel::ListObjects => "list_objects", + ActionLabel::PutObject => "put_object", + ActionLabel::UploadPart => "upload_part", + } + } +} + +pub enum RequestStatus { Pending, + // only useful on 
feature="azure" #[allow(dead_code)] Done, Ready(String), @@ -36,8 +66,8 @@ pub enum Status { /// /// The `Marker` type is necessary to avoid conflicting implementations of the /// trait. -pub trait AsStatus { - fn as_status(&self) -> Status; +pub trait AsRequestStatus { + fn as_status(&self) -> RequestStatus; } /// Wrapper around object store requests to record metrics, including cancellation. @@ -45,14 +75,14 @@ pub trait AsStatus { pub struct RequestMetricsWrapper where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { #[pin] tracked: F, - action: &'static str, + action: ActionLabel, start: Option, uploaded_bytes: Option, - status: Status, + status: RequestStatus, _marker: PhantomData, } @@ -60,15 +90,15 @@ where impl PinnedDrop for RequestMetricsWrapper where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { fn drop(self: Pin<&mut Self>) { let status = match &self.status { - Status::Pending => "cancelled", - Status::Done => return, - Status::Ready(s) => s.as_str(), + RequestStatus::Pending => "cancelled", + RequestStatus::Done => return, + RequestStatus::Ready(s) => s.as_str(), }; - let label_values = [self.action, status]; + let label_values = [self.action.as_str(), status]; STORAGE_METRICS .object_storage_requests_total .with_label_values(label_values) @@ -82,6 +112,7 @@ where if let Some(bytes) = self.uploaded_bytes { STORAGE_METRICS .object_storage_upload_num_bytes + .with_label_values([status]) .inc_by(bytes); } } @@ -90,7 +121,7 @@ where impl Future for RequestMetricsWrapper where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { type Output = F::Output; @@ -103,35 +134,35 @@ where } } -pub trait S3MetricsWrapperExt +pub trait RequestMetricsWrapperExt where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { - fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper; + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper; fn with_count_and_duration_metrics( 
self, - action: &'static str, + action: ActionLabel, ) -> RequestMetricsWrapper; fn with_count_and_upload_metrics( self, - action: &'static str, + action: ActionLabel, bytes: u64, ) -> RequestMetricsWrapper; } -impl S3MetricsWrapperExt for F +impl RequestMetricsWrapperExt for F where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { - fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper { + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper { RequestMetricsWrapper { tracked: self, action, - status: Status::Pending, + status: RequestStatus::Pending, start: None, uploaded_bytes: None, _marker: PhantomData, @@ -140,12 +171,12 @@ where fn with_count_and_duration_metrics( self, - action: &'static str, + action: ActionLabel, ) -> RequestMetricsWrapper { RequestMetricsWrapper { tracked: self, action, - status: Status::Pending, + status: RequestStatus::Pending, start: Some(Instant::now()), uploaded_bytes: None, _marker: PhantomData, @@ -154,13 +185,13 @@ where fn with_count_and_upload_metrics( self, - action: &'static str, + action: ActionLabel, bytes: u64, ) -> RequestMetricsWrapper { RequestMetricsWrapper { tracked: self, action, - status: Status::Pending, + status: RequestStatus::Pending, start: None, uploaded_bytes: Some(bytes), _marker: PhantomData, @@ -170,15 +201,15 @@ where pub struct S3Marker; -impl AsStatus for Result +impl AsRequestStatus for Result where E: aws_sdk_s3::error::ProvideErrorMetadata { - fn as_status(&self) -> Status { + fn as_status(&self) -> RequestStatus { let status_str = match self { Ok(_) => "success".to_string(), Err(e) => e.meta().code().unwrap_or("unknown").to_string(), }; - Status::Ready(status_str) + RequestStatus::Ready(status_str) } } @@ -186,10 +217,10 @@ where E: aws_sdk_s3::error::ProvideErrorMetadata pub struct AzureMarker; #[cfg(feature = "azure")] -impl AsStatus for Result { - fn as_status(&self) -> Status { +impl AsRequestStatus for Result { + fn as_status(&self) -> 
RequestStatus { let Err(err) = self else { - return Status::Ready("success".to_string()); + return RequestStatus::Ready("success".to_string()); }; let err_status_str = match err.kind() { azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(), @@ -198,69 +229,84 @@ impl AsStatus for Result { azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(), _ => "unknown".to_string(), }; - Status::Ready(err_status_str) + RequestStatus::Ready(err_status_str) } } // The Azure SDK get_blob request returns Option because it chunks // the download into a stream of get requests. #[cfg(feature = "azure")] -impl AsStatus for Option> { - fn as_status(&self) -> Status { +impl AsRequestStatus for Option> { + fn as_status(&self) -> RequestStatus { match self { - None => Status::Done, + None => RequestStatus::Done, Some(res) => res.as_status(), } } } +pub enum DownloadStatus { + InProgress, + Done, + Failed(&'static str), +} + /// Track io errors during downloads. /// /// Downloads are a bit different from other requests because the request might /// fail while getting the bytes from the response body, long after getting a /// successful response header. 
#[pin_project(PinnedDrop)] -struct DownloadMetricsWrapper -where F: Future> +struct DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { #[pin] - tracked: F, - result: Option>, + tracked: copy_buf::CopyBuf<'a, R, W>, + status: DownloadStatus, } #[pinned_drop] -impl PinnedDrop for DownloadMetricsWrapper -where F: Future> +impl<'a, R, W> PinnedDrop for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { fn drop(self: Pin<&mut Self>) { - let status = match &self.result { - None => Cow::Borrowed("cancelled"), - Some(Err(e)) => Cow::Owned(format!("{e:?}")), - Some(Ok(downloaded_bytes)) => { - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(*downloaded_bytes); - return; - } + let error_opt = match &self.status { + DownloadStatus::InProgress => Some("cancelled"), + DownloadStatus::Failed(e) => Some(*e), + DownloadStatus::Done => None, }; + STORAGE_METRICS - .object_storage_download_errors - .with_label_values([status.as_ref()]) - .inc(); + .object_storage_download_num_bytes + .with_label_values([error_opt.unwrap_or("success")]) + .inc_by(self.tracked.amt); + + if let Some(error) = error_opt { + STORAGE_METRICS + .object_storage_download_errors + .with_label_values([error]) + .inc(); + } } } -impl Future for DownloadMetricsWrapper -where F: Future> +impl<'a, R, W> Future for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { - type Output = F::Output; + type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let response = ready!(this.tracked.poll(cx)); - *this.result = match &response { - Ok(s) => Some(Ok(*s)), - Err(e) => Some(Err(e.kind())), + *this.status = match &response { + Ok(_) => DownloadStatus::Done, + Err(e) => DownloadStatus::Failed(io_error_as_label(e.kind())), }; Poll::Ready(response) } @@ -275,8 +321,106 @@ 
where W: AsyncWrite + Unpin + ?Sized, { DownloadMetricsWrapper { - tracked: tokio::io::copy_buf(reader, writer), - result: None, + tracked: copy_buf::CopyBuf { + reader, + writer, + amt: 0, + }, + status: DownloadStatus::InProgress, } .await } + +/// This is a fork of `tokio::io::copy_buf` that enables tracking the number of +/// bytes transferred. This estimate should be accurate as long as the network +/// is the bottleneck. +mod copy_buf { + + use std::future::Future; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll, ready}; + + use tokio::io::{AsyncBufRead, AsyncWrite}; + + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + pub reader: &'a mut R, + pub writer: &'a mut W, + pub amt: u64, + } + + impl Future for CopyBuf<'_, R, W> + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } + } +} + +fn io_error_as_label(error: io::ErrorKind) -> &'static str { + use io::ErrorKind::*; + // most of these variants are not expected to happen + match error { + AddrInUse => "addr_in_use", + AddrNotAvailable => "addr_not_available", + AlreadyExists => "already_exists", + ArgumentListTooLong => "argument_list_too_long", + BrokenPipe => "broken_pipe", + ConnectionAborted => "connection_aborted", + ConnectionRefused => "connection_refused", + ConnectionReset => "connection_reset", + CrossesDevices => 
"crosses_devices", + Deadlock => "deadlock", + DirectoryNotEmpty => "directory_not_empty", + ExecutableFileBusy => "executable_file_busy", + FileTooLarge => "file_too_large", + HostUnreachable => "host_unreachable", + Interrupted => "interrupted", + InvalidData => "invalid_data", + InvalidFilename => "invalid_filename", + InvalidInput => "invalid_input", + IsADirectory => "is_a_directory", + NetworkDown => "network_down", + NetworkUnreachable => "network_unreachable", + NotADirectory => "not_a_directory", + NotConnected => "not_connected", + NotFound => "not_found", + NotSeekable => "not_seekable", + Other => "other", + OutOfMemory => "out_of_memory", + PermissionDenied => "permission_denied", + QuotaExceeded => "quota_exceeded", + ReadOnlyFilesystem => "read_only_filesystem", + ResourceBusy => "resource_busy", + StaleNetworkFileHandle => "stale_network_file_handle", + StorageFull => "storage_full", + TimedOut => "timed_out", + TooManyLinks => "too_many_links", + UnexpectedEof => "unexpected_eof", + Unsupported => "unsupported", + WouldBlock => "would_block", + WriteZero => "write_zero", + _ => "uncategorized", + } +} diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 361c8b33c52..9d6d376205e 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -46,7 +46,9 @@ use tracing::{info, instrument, warn}; use crate::metrics::object_storage_get_slice_in_flight_guards; use crate::object_storage::MultiPartPolicy; -use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; +use crate::object_storage::metrics_wrappers::{ + ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics, +}; use crate::storage::SendableAsync; use crate::{ BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, @@ 
-294,7 +296,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64) .send() - .with_count_and_upload_metrics("put_object", len) + .with_count_and_upload_metrics(ActionLabel::PutObject, len) .await .map_err(|sdk_error| { if sdk_error.is_retryable() { @@ -329,7 +331,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key) .send() - .with_count_metric("create_multipart_upload") + .with_count_metric(ActionLabel::CreateMultipartUpload) .await }) .await? @@ -430,7 +432,7 @@ impl S3CompatibleObjectStorage { .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() - .with_count_and_upload_metrics("upload_part", part.len()) + .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len()) .await .map_err(|s3_err| { if s3_err.is_retryable() { @@ -510,7 +512,7 @@ impl S3CompatibleObjectStorage { .multipart_upload(completed_upload.clone()) .upload_id(upload_id) .send() - .with_count_metric("complete_multipart_upload") + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await }) .await?; @@ -525,7 +527,7 @@ impl S3CompatibleObjectStorage { .key(key) .upload_id(upload_id) .send() - .with_count_metric("abort_multipart_upload") + .with_count_metric(ActionLabel::AbortMultipartUpload) .await }) .await?; @@ -547,7 +549,7 @@ impl S3CompatibleObjectStorage { .key(key) .set_range(range_str) .send() - .with_count_metric("get_object") + .with_count_metric(ActionLabel::GetObject) .await?; Ok(get_object_output) } @@ -640,7 +642,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .delete(delete.clone()) .send() - .with_count_and_duration_metrics("delete_objects") + .with_count_and_duration_metrics(ActionLabel::DeleteObjects) .await }) .await @@ -722,7 +724,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .max_keys(1) .send() - .with_count_metric("list_objects_v2") + .with_count_metric(ActionLabel::ListObjects) .await?; Ok(()) } @@ -765,7 +767,7 @@ impl Storage for 
S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() - .with_count_and_duration_metrics("delete_object") + .with_count_and_duration_metrics(ActionLabel::DeleteObject) .await }) .await; @@ -846,7 +848,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() - .with_count_metric("head_object") + .with_count_metric(ActionLabel::HeadObject) .await }) .await?; From 1553498b0d4da4ef2ad3ca00ca662909ca2c8309 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 15 Jul 2025 17:12:58 +0200 Subject: [PATCH 4/8] Improve metric descriptions --- quickwit/quickwit-storage/src/metrics.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 5ea00df485c..064448e0270 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -78,7 +78,7 @@ impl Default for StorageMetrics { object_storage_request_duration: new_histogram_vec( "object_storage_request_duration", "Durations until the response headers are returned from the object store, by \ - action and status. This does not measure the download time.", + action and status. 
This does not measure the download time for the body content.", "storage", &[], ["action", "status"], @@ -86,14 +86,14 @@ impl Default for StorageMetrics { ), object_storage_get_slice_in_flight_count: new_gauge( "object_storage_get_slice_in_flight_count", - "Number of GetObject for which the memory was allocated but the download is still \ - in progress.", + "Number of get_object for which the memory was allocated but the download is \ + still in progress.", "storage", &[], ), object_storage_get_slice_in_flight_num_bytes: new_gauge( "object_storage_get_slice_in_flight_num_bytes", - "Memory allocated for GetObject requests that are still in progress.", + "Memory allocated for get_object requests that are still in progress.", "storage", &[], ), @@ -106,6 +106,9 @@ impl Default for StorageMetrics { ), object_storage_download_errors: new_counter_vec( "object_storage_download_errors", + // Download errors are recorded separately because the associated + // get_object requests were already recorded as successful in + // object_storage_requests_total "Number of download requests that received successful response headers but failed \ during download.", "storage", From 922a15dfd72e11d4ca7088fc17552d8117a7d765 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 16 Jul 2025 12:37:04 +0200 Subject: [PATCH 5/8] Update metrics doc --- docs/reference/metrics.md | 197 ++++++++++++++---- .../quickwit-ingest/src/ingest_v2/metrics.rs | 17 +- .../src/ingest_v2/replication.rs | 6 +- quickwit/quickwit-ingest/src/metrics.rs | 28 +-- quickwit/quickwit-serve/src/metrics.rs | 4 +- 5 files changed, 181 insertions(+), 71 deletions(-) diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index 9df72335a05..dd7bb432e3b 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -5,78 +5,195 @@ sidebar_position: 70 Quickwit exposes key metrics in the [Prometheus](https://prometheus.io/) format on the `/metrics` endpoint. 
You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually. +:::tip + +Workloads with a large number of indexes generate high cardinality metrics for the label `index`. Set the environment variable `QW_DISABLE_PER_INDEX_METRICS=true` to disable that label if this is problematic for your metrics database. + +::: + ## Cache Metrics -Currently Quickwit exposes metrics for three caches: `fastfields`, `shortlived`, `splitfooter`. These metrics share the same structure. +Quickwit exposes several metrics for every cache. The cache type is defined in the `component_name` label. Values are `fastfields`, `shortlived`, `splitfooter`, `fd`, `partial_request`, and `searcher_split`. -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit_cache_{cache_name}` | `in_cache_count` | Count of {cache_name} in cache | `gauge` | -| `quickwit_cache_{cache_name}` | `in_cache_num_bytes` | Number of {cache_name} bytes in cache | `gauge` | -| `quickwit_cache_{cache_name}` | `cache_hit_total` | Number of {cache_name} cache hits | `counter` | -| `quickwit_cache_{cache_name}` | `cache_hits_bytes` | Number of {cache_name} cache hits in bytes | `counter` | -| `quickwit_cache_{cache_name}` | `cache_miss_total` | Number of {cache_name} cache hits | `counter` | +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_cache` | `in_cache_count` | Count of entries in cache by component | [`component_name`] | `gauge` | +| `quickwit_cache` | `in_cache_num_bytes` | Number of bytes in cache by component | [`component_name`] | `gauge` | +| `quickwit_cache` | `cache_hits_total` | Number of cache hits by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_hits_bytes` | Number of cache hits in bytes by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_misses_total` | Number of cache misses by component |
[`component_name`] | `counter` | +| `quickwit_cache` | `cache_evict_total` | Number of cache entries evicted by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_evict_bytes` | Number of cache entries evicted in bytes by component | [`component_name`] | `counter` | -## CLI Metrics +## Cluster Metrics + +Cluster metrics help track the behavior of the Chitchat protocol. + +Note: the cluster protocol uses gRPC to catch up on large deltas in its state. Those calls are monitored as [gRPC metrics](#grpc-metrics). | Namespace | Metric Name | Description | Type | | --------- | ----------- | ----------- | ---- | -| `quickwit` | `allocated_num_bytes` | Number of bytes allocated memory, as reported by jemalloc. | `gauge` | +| `quickwit_cluster` | `live_nodes` | The number of live nodes observed locally | `gauge` | +| `quickwit_cluster` | `ready_nodes` | The number of ready nodes observed locally | `gauge` | +| `quickwit_cluster` | `zombie_nodes` | The number of zombie nodes observed locally | `gauge` | +| `quickwit_cluster` | `dead_nodes` | The number of dead nodes observed locally | `gauge` | +| `quickwit_cluster` | `cluster_state_size_bytes` | The size of the cluster state in bytes | `gauge` | +| `quickwit_cluster` | `node_state_size_bytes` | The size of the node state in bytes | `gauge` | +| `quickwit_cluster` | `node_state_keys` | The number of keys in the node state | `gauge` | +| `quickwit_cluster` | `gossip_recv_messages_total` | Total number of gossip messages received | `counter` | +| `quickwit_cluster` | `gossip_recv_bytes_total` | Total amount of gossip data received in bytes | `counter` | +| `quickwit_cluster` | `gossip_sent_messages_total` | Total number of gossip messages sent | `counter` | +| `quickwit_cluster` | `gossip_sent_bytes_total` | Total amount of gossip data sent in bytes | `counter` | +| `quickwit_cluster` | `grpc_gossip_rounds_total` | Total number of gRPC gossip rounds performed with peer nodes | `counter` | + +## Control Plane
Metrics -## Common Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_control_plane` | `indexes_total` | Number of indexes | | `gauge` | +| `quickwit_control_plane` | `restart_total` | Number of control plane restarts | | `counter` | +| `quickwit_control_plane` | `schedule_total` | Number of control plane schedule operations | | `counter` | +| `quickwit_control_plane` | `apply_total` | Number of control plane apply plan operations | | `counter` | +| `quickwit_control_plane` | `metastore_error_aborted` | Number of aborted metastore transactions (do not trigger a control plane restart) | | `counter` | +| `quickwit_control_plane` | `metastore_error_maybe_executed` | Number of metastore transactions with an uncertain outcome (do trigger a control plane restart) | | `counter` | +| `quickwit_control_plane` | `open_shards_total` | Number of open shards per source | [`index_id`] | `gauge` | +| `quickwit_control_plane` | `shards` | Number of (remote/local) shards in the indexing plan | [`locality`] | `gauge` | + +## GRPC Metrics + +The following subsystems expose gRPC metrics: `cluster`, `control_plane`, `indexing`, `ingest`, `metastore`. 
| Namespace | Metric Name | Description | Labels | Type | | --------- | ----------- | ----------- | ------ | ---- | -| `quickwit` | `write_bytes`| Number of bytes written by a given component in [`indexer`, `merger`, `deleter`, `split_downloader_{merge,delete}`] | [`index`, `component`] | `counter` | +| `quickwit_{subsystem}` | `grpc_requests_total` | Total number of gRPC requests processed | [`kind`, `rpc`, `status`] | `counter` | +| `quickwit_{subsystem}` | `grpc_requests_in_flight` | Number of gRPC requests in-flight | [`kind`, `rpc`] | `gauge` | +| `quickwit_{subsystem}` | `grpc_request_duration_seconds` | Duration of request in seconds | [`kind`, `rpc`, `status`] | `histogram` | +| `quickwit_grpc` | `circuit_break_total` | Circuit breaker counter | | `counter` | ## Indexing Metrics | Namespace | Metric Name | Description | Labels | Type | | --------- | ----------- | ----------- | ------ | ---- | -| `quickwit_indexing` | `processed_docs_total`| Number of processed docs by index, source and processed status in [`valid`, `schema_error`, `parse_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` | -| `quickwit_indexing` | `processed_bytes`| Number of processed bytes by index, source and processed status in [`valid`, `schema_error`, `parse_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` | -| `quickwit_indexing` | `available_concurrent_upload_permits`| Number of available concurrent upload permits by component in [`merger`, `indexer`] | [`component`] | `gauge` | -| `quickwit_indexing` | `ongoing_merge_operations`| Number of available concurrent upload permits by component in [`merger`, `indexer`]. 
| [`index`, `source`] | `gauge` | +| `quickwit_indexing` | `processed_docs_total` | Number of processed docs by index and processed status | [`index`, `docs_processed_status`] | `counter` | +| `quickwit_indexing` | `processed_bytes` | Number of bytes of processed documents by index and processed status | [`index`, `docs_processed_status`] | `counter` | +| `quickwit_indexing` | `backpressure_micros` | Amount of time spent in backpressure (in micros) | [`actor_name`] | `counter` | +| `quickwit_indexing` | `concurrent_upload_available_permits_num` | Number of available concurrent upload permits by component | [`component`] | `gauge` | +| `quickwit_indexing` | `split_builders` | Number of existing index writer instances | | `gauge` | +| `quickwit_indexing` | `ongoing_merge_operations` | Number of ongoing merge operations | | `gauge` | +| `quickwit_indexing` | `pending_merge_operations` | Number of pending merge operations | | `gauge` | +| `quickwit_indexing` | `pending_merge_bytes` | Number of pending merge bytes | | `gauge` | +| `quickwit_indexing` | `kafka_rebalance_total` | Number of kafka rebalances | | `counter` | ## Ingest Metrics -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit_ingest` | `ingested_num_bytes` | Total size of the docs ingested in bytes | `counter` | -| `quickwit_ingest` | `ingested_num_docs` | Number of docs received to be ingested | `counter` | -| `quickwit_ingest` | `queue_count` | Number of queues currently active | `counter` | +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_ingest` | `docs_total` | Total number of docs ingested, measured in ingester's leader | [`validity`] | `counter` | +| `quickwit_ingest` | `docs_bytes_total` | Total size of the docs ingested in bytes, measured in ingester's leader | [`validity`] | `counter` | +| `quickwit_ingest` | `ingest_result_total` | Number of ingest 
requests by result | [`result`] | `counter` | +| `quickwit_ingest` | `reset_shards_operations_total` | Total number of reset shards operations performed | [`status`] | `counter` | +| `quickwit_ingest` | `shards` | Number of shards hosted by the ingester | [`state`] | `gauge` | +| `quickwit_ingest` | `shard_lt_throughput_mib` | Shard long term throughput as reported through chitchat | | `histogram` | +| `quickwit_ingest` | `shard_st_throughput_mib` | Shard short term throughput as reported through chitchat | | `histogram` | +| `quickwit_ingest` | `wal_acquire_lock_requests_in_flight` | Number of acquire lock requests in-flight | [`operation`, `type`] | `gauge` | +| `quickwit_ingest` | `wal_acquire_lock_request_duration_secs` | Duration of acquire lock requests in seconds | [`operation`, `type`] | `histogram` | +| `quickwit_ingest` | `wal_disk_used_bytes` | WAL disk space used in bytes | | `gauge` | +| `quickwit_ingest` | `wal_memory_used_bytes` | WAL memory used in bytes | | `gauge` | + + +Note that the legacy ingest (V1) only records the `docs_total` and `docs_bytes_total` metrics. The `validity` label is always set to `valid` because it doesn't parse the documents at ingest time. Invalid documents are discarded asynchronously in the indexing pipeline's doc processor. 
+ +## Janitor Metrics -## Metastore Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_janitor` | `ongoing_num_delete_operations_total` | Number of ongoing delete operations per index | [`index`] | `gauge` | +| `quickwit_janitor` | `gc_deleted_splits_total` | Total number of splits deleted by the garbage collector | [`result`] | `counter` | +| `quickwit_janitor` | `gc_deleted_bytes_total` | Total number of bytes deleted by the garbage collector | | `counter` | +| `quickwit_janitor` | `gc_runs_total` | Total number of garbage collector executions | [`result`] | `counter` | +| `quickwit_janitor` | `gc_seconds_total` | Total time spent running the garbage collector | | `counter` | + +## Jaeger Metrics + +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_jaeger` | `requests_total` | Number of requests | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `request_errors_total` | Number of failed requests | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `request_duration_seconds` | Duration of requests | [`operation`, `index`, `error`] | `histogram` | +| `quickwit_jaeger` | `fetched_traces_total` | Number of traces retrieved from storage | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `fetched_spans_total` | Number of spans retrieved from storage | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `transferred_bytes_total` | Number of bytes transferred | [`operation`, `index`] | `counter` | -All metastore methods are monitored by the 3 metrics: +## Memory Metrics | Namespace | Metric Name | Description | Labels | Type | | --------- | ----------- | ----------- | ------ | ---- | -| `quickwit_metastore` | `requests_total` | Number of requests | [`operation`, `index`] | `counter` | -| `quickwit_metastore` | `request_errors_total` | Number of failed requests | 
[`operation`, `index`] | `counter` | -| `quickwit_metastore` | `request_duration_seconds` | Duration of requests | [`operation`, `index`, `error`] | `histogram` | +| `quickwit_memory` | `active_bytes` | Total number of bytes in active pages allocated by the application, as reported by jemalloc `stats.active` | | `gauge` | +| `quickwit_memory` | `allocated_bytes` | Total number of bytes allocated by the application, as reported by jemalloc `stats.allocated` | | `gauge` | +| `quickwit_memory` | `resident_bytes` | Total number of bytes in physically resident data pages mapped by the allocator, as reported by jemalloc `stats.resident` | | `gauge` | +| `quickwit_memory` | `in_flight_data_bytes` | Amount of data in-flight in various buffers in bytes | [`component`] | `gauge` | -Examples of operation names: `create_index`, `index_metadata`, `delete_index`, `stage_splits`, `publish_splits`, `list_splits`, `add_source`, ... +## Metastore Metrics -## Rest API Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_metastore` | `acquire_connections` | Number of connections being acquired (PostgreSQL only) | | `gauge` | +| `quickwit_metastore` | `active_connections` | Number of active (used + idle) connections (PostgreSQL only) | | `gauge` | +| `quickwit_metastore` | `idle_connections` | Number of idle connections (PostgreSQL only) | | `gauge` | -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit` | `http_requests_total` | Total number of HTTP requests received | `counter` | +## OTLP Metrics + +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_otlp` | `requests_total` | Number of requests | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `request_errors_total` | Number of failed requests | [`service`, `index`, `transport`, 
`format`] | `counter` | +| `quickwit_otlp` | `request_duration_seconds` | Duration of requests | [`service`, `index`, `transport`, `format`, `error`] | `histogram` | +| `quickwit_otlp` | `ingested_log_records_total` | Number of log records ingested | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `ingested_spans_total` | Number of spans ingested | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `ingested_bytes_total` | Number of bytes ingested | [`service`, `index`, `transport`, `format`] | `counter` | + +## REST API Metrics + +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit` | `http_requests_total` | Total number of HTTP requests processed | [`method`, `status_code`] | `counter` | +| `quickwit` | `request_duration_secs` | Response time in seconds | [`method`, `status_code`] | `histogram` | +| `quickwit` | `ongoing_requests` | Number of ongoing requests on specific endpoint groups | [`endpoint_group`] | `gauge` | +| `quickwit` | `pending_requests` | Number of pending requests on specific endpoint groups | [`endpoint_group`] | `gauge` | ## Search Metrics -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit_search` | `leaf_searches_splits_total` | Number of leaf searches (count of splits) started | `counter` | -| `quickwit_search` | `leaf_search_split_duration_secs` | Number of seconds required to run a leaf search over a single split. 
The timer starts after the semaphore is obtained | `histogram` | -| `quickwit_search` | `active_search_threads_count` | Number of threads in use in the CPU thread pool | `gauge` | +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_search` | `root_search_requests_total` | Total number of root search gRPC requests processed | [`status`] | `counter` | +| `quickwit_search` | `root_search_request_duration_seconds` | Duration of root search gRPC requests in seconds | [`status`] | `histogram` | +| `quickwit_search` | `root_search_targeted_splits` | Number of splits targeted per root search gRPC request | [`status`] | `histogram` | +| `quickwit_search` | `leaf_search_requests_total` | Total number of leaf search gRPC requests processed | [`status`] | `counter` | +| `quickwit_search` | `leaf_search_request_duration_seconds` | Duration of leaf search gRPC requests in seconds | [`status`] | `histogram` | +| `quickwit_search` | `leaf_search_targeted_splits` | Number of splits targeted per leaf search gRPC request | [`status`] | `histogram` | +| `quickwit_search` | `leaf_searches_splits_total` | Number of leaf searches (count of splits) started | | `counter` | +| `quickwit_search` | `leaf_search_split_duration_secs` | Number of seconds required to run a leaf search over a single split. The timer starts after the semaphore is obtained | | `histogram` | +| `quickwit_search` | `leaf_search_single_split_tasks` | Number of single split search tasks pending or ongoing | [`status`] | `gauge` | +| `quickwit_search` | `leaf_search_single_split_warmup_num_bytes` | Size of the short lived cache for a single split once the warmup is done | | `histogram` | +| `quickwit_search` | `job_assigned_total` | Number of jobs assigned to searchers, per affinity rank | [`affinity`] | `counter` | +| `quickwit_search` | `searcher_local_kv_store_size_bytes` | Size of the searcher kv store in bytes. 
This store is used to cache scroll contexts | | `gauge` | ## Storage Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_storage` | `get_slice_timeout_outcome` | Outcome of get_slice operations. success_after_1_timeout means the operation succeeded after a retry caused by a timeout | [`outcome`] | `counter` | +| `quickwit_storage` | `object_storage_requests_total` | Number of requests to the object store, by action and status. Requests are recorded when the response headers are returned | [`action`, `status`] | `counter` | +| `quickwit_storage` | `object_storage_request_duration` | Durations until the response headers are returned from the object store, by action and status | [`action`, `status`] | `histogram` | +| `quickwit_storage` | `object_storage_download_num_bytes` | Amount of data downloaded from object storage | [`status`] | `counter` | +| `quickwit_storage` | `object_storage_download_errors` | Number of download requests that received successful response headers but failed during download | [`status`] | `counter` | +| `quickwit_storage` | `object_storage_upload_num_bytes` | Amount of data uploaded to object storage. The value recorded for failed and aborted uploads is the full payload size | [`status`] | `counter` | + +## CLI Metrics + | Namespace | Metric Name | Description | Type | | --------- | ----------- | ----------- | ---- | -| `quickwit_storage` | `object_storage_gets_total` | Number of objects fetched | `counter` | -| `quickwit_storage` | `object_storage_puts_total` | Number of objects uploaded. 
May differ from object_storage_requests_parts due to multipart upload | `counter` | -| `quickwit_storage` | `object_storage_puts_parts` | Number of object parts uploaded | `counter` | -| `quickwit_storage` | `object_storage_download_num_bytes` | Amount of data downloaded from an object storage | `counter` | +| `quickwit_cli` | `thread_unpark_duration_microseconds` | Duration for which a thread of the main tokio runtime is unparked | `histogram` | diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 1fb32c0b2fd..1b27617c624 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -16,7 +16,8 @@ use mrecordlog::ResourceUsage; use once_cell::sync::Lazy; use quickwit_common::metrics::{ Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, exponential_buckets, - linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec, + linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, + new_histogram_vec, }; // Counter vec counting the different outcomes of ingest requests as @@ -82,6 +83,8 @@ pub(super) struct IngestV2Metrics { pub wal_disk_used_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, pub ingest_results: IngestResultMetrics, + pub replicated_num_bytes_total: IntCounter, + pub replicated_num_docs_total: IntCounter, } impl Default for IngestV2Metrics { @@ -146,6 +149,18 @@ impl Default for IngestV2Metrics { "ingest", &[], ), + replicated_num_bytes_total: new_counter( + "replicated_num_bytes_total", + "Total size in bytes of the replicated docs.", + "ingest", + &[], + ), + replicated_num_docs_total: new_counter( + "replicated_num_docs_total", + "Total number of docs replicated.", + "ingest", + &[], + ), } } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 
5e286ec5b84..5c54593fe69 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -37,8 +37,8 @@ use super::metrics::report_wal_usage; use super::models::IngesterShard; use super::mrecordlog_utils::check_enough_capacity; use super::state::IngesterState; +use crate::ingest_v2::metrics::INGEST_V2_METRICS; use crate::ingest_v2::mrecordlog_utils::{AppendDocBatchError, append_non_empty_doc_batch}; -use crate::metrics::INGEST_METRICS; use crate::{estimate_size, with_lock_metrics}; pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5; @@ -667,10 +667,10 @@ impl ReplicationTask { .expect("replica shard should be initialized") .set_replication_position_inclusive(current_position_inclusive.clone(), now); - INGEST_METRICS + INGEST_V2_METRICS .replicated_num_bytes_total .inc_by(batch_num_bytes); - INGEST_METRICS + INGEST_V2_METRICS .replicated_num_docs_total .inc_by(batch_num_docs); diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 15eb5d661de..4601c6e7498 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -13,18 +13,14 @@ // limitations under the License. 
use once_cell::sync::Lazy; -use quickwit_common::metrics::{IntCounter, IntGauge, new_counter, new_counter_vec, new_gauge}; +use quickwit_common::metrics::{IntCounter, new_counter_vec}; pub struct IngestMetrics { + // With ingest V1 all ingested documents are considered valid pub ingested_docs_bytes_valid: IntCounter, + pub ingested_docs_valid: IntCounter, pub ingested_docs_bytes_invalid: IntCounter, pub ingested_docs_invalid: IntCounter, - pub ingested_docs_valid: IntCounter, - - pub replicated_num_bytes_total: IntCounter, - pub replicated_num_docs_total: IntCounter, - #[allow(dead_code)] // this really shouldn't be dead, it needs to be used somewhere - pub queue_count: IntGauge, } impl Default for IngestMetrics { @@ -56,24 +52,6 @@ impl Default for IngestMetrics { ingested_docs_bytes_invalid, ingested_docs_valid, ingested_docs_invalid, - replicated_num_bytes_total: new_counter( - "replicated_num_bytes_total", - "Total size in bytes of the replicated docs.", - "ingest", - &[], - ), - replicated_num_docs_total: new_counter( - "replicated_num_docs_total", - "Total number of docs replicated.", - "ingest", - &[], - ), - queue_count: new_gauge( - "queue_count", - "Number of queues currently active", - "ingest", - &[], - ), } } } diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index f88896ea68a..bda75da7718 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -52,14 +52,14 @@ impl Default for ServeMetrics { ), ongoing_requests: new_gauge_vec( "ongoing_requests", - "Number of ongoing requests.", + "Number of ongoing requests on specific endpoint groups", "", &[], ["endpoint_group"], ), pending_requests: new_gauge_vec( "pending_requests", - "Number of pending requests.", + "Number of pending requests on specific endpoint groups", "", &[], ["endpoint_group"], From a7f12de98976672831e67437630f40e1e5c084fb Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 16 Jul 2025 12:37:11 +0200 
Subject: [PATCH 6/8] Make in-flight storage get requests a memory metric --- quickwit/quickwit-common/src/metrics.rs | 2 ++ quickwit/quickwit-storage/src/metrics.rs | 35 ++++++------------------ 2 files changed, 10 insertions(+), 27 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index c59bf953937..4212760e0b7 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -342,6 +342,7 @@ pub struct InFlightDataGauges { pub doc_processor_mailbox: IntGauge, pub indexer_mailbox: IntGauge, pub index_writer: IntGauge, + pub get_object: IntGauge, in_flight_gauge_vec: IntGaugeVec<1>, } @@ -365,6 +366,7 @@ impl Default for InFlightDataGauges { doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), + get_object: in_flight_gauge_vec.with_label_values(["get_object"]), in_flight_gauge_vec: in_flight_gauge_vec.clone(), } } diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 064448e0270..61342c593b6 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -16,8 +16,8 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, - new_gauge, new_histogram_vec, + GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, MEMORY_METRICS, new_counter, + new_counter_vec, new_gauge, new_histogram_vec, }; /// Counters associated to storage operations. 
@@ -32,8 +32,6 @@ pub struct StorageMetrics { pub get_slice_timeout_all_timeouts: IntCounter, pub object_storage_requests_total: IntCounterVec<2>, pub object_storage_request_duration: HistogramVec<2>, - pub object_storage_get_slice_in_flight_count: IntGauge, - pub object_storage_get_slice_in_flight_num_bytes: IntGauge, pub object_storage_download_num_bytes: IntCounterVec<1>, pub object_storage_download_errors: IntCounterVec<1>, pub object_storage_upload_num_bytes: IntCounterVec<1>, @@ -84,19 +82,6 @@ impl Default for StorageMetrics { ["action", "status"], vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], ), - object_storage_get_slice_in_flight_count: new_gauge( - "object_storage_get_slice_in_flight_count", - "Number of get_object for which the memory was allocated but the download is \ - still in progress.", - "storage", - &[], - ), - object_storage_get_slice_in_flight_num_bytes: new_gauge( - "object_storage_get_slice_in_flight_num_bytes", - "Memory allocated for get_object requests that are still in progress.", - "storage", - &[], - ), object_storage_download_num_bytes: new_counter_vec( "object_storage_download_num_bytes", "Amount of data downloaded from object storage.", @@ -199,15 +184,11 @@ pub static STORAGE_METRICS: Lazy<StorageMetrics> = Lazy::new(StorageMetrics::def pub static CACHE_METRICS_FOR_TESTS: Lazy<CacheMetrics> = Lazy::new(|| CacheMetrics::for_component("fortest")); -pub fn object_storage_get_slice_in_flight_guards( - get_request_size: usize, -) -> (GaugeGuard<'static>, GaugeGuard<'static>) { - let mut bytes_guard = GaugeGuard::from_gauge( - &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_num_bytes, - ); +/// Helps track pre-allocated memory for downloads that are still in progress. +/// +/// This is actually recorded as a memory metric and not a storage metric.
+pub fn object_storage_get_slice_in_flight_guards(get_request_size: usize) -> GaugeGuard<'static> { + let mut bytes_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.get_object); bytes_guard.add(get_request_size as i64); - let mut count_guard = - GaugeGuard::from_gauge(&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count); - count_guard.add(1); - (bytes_guard, count_guard) + bytes_guard } From ad3dd112b869ad4bd4dc9c25c0872854f379e874 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 17 Jul 2025 14:36:13 +0200 Subject: [PATCH 7/8] Add precisions to job_assigned_total --- docs/reference/metrics.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index dd7bb432e3b..316244dc296 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -178,7 +178,7 @@ Note that the legacy ingest (V1) only records the `docs_total` and `docs_bytes_t | `quickwit_search` | `leaf_search_split_duration_secs` | Number of seconds required to run a leaf search over a single split. The timer starts after the semaphore is obtained | | `histogram` | | `quickwit_search` | `leaf_search_single_split_tasks` | Number of single split search tasks pending or ongoing | [`status`] | `gauge` | | `quickwit_search` | `leaf_search_single_split_warmup_num_bytes` | Size of the short lived cache for a single split once the warmup is done | | `histogram` | -| `quickwit_search` | `job_assigned_total` | Number of jobs assigned to searchers, per affinity rank | [`affinity`] | `counter` | +| `quickwit_search` | `job_assigned_total` | Number of jobs assigned from this searcher (root) to other searchers (leafs), per affinity rank | [`affinity`] | `counter` | | `quickwit_search` | `searcher_local_kv_store_size_bytes` | Size of the searcher kv store in bytes. 
This store is used to cache scroll contexts | | `gauge` | ## Storage Metrics From b3390414018c725a2ed2750c42644da4e1e8d304 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 17 Jul 2025 16:36:31 +0200 Subject: [PATCH 8/8] Update example Grafana dashboards --- monitoring/grafana/dashboards/indexers.json | 38 +++++++------------- monitoring/grafana/dashboards/searchers.json | 29 ++++++++------- 2 files changed, 26 insertions(+), 41 deletions(-) diff --git a/monitoring/grafana/dashboards/indexers.json b/monitoring/grafana/dashboards/indexers.json index 18d2d94aa7a..feaf655574d 100644 --- a/monitoring/grafana/dashboards/indexers.json +++ b/monitoring/grafana/dashboards/indexers.json @@ -25,6 +25,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, + "id": 1, "links": [], "liveNow": false, "panels": [ @@ -745,7 +746,7 @@ "uid": "${datasource}" }, "editorMode": "builder", - "expr": "sum by(pod) (rate(quickwit_storage_object_storage_upload_num_bytes{namespace=\"$namespace\", pod=~\"$pod\", instance=~\"$instance\"}[$__rate_interval]))", + "expr": "sum by(pod) (rate(quickwit_storage_object_storage_upload_num_bytes{instance=~\"$instance\"}[$__rate_interval]))", "hide": false, "legendFormat": "Upload bytes / sec - {{pod}}", "range": true, @@ -806,8 +807,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -910,8 +910,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -965,23 +964,12 @@ "type": "prometheus", "uid": "${datasource}" }, - "editorMode": "builder", - "expr": "sum(rate(quickwit_storage_object_storage_gets_total{instance=~\"$instance\"}[$__rate_interval]))", - "legendFormat": "GET req/sec", - "range": true, - "refId": "Download" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${datasource}" - }, - "editorMode": "builder", - "expr": 
"sum(rate(quickwit_storage_object_storage_puts_total{namespace=\"$namespace\", pod=~\"$pod\", instance=~\"$instance\"}[$__rate_interval]))", + "editorMode": "code", + "expr": "sum(rate(quickwit_storage_object_storage_requests_total{instance=~\"$instance\"}[$__rate_interval])) by (action)", "hide": false, - "legendFormat": "PUT req/sec", + "legendFormat": "{{action}} req/sec", "range": true, - "refId": "Upload" + "refId": "Requests" } ], "title": "Requests on object storage", @@ -1034,8 +1022,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1130,8 +1117,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1191,7 +1177,7 @@ "list": [ { "current": { - "selected": true, + "selected": false, "text": "Prometheus", "value": "PBFA97CFB590B2093" }, @@ -1245,6 +1231,6 @@ "timezone": "", "title": "Quickwit Indexers", "uid": "quickwit-indexers", - "version": 2, + "version": 6, "weekStart": "" } diff --git a/monitoring/grafana/dashboards/searchers.json b/monitoring/grafana/dashboards/searchers.json index b6f7ade938b..fdb88dcfb73 100644 --- a/monitoring/grafana/dashboards/searchers.json +++ b/monitoring/grafana/dashboards/searchers.json @@ -18,7 +18,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 2, + "id": 4, "links": [], "liveNow": false, "panels": [ @@ -307,18 +307,18 @@ "uid": "${datasource}" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "rate(quickwit_storage_object_storage_gets_total{instance=~\"$instance\"}[$__rate_interval])", + "editorMode": "code", + "expr": "sum(rate(quickwit_storage_object_storage_requests_total{instance=~\"$instance\"}[$__rate_interval])) by (action)", "fullMetaSearch": false, "includeNullMetadata": false, "instant": false, - "legendFormat": "Total", + "legendFormat": "{{action}} req/sec", "range": true, "refId": "A", "useBackend": false } ], - "title": "Number of GET 
requests", + "title": "Object store requests", "type": "timeseries" }, { @@ -407,18 +407,18 @@ "uid": "${datasource}" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "quickwit_storage_object_storage_download_num_bytes{instance=~\"$instance\"}", + "editorMode": "code", + "expr": "rate(quickwit_storage_object_storage_download_num_bytes{instance=~\"$instance\"}[$__rate_interval])", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Downloaded bytes", + "legendFormat": "Download bytes / sec ", "range": true, "refId": "A", "useBackend": false } ], - "title": "Size of GET requests (bytes)", + "title": "Object store download rate", "type": "timeseries" }, { @@ -506,8 +506,8 @@ "uid": "${datasource}" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "rate(quickwit_cache_cache_hits_total{instance=~\"$instance\"}[$__rate_interval])", + "editorMode": "code", + "expr": "sum(rate(quickwit_cache_cache_hits_total{instance=~\"$instance\"}[$__rate_interval])) by (component_name)", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -710,7 +710,7 @@ "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Split footer", + "legendFormat": "{{component_name}}", "range": true, "refId": "A", "useBackend": false @@ -810,7 +810,7 @@ "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Split footer", + "legendFormat": "{{component_name}}", "range": true, "refId": "A", "useBackend": false @@ -874,7 +874,6 @@ "sort": 0, "type": "query" } - ] }, "time": { @@ -885,6 +884,6 @@ "timezone": "", "title": "Quickwit Searchers", "uid": "quickwit-searchers", - "version": 1, + "version": 4, "weekStart": "" }