@@ -6,10 +6,7 @@ use chrono::{TimeZone, Utc};
66use opentelemetry_proto:: tonic:: common:: v1:: any_value:: Value ;
77use opentelemetry_proto:: tonic:: logs:: v1:: LogRecord ;
88use std:: borrow:: Cow ;
9- use std:: collections:: HashMap ;
10- use std:: sync:: { Arc , RwLock } ;
11-
12- type SchemaCache = Arc < RwLock < HashMap < u64 , ( Arc < BondEncodedSchema > , [ u8 ; 16 ] ) > > > ;
9+ use std:: sync:: Arc ;
1310
1411const FIELD_ENV_NAME : & str = "env_name" ;
1512const FIELD_ENV_VER : & str = "env_ver" ;
@@ -25,22 +22,11 @@ const FIELD_BODY: &str = "body";
2522
/// Encoder to write OTLP payload in bond form.
///
/// The encoder is stateless: a schema entry is computed for each batch as
/// needed (no schema cache), so instances are zero-sized and `Clone` is free.
#[derive(Clone)]
pub(crate) struct OtlpEncoder;

impl OtlpEncoder {
    /// Create a new encoder instance.
    pub(crate) fn new() -> Self {
        // Unit struct: the idiomatic literal is the bare name, not `OtlpEncoder {}`.
        OtlpEncoder
    }
}
4531
4632 /// Encode a batch of logs into a vector of (event_name, bytes, schema_ids, start_time_nanos, end_time_nanos)
@@ -105,7 +91,7 @@ impl OtlpEncoder {
10591 let ( field_info, schema_id) =
10692 Self :: determine_fields_and_schema_id ( log_record, event_name_str) ;
10793
108- let schema_entry = self . get_or_create_schema ( schema_id, field_info. as_slice ( ) ) ;
94+ let schema_entry = Self :: create_schema ( schema_id, field_info. as_slice ( ) ) ;
10995 // 2. Encode row
11096 let row_buffer = self . write_row_data ( log_record, & field_info) ;
11197 let level = log_record. severity_number as u8 ;
@@ -251,35 +237,14 @@ impl OtlpEncoder {
251237 ( field_defs, schema_id)
252238 }
253239
254- /// Get or create schema - fields are accessible via returned schema entry
255- fn get_or_create_schema ( & self , schema_id : u64 , field_info : & [ FieldDef ] ) -> CentralSchemaEntry {
256- {
257- if let Some ( ( schema_arc, schema_md5) ) =
258- self . schema_cache . read ( ) . unwrap ( ) . get ( & schema_id)
259- {
260- return CentralSchemaEntry {
261- id : schema_id,
262- md5 : * schema_md5,
263- schema : ( * * schema_arc) . clone ( ) , // Dereference Arc and clone BondEncodedSchema
264- } ;
265- }
266- }
267-
268- // Only clone field_info when we actually need to create a new schema
269- // Investigate if we can avoid cloning by using Cow using Arc to fields_info
240+ /// Create schema - always creates a new CentralSchemaEntry
241+ fn create_schema ( schema_id : u64 , field_info : & [ FieldDef ] ) -> CentralSchemaEntry {
270242 let schema =
271243 BondEncodedSchema :: from_fields ( "OtlpLogRecord" , "telemetry" , field_info. to_vec ( ) ) ; //TODO - use actual struct name and namespace
272244
273245 let schema_bytes = schema. as_bytes ( ) ;
274246 let schema_md5 = md5:: compute ( schema_bytes) . 0 ;
275247
276- // Cache Arc<BondEncodedSchema> to avoid cloning large structures
277- let schema_arc = Arc :: new ( schema. clone ( ) ) ;
278- {
279- let mut cache = self . schema_cache . write ( ) . unwrap ( ) ;
280- cache. insert ( schema_id, ( schema_arc, schema_md5) ) ;
281- }
282-
283248 CentralSchemaEntry {
284249 id : schema_id,
285250 md5 : schema_md5,
@@ -431,35 +396,91 @@ mod tests {
431396 }
432397
433398 #[ test]
434- fn test_schema_caching ( ) {
399+ fn test_multiple_schemas_per_batch ( ) {
435400 let encoder = OtlpEncoder :: new ( ) ;
436401
402+ // Create multiple log records with different schema structures
403+ // to test that multiple schemas can exist within the same batch
437404 let log1 = LogRecord {
438405 observed_time_unix_nano : 1_700_000_000_000_000_000 ,
406+ event_name : "user_action" . to_string ( ) ,
439407 severity_number : 9 ,
408+ severity_text : "INFO" . to_string ( ) ,
440409 ..Default :: default ( )
441410 } ;
442411
412+ // Schema 2: Same event_name but with trace_id (different schema)
443413 let mut log2 = LogRecord {
414+ event_name : "user_action" . to_string ( ) ,
444415 observed_time_unix_nano : 1_700_000_001_000_000_000 ,
445416 severity_number : 10 ,
417+ severity_text : "WARN" . to_string ( ) ,
446418 ..Default :: default ( )
447419 } ;
420+ log2. trace_id = vec ! [ 1 ; 16 ] ;
448421
449- let metadata = "namespace=test" ;
422+ // Schema 3: Same event_name but with attributes (different schema)
423+ let mut log3 = LogRecord {
424+ event_name : "user_action" . to_string ( ) ,
425+ observed_time_unix_nano : 1_700_000_002_000_000_000 ,
426+ severity_number : 11 ,
427+ severity_text : "ERROR" . to_string ( ) ,
428+ ..Default :: default ( )
429+ } ;
430+ log3. attributes . push ( KeyValue {
431+ key : "user_id" . to_string ( ) ,
432+ value : Some ( AnyValue {
433+ value : Some ( Value :: StringValue ( "user123" . to_string ( ) ) ) ,
434+ } ) ,
435+ } ) ;
450436
451- // First encoding creates schema
452- let _result1 = encoder. encode_log_batch ( [ log1] . iter ( ) , metadata) ;
453- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 1 ) ;
437+ let metadata = "namespace=test" ;
454438
455- // Second encoding with same schema structure reuses schema
456- let _result2 = encoder. encode_log_batch ( [ log2. clone ( ) ] . iter ( ) , metadata) ;
457- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 1 ) ;
439+ // Encode multiple log records with different schema structures but same event_name
440+ let result = encoder. encode_log_batch ( [ log1, log2, log3] . iter ( ) , metadata) ;
458441
459- // Add trace_id to create different schema
460- log2. trace_id = vec ! [ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 ] ;
461- let _result3 = encoder. encode_log_batch ( [ log2] . iter ( ) , metadata) ;
462- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 2 ) ;
442+ // Should create one batch (same event_name = "user_action")
443+ assert_eq ! ( result. len( ) , 1 ) ;
444+ let batch = & result[ 0 ] ;
445+ assert_eq ! ( batch. event_name, "user_action" ) ;
446+
447+ // Verify that multiple schemas were created within the same batch
448+ // schema_ids should contain multiple semicolon-separated MD5 hashes
449+ let schema_ids = & batch. metadata . schema_ids ;
450+ assert ! ( !schema_ids. is_empty( ) ) ;
451+
452+ // Split by semicolon to get individual schema IDs
453+ let schema_id_list: Vec < & str > = schema_ids. split ( ';' ) . collect ( ) ;
454+
455+ // Should have 3 different schema IDs (one per unique schema structure)
456+ assert_eq ! (
457+ schema_id_list. len( ) ,
458+ 3 ,
459+ "Expected 3 schema IDs but found {}: {}" ,
460+ schema_id_list. len( ) ,
461+ schema_ids
462+ ) ;
463+
464+ // Verify all schema IDs are different (each log record has different schema structure)
465+ let unique_schemas: std:: collections:: HashSet < & str > = schema_id_list. into_iter ( ) . collect ( ) ;
466+ assert_eq ! (
467+ unique_schemas. len( ) ,
468+ 3 ,
469+ "Expected 3 unique schema IDs but found duplicates in: {schema_ids}"
470+ ) ;
471+
472+ // Verify each schema ID is a valid MD5 hash (32 hex characters)
473+ for schema_id in unique_schemas {
474+ assert_eq ! (
475+ schema_id. len( ) ,
476+ 32 ,
477+ "Schema ID should be 32 hex characters: {schema_id}"
478+ ) ;
479+ assert ! (
480+ schema_id. chars( ) . all( |c| c. is_ascii_hexdigit( ) ) ,
481+ "Schema ID should contain only hex characters: {schema_id}"
482+ ) ;
483+ }
463484 }
464485
465486 #[ test]
@@ -521,9 +542,6 @@ mod tests {
521542 assert ! ( !result[ 0 ] . data. is_empty( ) ) ; // Should have encoded data
522543 // Should have 3 different schema IDs (semicolon-separated)
523544 assert_eq ! ( result[ 0 ] . metadata. schema_ids. matches( ';' ) . count( ) , 2 ) ; // 3 schemas = 2 semicolons
524-
525- // Should have 3 different schemas cached
526- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 3 ) ;
527545 }
528546
529547 #[ test]
@@ -637,8 +655,5 @@ mod tests {
637655 assert_eq ! ( system_alert. metadata. schema_ids. matches( ';' ) . count( ) , 0 ) ; // 1 schema = 0 semicolons
638656 assert ! ( !log_batch. data. is_empty( ) ) ; // Should have encoded data
639657 assert_eq ! ( log_batch. metadata. schema_ids. matches( ';' ) . count( ) , 0 ) ; // 1 schema = 0 semicolons
640-
641- // Should have 4 different schemas cached
642- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 4 ) ;
643658 }
644659}
0 commit comments