Skip to content

Commit dadf458

Browse files
authored
feat: allow enabling debug logging in replicator (#433)
1 parent 0ee33fe commit dadf458

File tree

34 files changed

+234
-4
lines changed

34 files changed

+234
-4
lines changed

etl-api/src/configs/log.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::fmt;
2+
3+
use serde::{Deserialize, Serialize};
4+
use utoipa::ToSchema;
5+
6+
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
7+
pub enum LogLevel {
8+
/// The "trace" level.
9+
///
10+
/// Designates very low priority, often extremely verbose, information.
11+
Trace,
12+
/// The "debug" level.
13+
///
14+
/// Designates lower priority information.
15+
Debug,
16+
/// The "info" level.
17+
///
18+
/// Designates useful information.
19+
#[default]
20+
Info,
21+
/// The "warn" level.
22+
///
23+
/// Designates hazardous situations.
24+
Warn,
25+
/// The "error" level.
26+
///
27+
/// Designates very serious errors.
28+
Error,
29+
}
30+
31+
impl fmt::Display for LogLevel {
32+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33+
let s = match self {
34+
LogLevel::Trace => "trace",
35+
LogLevel::Debug => "debug",
36+
LogLevel::Info => "info",
37+
LogLevel::Warn => "warn",
38+
LogLevel::Error => "error",
39+
};
40+
write!(f, "{s}")
41+
}
42+
}

etl-api/src/configs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod destination;
22
pub mod encryption;
3+
pub mod log;
34
pub mod pipeline;
45
pub mod serde;
56
pub mod source;

etl-api/src/configs/pipeline.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig};
22
use serde::{Deserialize, Serialize};
33
use utoipa::ToSchema;
44

5-
use crate::configs::store::Store;
5+
use crate::configs::{log::LogLevel, store::Store};
66

77
const DEFAULT_BATCH_MAX_SIZE: usize = 100000;
88
const DEFAULT_BATCH_MAX_FILL_MS: u64 = 10000;
@@ -41,6 +41,7 @@ pub struct FullApiPipelineConfig {
4141
#[schema(example = 4)]
4242
#[serde(skip_serializing_if = "Option::is_none")]
4343
pub max_table_sync_workers: Option<u16>,
44+
pub log_level: Option<LogLevel>,
4445
}
4546

4647
impl From<StoredPipelineConfig> for FullApiPipelineConfig {
@@ -54,6 +55,7 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig {
5455
table_error_retry_delay_ms: Some(value.table_error_retry_delay_ms),
5556
table_error_retry_max_attempts: Some(value.table_error_retry_max_attempts),
5657
max_table_sync_workers: Some(value.max_table_sync_workers),
58+
log_level: value.log_level,
5759
}
5860
}
5961
}
@@ -75,6 +77,8 @@ pub struct PartialApiPipelineConfig {
7577
#[schema(example = 4)]
7678
#[serde(skip_serializing_if = "Option::is_none")]
7779
pub max_table_sync_workers: Option<u16>,
80+
#[serde(skip_serializing_if = "Option::is_none")]
81+
pub log_level: Option<LogLevel>,
7882
}
7983

8084
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -85,6 +89,7 @@ pub struct StoredPipelineConfig {
8589
#[serde(default = "default_table_error_retry_max_attempts")]
8690
pub table_error_retry_max_attempts: u32,
8791
pub max_table_sync_workers: u16,
92+
pub log_level: Option<LogLevel>,
8893
}
8994

9095
impl StoredPipelineConfig {
@@ -129,6 +134,8 @@ impl StoredPipelineConfig {
129134
if let Some(value) = partial.max_table_sync_workers {
130135
self.max_table_sync_workers = value;
131136
}
137+
138+
self.log_level = partial.log_level
132139
}
133140
}
134141

@@ -159,6 +166,7 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig {
159166
max_table_sync_workers: value
160167
.max_table_sync_workers
161168
.unwrap_or(DEFAULT_MAX_TABLE_SYNC_WORKERS),
169+
log_level: value.log_level,
162170
}
163171
}
164172
}
@@ -179,6 +187,7 @@ mod tests {
179187
table_error_retry_delay_ms: 2000,
180188
table_error_retry_max_attempts: 7,
181189
max_table_sync_workers: 4,
190+
log_level: None,
182191
};
183192

184193
let json = serde_json::to_string(&config).unwrap();
@@ -208,6 +217,7 @@ mod tests {
208217
table_error_retry_delay_ms: None,
209218
table_error_retry_max_attempts: None,
210219
max_table_sync_workers: None,
220+
log_level: Some(LogLevel::Debug),
211221
};
212222

213223
let stored: StoredPipelineConfig = full_config.clone().into();
@@ -224,6 +234,7 @@ mod tests {
224234
table_error_retry_delay_ms: None,
225235
table_error_retry_max_attempts: None,
226236
max_table_sync_workers: None,
237+
log_level: None,
227238
};
228239

229240
let stored: StoredPipelineConfig = full_config.into();
@@ -255,6 +266,7 @@ mod tests {
255266
table_error_retry_delay_ms: 1000,
256267
table_error_retry_max_attempts: 3,
257268
max_table_sync_workers: 2,
269+
log_level: None,
258270
};
259271

260272
let partial = PartialApiPipelineConfig {
@@ -266,6 +278,7 @@ mod tests {
266278
table_error_retry_delay_ms: Some(5000),
267279
table_error_retry_max_attempts: Some(9),
268280
max_table_sync_workers: None,
281+
log_level: None,
269282
};
270283

271284
stored.merge(partial);

etl-api/src/k8s/base.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use etl_config::Environment;
33
use k8s_openapi::api::core::v1::ConfigMap;
44
use thiserror::Error;
55

6-
use crate::configs::destination::StoredDestinationConfig;
6+
use crate::configs::{destination::StoredDestinationConfig, log::LogLevel};
77

88
/// Errors emitted by the Kubernetes integration.
99
///
@@ -143,6 +143,7 @@ pub trait K8sClient: Send + Sync {
143143
replicator_image: &str,
144144
environment: Environment,
145145
destination_type: DestinationType,
146+
log_level: LogLevel,
146147
) -> Result<(), K8sError>;
147148

148149
/// Deletes the replicator [`StatefulSet`] if it exists.

etl-api/src/k8s/core.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use etl_config::shared::{ReplicatorConfigWithoutSecrets, SupabaseConfigWithoutSe
33
use secrecy::ExposeSecret;
44

55
use crate::configs::destination::{StoredDestinationConfig, StoredIcebergConfig};
6+
use crate::configs::log::LogLevel;
67
use crate::configs::pipeline::StoredPipelineConfig;
78
use crate::configs::source::StoredSourceConfig;
89
use crate::db::destinations::Destination;
@@ -72,6 +73,8 @@ pub async fn create_or_update_pipeline_resources_in_k8s(
7273
api_url: supabase_api_url.map(|url| url.to_owned()),
7374
};
7475

76+
let log_level = pipeline.config.log_level.clone().unwrap_or_default();
77+
7578
let replicator_config = build_replicator_config_without_secrets(
7679
k8s_client,
7780
// We are safe to perform this conversion, since the i64 -> u64 conversion performs wrap
@@ -93,6 +96,7 @@ pub async fn create_or_update_pipeline_resources_in_k8s(
9396
image.name,
9497
environment,
9598
destination_type,
99+
log_level,
96100
)
97101
.await?;
98102

@@ -287,9 +291,16 @@ async fn create_or_update_replicator_stateful_set(
287291
replicator_image: String,
288292
environment: Environment,
289293
destination_type: DestinationType,
294+
log_level: LogLevel,
290295
) -> Result<(), PipelineError> {
291296
k8s_client
292-
.create_or_update_stateful_set(prefix, &replicator_image, environment, destination_type)
297+
.create_or_update_stateful_set(
298+
prefix,
299+
&replicator_image,
300+
environment,
301+
destination_type,
302+
log_level,
303+
)
293304
.await?;
294305

295306
Ok(())

etl-api/src/k8s/http.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::configs::log::LogLevel;
12
use crate::k8s::DestinationType;
23
use crate::k8s::{K8sClient, K8sError, PodPhase};
34
use async_trait::async_trait;
@@ -329,6 +330,7 @@ impl K8sClient for HttpK8sClient {
329330
replicator_image: &str,
330331
environment: Environment,
331332
destination_type: DestinationType,
333+
log_level: LogLevel,
332334
) -> Result<(), K8sError> {
333335
debug!("patching stateful set");
334336

@@ -341,6 +343,7 @@ impl K8sClient for HttpK8sClient {
341343
&environment,
342344
replicator_image,
343345
destination_type,
346+
log_level,
344347
);
345348

346349
let node_selector = create_node_selector_json(&environment);
@@ -567,6 +570,7 @@ fn create_container_environment_json(
567570
environment: &Environment,
568571
replicator_image: &str,
569572
destination_type: DestinationType,
573+
log_level: LogLevel,
570574
) -> Vec<serde_json::Value> {
571575
let mut container_environment = vec![
572576
json!({
@@ -578,6 +582,10 @@ fn create_container_environment_json(
578582
//TODO: set APP_VERSION to proper version instead of the replicator image name
579583
"value": replicator_image
580584
}),
585+
json!({
586+
"name": "RUST_LOG",
587+
"value": log_level.to_string()
588+
}),
581589
];
582590

583591
match environment {
@@ -1100,6 +1108,7 @@ mod tests {
11001108
&environment,
11011109
replicator_image,
11021110
DestinationType::BigQuery,
1111+
LogLevel::Info,
11031112
);
11041113
assert_json_snapshot!(container_environment);
11051114

@@ -1109,6 +1118,7 @@ mod tests {
11091118
&environment,
11101119
replicator_image,
11111120
DestinationType::BigQuery,
1121+
LogLevel::Info,
11121122
);
11131123
assert_json_snapshot!(container_environment);
11141124

@@ -1118,6 +1128,7 @@ mod tests {
11181128
&environment,
11191129
replicator_image,
11201130
DestinationType::BigQuery,
1131+
LogLevel::Info,
11211132
);
11221133
assert_json_snapshot!(container_environment);
11231134
}
@@ -1132,6 +1143,7 @@ mod tests {
11321143
&Environment::Dev,
11331144
replicator_image,
11341145
DestinationType::Iceberg,
1146+
LogLevel::Info,
11351147
);
11361148
assert_json_snapshot!(container_environment);
11371149

@@ -1140,6 +1152,7 @@ mod tests {
11401152
&Environment::Staging,
11411153
replicator_image,
11421154
DestinationType::Iceberg,
1155+
LogLevel::Info,
11431156
);
11441157
assert_json_snapshot!(container_environment);
11451158

@@ -1148,6 +1161,7 @@ mod tests {
11481161
&Environment::Prod,
11491162
replicator_image,
11501163
DestinationType::Iceberg,
1164+
LogLevel::Info,
11511165
);
11521166
assert_json_snapshot!(container_environment);
11531167
}
@@ -1231,6 +1245,7 @@ mod tests {
12311245
&environment,
12321246
replicator_image,
12331247
DestinationType::BigQuery,
1248+
LogLevel::Info,
12341249
);
12351250

12361251
let node_selector = create_node_selector_json(&environment);
@@ -1261,6 +1276,7 @@ mod tests {
12611276
&environment,
12621277
replicator_image,
12631278
DestinationType::BigQuery,
1279+
LogLevel::Info,
12641280
);
12651281

12661282
let node_selector = create_node_selector_json(&environment);
@@ -1291,6 +1307,7 @@ mod tests {
12911307
&environment,
12921308
replicator_image,
12931309
DestinationType::BigQuery,
1310+
LogLevel::Info,
12941311
);
12951312

12961313
let node_selector = create_node_selector_json(&environment);
@@ -1328,6 +1345,7 @@ mod tests {
13281345
&environment,
13291346
replicator_image,
13301347
DestinationType::Iceberg,
1348+
LogLevel::Info,
13311349
);
13321350

13331351
let node_selector = create_node_selector_json(&environment);
@@ -1358,6 +1376,7 @@ mod tests {
13581376
&environment,
13591377
replicator_image,
13601378
DestinationType::Iceberg,
1379+
LogLevel::Info,
13611380
);
13621381

13631382
let node_selector = create_node_selector_json(&environment);
@@ -1388,6 +1407,7 @@ mod tests {
13881407
&environment,
13891408
replicator_image,
13901409
DestinationType::Iceberg,
1410+
LogLevel::Info,
13911411
);
13921412

13931413
let node_selector = create_node_selector_json(&environment);

etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_bq_container_environment-2.snap

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ expression: container_environment
1111
"name": "APP_VERSION",
1212
"value": "ramsup/etl-replicator:2a41356af735f891de37d71c0e1a62864fe4630e"
1313
},
14+
{
15+
"name": "RUST_LOG",
16+
"value": "info"
17+
},
1418
{
1519
"name": "APP_SENTRY__DSN",
1620
"valueFrom": {

etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_bq_container_environment-3.snap

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ expression: container_environment
1111
"name": "APP_VERSION",
1212
"value": "ramsup/etl-replicator:2a41356af735f891de37d71c0e1a62864fe4630e"
1313
},
14+
{
15+
"name": "RUST_LOG",
16+
"value": "info"
17+
},
1418
{
1519
"name": "APP_SENTRY__DSN",
1620
"valueFrom": {

etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_bq_container_environment.snap

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ expression: container_environment
1111
"name": "APP_VERSION",
1212
"value": "ramsup/etl-replicator:2a41356af735f891de37d71c0e1a62864fe4630e"
1313
},
14+
{
15+
"name": "RUST_LOG",
16+
"value": "info"
17+
},
1418
{
1519
"name": "APP_PIPELINE__PG_CONNECTION__PASSWORD",
1620
"valueFrom": {

etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_bq_replicator_stateful_set_json-2.snap

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ expression: stateful_set_json
3838
"name": "APP_VERSION",
3939
"value": "ramsup/etl-replicator:2a41356af735f891de37d71c0e1a62864fe4630e"
4040
},
41+
{
42+
"name": "RUST_LOG",
43+
"value": "info"
44+
},
4145
{
4246
"name": "APP_SENTRY__DSN",
4347
"valueFrom": {

0 commit comments

Comments
 (0)