@@ -25,7 +25,9 @@ use anyhow::{bail, ensure};
2525use bytesize:: ByteSize ;
2626use http:: HeaderMap ;
2727use quickwit_common:: net:: HostAddr ;
28- use quickwit_common:: shared_consts:: DEFAULT_SHARD_THROUGHPUT_LIMIT ;
28+ use quickwit_common:: shared_consts:: {
29+ DEFAULT_SHARD_BURST_LIMIT , DEFAULT_SHARD_SCALE_UP_FACTOR , DEFAULT_SHARD_THROUGHPUT_LIMIT ,
30+ } ;
2931use quickwit_common:: uri:: Uri ;
3032use quickwit_proto:: indexing:: CpuCapacity ;
3133use quickwit_proto:: types:: NodeId ;
@@ -39,7 +41,7 @@ use crate::{ConfigFormat, MetastoreConfigs};
3941
4042pub const DEFAULT_QW_CONFIG_PATH : & str = "config/quickwit.yaml" ;
4143
42- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
44+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
4345#[ serde( deny_unknown_fields) ]
4446pub struct RestConfig {
4547 pub listen_addr : SocketAddr ,
@@ -50,7 +52,7 @@ pub struct RestConfig {
5052 pub tls : Option < TlsConfig > ,
5153}
5254
53- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
55+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
5456#[ serde( deny_unknown_fields) ]
5557pub struct GrpcConfig {
5658 #[ serde( default = "GrpcConfig::default_max_message_size" ) ]
@@ -83,7 +85,7 @@ impl Default for GrpcConfig {
8385 }
8486}
8587
86- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
88+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
8789#[ serde( deny_unknown_fields) ]
8890pub struct TlsConfig {
8991 pub cert_path : String ,
@@ -193,7 +195,7 @@ impl Default for IndexerConfig {
193195 }
194196}
195197
196- #[ derive( Debug , Clone , Copy , Eq , PartialEq , Serialize , Deserialize ) ]
198+ #[ derive( Debug , Clone , Copy , PartialEq , Serialize , Deserialize ) ]
197199#[ serde( deny_unknown_fields) ]
198200pub struct SplitCacheLimits {
199201 pub max_num_bytes : ByteSize ,
@@ -219,7 +221,7 @@ impl SplitCacheLimits {
219221 }
220222}
221223
222- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
224+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
223225#[ serde( deny_unknown_fields, default ) ]
224226pub struct SearcherConfig {
225227 pub aggregation_memory_limit : ByteSize ,
@@ -254,7 +256,7 @@ pub struct SearcherConfig {
254256/// This policy is inspired by this guidance. It does not track instantaneous throughput, but
255257/// computes an overall timeout using the following formula:
256258/// `timeout_offset + num_bytes_get_request / min_throughtput`
257- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
259+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
258260pub struct StorageTimeoutPolicy {
259261 pub min_throughtput_bytes_per_secs : u64 ,
260262 pub timeout_millis : u64 ,
@@ -338,14 +340,25 @@ impl SearcherConfig {
338340 }
339341}
340342
341- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
343+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
342344#[ serde( deny_unknown_fields, default ) ]
343345pub struct IngestApiConfig {
346+ /// Maximum memory space taken by the ingest WAL
344347 pub max_queue_memory_usage : ByteSize ,
348+ /// Maximum disk space taken by the ingest WAL
345349 pub max_queue_disk_usage : ByteSize ,
346350 replication_factor : usize ,
347351 pub content_length_limit : ByteSize ,
352+ /// (hidden) Targeted throughput for each shard
348353 pub shard_throughput_limit : ByteSize ,
354+ /// (hidden) Maximum accumulated throughput capacity for underutilized
355+ /// shards, allowing the throughput limit to be temporarily exceeded
356+ pub shard_burst_limit : ByteSize ,
357+ /// (hidden) new_shard_count = ceil(old_shard_count * shard_scale_up_factor)
358+ ///
359+ /// Setting this too high will be cancelled out by the arbiter that prevents
360+ /// creating too many shards at once.
361+ pub shard_scale_up_factor : f32 ,
349362}
350363
351364impl Default for IngestApiConfig {
@@ -356,6 +369,8 @@ impl Default for IngestApiConfig {
356369 replication_factor : 1 ,
357370 content_length_limit : ByteSize :: mib ( 10 ) ,
358371 shard_throughput_limit : DEFAULT_SHARD_THROUGHPUT_LIMIT ,
372+ shard_burst_limit : DEFAULT_SHARD_BURST_LIMIT ,
373+ shard_scale_up_factor : DEFAULT_SHARD_SCALE_UP_FACTOR ,
359374 }
360375 }
361376}
@@ -398,20 +413,34 @@ impl IngestApiConfig {
398413 self . max_queue_memory_usage
399414 ) ;
400415 info ! (
401- "ingestion shard throughput limit: {:? }" ,
416+ "ingestion shard throughput limit: {}" ,
402417 self . shard_throughput_limit
403418 ) ;
404419 ensure ! (
405420 self . shard_throughput_limit >= ByteSize :: mib( 1 )
406421 && self . shard_throughput_limit <= ByteSize :: mib( 20 ) ,
407- "shard_throughput_limit ({:? }) must be within 1mb and 20mb" ,
422+ "shard_throughput_limit ({}) must be within 1mb and 20mb" ,
408423 self . shard_throughput_limit
409424 ) ;
425+ // The newline delimited format is persisted as something a bit larger
426+ // (lines prefixed with their length)
427+ let estimated_persist_size = ByteSize :: b ( 3 * self . content_length_limit . as_u64 ( ) / 2 ) ;
428+ ensure ! (
429+ self . shard_burst_limit >= estimated_persist_size,
430+ "shard_burst_limit ({}) must be at least 1.5*content_length_limit ({})" ,
431+ self . shard_burst_limit,
432+ estimated_persist_size,
433+ ) ;
434+ ensure ! (
435+ self . shard_scale_up_factor > 1.0 ,
436+ "shard_scale_up_factor ({}) must be greater than 1" ,
437+ self . shard_scale_up_factor,
438+ ) ;
410439 Ok ( ( ) )
411440 }
412441}
413442
414- #[ derive( Clone , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
443+ #[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
415444#[ serde( deny_unknown_fields) ]
416445pub struct JaegerConfig {
417446 /// Enables the gRPC endpoint that allows the Jaeger Query Service to connect and retrieve
0 commit comments