Commit 64285f0

thomasballinger (Convex, Inc.) authored and committed
Approximate types for event log events (#40944)
GitOrigin-RevId: 7349117c81a1283aaafaadf988fb953ee35721b1
1 parent a70eeb0 commit 64285f0

7 files changed: +989 −2 lines changed

crates/common/src/log_streaming.rs

Lines changed: 316 additions & 0 deletions
@@ -84,6 +84,12 @@ pub struct SchedulerInfo {
     pub job_id: String,
 }
 
+// When adding a new event type:
+// - add a Schema type in the tests at the bottom of this file
+// - consider adding formatting of it in the CLI
+// - add it to the docs
+//
+// Also consider getting rid of the V1 format!
 #[derive(Debug, Clone)]
 pub enum StructuredLogEvent {
     /// Topic for verification logs. These are issued on sink startup and are
@@ -543,10 +549,18 @@ impl HeapSize for FunctionEventSource {
 
 #[cfg(test)]
 mod tests {
+    use serde::{
+        Deserialize,
+        Serialize,
+    };
     use serde_json::{
         json,
         Value as JsonValue,
     };
+    use utoipa::{
+        OpenApi,
+        ToSchema,
+    };
 
     use crate::{
         components::ComponentPath,
@@ -556,9 +570,12 @@ mod tests {
             LogLineStructured,
         },
         log_streaming::{
+            AggregatedFunctionUsageStats,
             FunctionEventSource,
             LogEvent,
             LogEventFormatVersion,
+            OccInfo,
+            SchedulerInfo,
             StructuredLogEvent,
         },
         runtime::UnixTimestamp,
@@ -619,4 +636,303 @@ mod tests {
         );
         Ok(())
     }
+
+    // Utoipa schemas for log stream events which are for documentation only.
+    // They are cursorily tested to check they can be used to parse some
+    // event log output.
+    //
+    // These types need to be updated manually.
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct ConsoleLogEvent {
+        timestamp: u64,
+        #[schema(inline)]
+        function: SchemaFunctionEventSource,
+        log_level: String,
+        message: String,
+        is_truncated: bool,
+        system_code: Option<String>,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct SchemaFunctionEventSource {
+        path: String,
+        r#type: String,
+        cached: Option<bool>,
+        request_id: String,
+        mutation_queue_length: Option<usize>,
+        mutation_retry_count: Option<usize>,
+        component_path: Option<String>,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct SchemaOccInfo {
+        table_name: Option<String>,
+        document_id: Option<String>,
+        write_source: Option<String>,
+        retry_count: u64,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct SchemaSchedulerInfo {
+        job_id: String,
+    }
+
+    // Additional log event schemas
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct VerificationEvent {
+        timestamp: u64,
+        message: String, // "Convex connection test"
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct FunctionExecutionEvent {
+        timestamp: u64,
+        #[schema(inline)]
+        function: SchemaFunctionEventSource,
+        execution_time_ms: u64,
+        status: String, // "success" or "failure"
+        error_message: Option<String>,
+        #[schema(inline)]
+        occ_info: Option<SchemaOccInfo>,
+        #[schema(inline)]
+        scheduler_info: Option<SchemaSchedulerInfo>,
+        #[schema(inline)]
+        usage: UsageStats,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct UsageStats {
+        database_read_bytes: u64,
+        database_write_bytes: u64,
+        database_read_documents: u64,
+        file_storage_read_bytes: u64,
+        file_storage_write_bytes: u64,
+        vector_storage_read_bytes: u64,
+        vector_storage_write_bytes: u64,
+        action_memory_used_mb: Option<u64>,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct DeploymentAuditLogEvent {
+        timestamp: u64,
+        audit_log_action: String,
+        audit_log_metadata: String, // JSON-stringified metadata
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct SchedulerStatsEvent {
+        timestamp: u64,
+        lag_seconds: u64,
+        num_running_jobs: u64,
+    }
+
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[allow(dead_code)]
+    struct ScheduledJobLagEvent {
+        timestamp: u64,
+        lag_seconds: u64,
+    }
+
+    // Union type for all log events, discriminated by topic field
+    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
+    #[serde(tag = "topic")]
+    #[allow(dead_code)]
+    enum LogStreamEvent {
+        #[serde(rename = "console")]
+        Console(ConsoleLogEvent),
+        #[serde(rename = "verification")]
+        Verification(VerificationEvent),
+        #[serde(rename = "function_execution")]
+        FunctionExecution(FunctionExecutionEvent),
+        #[serde(rename = "audit_log")]
+        DeploymentAuditLog(DeploymentAuditLogEvent),
+        #[serde(rename = "scheduler_stats")]
+        SchedulerStats(SchedulerStatsEvent),
+        #[serde(rename = "scheduled_job_lag")]
+        ScheduledJobLag(ScheduledJobLagEvent),
+    }
+
+    // OpenAPI document for log stream schemas
+    #[derive(OpenApi)]
+    #[openapi(
+        info(
+            title = "Convex Log Stream Events",
+            version = "2.0.0",
+            description = "Schema definitions for Convex log stream events (V2 format)"
+        ),
+        components(schemas(LogStreamEvent))
+    )]
+    struct LogStreamApiDoc;
+
+    #[test]
+    fn test_v2_events_deserialize_to_schemas() -> anyhow::Result<()> {
+        let verification_json = serde_json::to_value(
+            &LogEvent {
+                timestamp: UnixTimestamp::from_millis(1000),
+                event: StructuredLogEvent::Verification,
+            }
+            .to_json_map(LogEventFormatVersion::V2)?,
+        )?;
+        let _: LogStreamEvent = serde_json::from_value(verification_json)?;
+
+        let console_json = serde_json::to_value(
+            &LogEvent {
+                timestamp: UnixTimestamp::from_millis(2000),
+                event: StructuredLogEvent::Console {
+                    source: FunctionEventSource {
+                        context: ExecutionContext::new_for_test(),
+                        component_path: ComponentPath::test_user(),
+                        udf_path: "test:console".to_string(),
+                        udf_type: UdfType::Query,
+                        module_environment: ModuleEnvironment::Isolate,
+                        cached: Some(true),
+                        mutation_queue_length: None,
+                        mutation_retry_count: None,
+                    },
+                    log_line: LogLineStructured {
+                        messages: vec!["test console log".to_string()].into(),
+                        level: LogLevel::Log,
+                        is_truncated: false,
+                        timestamp: UnixTimestamp::from_millis(2000),
+                        system_metadata: None,
+                    },
+                },
+            }
+            .to_json_map(LogEventFormatVersion::V2)?,
+        )?;
+        let _: LogStreamEvent = serde_json::from_value(console_json)?;
+
+        let function_execution_json = serde_json::to_value(
+            &LogEvent {
+                timestamp: UnixTimestamp::from_millis(3000),
+                event: StructuredLogEvent::FunctionExecution {
+                    source: FunctionEventSource {
+                        context: ExecutionContext::new_for_test(),
+                        component_path: ComponentPath::test_user(),
+                        udf_path: "test:function".to_string(),
+                        udf_type: UdfType::Mutation,
+                        module_environment: ModuleEnvironment::Isolate,
+                        cached: None,
+                        mutation_queue_length: Some(2),
+                        mutation_retry_count: Some(0),
+                    },
+                    error: None,
+                    execution_time: std::time::Duration::from_millis(100),
+                    usage_stats: AggregatedFunctionUsageStats {
+                        database_read_bytes: 512,
+                        database_write_bytes: 256,
+                        database_read_documents: 3,
+                        storage_read_bytes: 0,
+                        storage_write_bytes: 0,
+                        vector_index_read_bytes: 0,
+                        vector_index_write_bytes: 0,
+                        action_memory_used_mb: None,
+                        return_bytes: Some(64),
+                    },
+                    occ_info: Some(OccInfo {
+                        table_name: Some("test_table".to_string()),
+                        document_id: Some("doc123".to_string()),
+                        write_source: Some("mutation".to_string()),
+                        retry_count: 1,
+                    }),
+                    scheduler_info: Some(SchedulerInfo {
+                        job_id: "scheduled_job_456".to_string(),
+                    }),
+                },
+            }
+            .to_json_map(LogEventFormatVersion::V2)?,
+        )?;
+        let _: LogStreamEvent = serde_json::from_value(function_execution_json)?;
+
+        let mut metadata = serde_json::Map::new();
+        metadata.insert(
+            "action".to_string(),
+            serde_json::Value::String("deploy".to_string()),
+        );
+        let audit_log_json = serde_json::to_value(
+            &LogEvent {
+                timestamp: UnixTimestamp::from_millis(4000),
+                event: StructuredLogEvent::DeploymentAuditLog {
+                    action: "schema_push".to_string(),
+                    metadata,
+                },
+            }
+            .to_json_map(LogEventFormatVersion::V2)?,
+        )?;
+        let _: LogStreamEvent = serde_json::from_value(audit_log_json)?;
+
+        let scheduler_stats_json = serde_json::to_value(
+            &LogEvent {
+                timestamp: UnixTimestamp::from_millis(5000),
+                event: StructuredLogEvent::SchedulerStats {
+                    lag_seconds: std::time::Duration::from_secs(10),
+                    num_running_jobs: 25,
+                },
+            }
+            .to_json_map(LogEventFormatVersion::V2)?,
+        )?;
+        let _: LogStreamEvent = serde_json::from_value(scheduler_stats_json)?;
+
+        let job_lag_json = serde_json::to_value(
+            &LogEvent {
+                timestamp: UnixTimestamp::from_millis(6000),
+                event: StructuredLogEvent::ScheduledJobLag {
+                    lag_seconds: std::time::Duration::from_secs(5),
+                },
+            }
+            .to_json_map(LogEventFormatVersion::V2)?,
+        )?;
+        let _: LogStreamEvent = serde_json::from_value(job_lag_json)?;
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_log_stream_schema_matches() -> anyhow::Result<()> {
+        use std::{
+            fs,
+            path::Path,
+        };
+
+        const LOG_STREAM_SCHEMA_FILE: &str = "../../npm-packages/convex/log-stream-openapi.json";
+
+        // Generate OpenAPI spec using utoipa
+        let openapi_spec = LogStreamApiDoc::openapi();
+        let current_schema = openapi_spec.to_pretty_json()?;
+
+        // Check if file exists and compare
+        if Path::new(LOG_STREAM_SCHEMA_FILE).exists() {
+            let existing_schema = fs::read_to_string(LOG_STREAM_SCHEMA_FILE)?;
+            if existing_schema.trim() != current_schema.trim() {
+                // Write updated schema
+                fs::write(LOG_STREAM_SCHEMA_FILE, &current_schema)?;
+                panic!(
+                    "{LOG_STREAM_SCHEMA_FILE} does not match current schema. This test \
+                     automatically updated the file so you can run again: `cargo test -p common \
+                     test_log_stream_schema_matches`"
+                );
+            }
+        } else {
+            // Create directory if it doesn't exist
+            if let Some(parent) = Path::new(LOG_STREAM_SCHEMA_FILE).parent() {
+                fs::create_dir_all(parent)?;
+            }
+            fs::write(LOG_STREAM_SCHEMA_FILE, &current_schema)?;
+            panic!(
+                "Created new {LOG_STREAM_SCHEMA_FILE}. Run the test again to verify: `cargo test \
+                 -p common test_log_stream_schema_matches`"
+            );
+        }
+
+        Ok(())
+    }
 }
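
For readers skimming the diff: because LogStreamEvent is internally tagged with #[serde(tag = "topic")], each streamed V2 event is a single flat JSON object whose "topic" field selects the variant, which is why the schema structs above declare no topic field of their own. The sketch below is illustrative only and is not part of the commit: it mirrors a subset of the ConsoleLogEvent fields in a standalone program, with invented values, and assumes serde (derive feature) and serde_json as dependencies.

// Illustrative sketch: parsing a hypothetical V2 "console" event with the same
// `topic`-tagged layout as the LogStreamEvent union in this commit.
// Field names mirror ConsoleLogEvent above; the nested `function` object is
// omitted and all values are invented for the example.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(tag = "topic")]
enum Event {
    #[serde(rename = "console")]
    Console {
        timestamp: u64,
        log_level: String,
        message: String,
        is_truncated: bool,
    },
    #[serde(rename = "verification")]
    Verification { timestamp: u64, message: String },
}

fn main() -> serde_json::Result<()> {
    let raw = r#"{
        "topic": "console",
        "timestamp": 1700000000000,
        "log_level": "LOG",
        "message": "hello from a query",
        "is_truncated": false
    }"#;
    // The "topic" discriminator picks the Console variant; unknown fields
    // (such as the full `function` payload) would simply be ignored here.
    let event: Event = serde_json::from_str(raw)?;
    println!("{event:?}");
    Ok(())
}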

npm-packages/convex/.prettierignore

Lines changed: 2 additions & 0 deletions
@@ -5,4 +5,6 @@ api-extractor-configs/reports
 api-extractor-configs/temp
 tmpDist*
 management-openapi.json
+log-stream-openapi.json
 src/cli/generatedApi.ts
+src/cli/lib/generatedLogStreamApi.ts
