
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
+ using Oteldemo;
using Microsoft.EntityFrameworkCore;
using System.Diagnostics;
- using OpenTelemetry;
- using OpenTelemetry.Context.Propagation;
- using System.Data;
- using Microsoft.Data.SqlClient;
- using System.Text;
- using System.Text.Json;
- using Oteldemo; // <- add this so OrderResult resolves

- // NOTE: adjust namespaces to your project layout
namespace Accounting;

- /// <summary>
- /// Entities for SQL Server
- /// </summary>
-
- /// <summary>
- /// Raw message table (one row per consumed Kafka message)
- /// </summary>
- internal class RawKafkaMessageEntity
- {
-     public long Id { get; set; }                   // IDENTITY
-     public string? Key { get; set; }               // Kafka key (nullable)
-     public byte[] Value { get; set; } = default!;  // Kafka value (raw bytes)
-     public string? HeadersJson { get; set; }       // Kafka headers (serialized as JSON)
-     public DateTime ReceivedAtUtc { get; set; }    // timestamp (UTC)
- }
-
- /// <summary>
- /// Output table for "messages written today"
- /// </summary>
- internal class TodayMessageEntity
- {
-     public long Id { get; set; }         // IDENTITY
-     public long MessageId { get; set; }  // FK to RawKafkaMessages.Id
-     public string? Key { get; set; }
-     public DateTime ReceivedAtUtc { get; set; }
- }
-
internal class DBContext : DbContext
{
-     public DbSet<OrderEntity> Orders { get; set; } = default!;
-     public DbSet<OrderItemEntity> CartItems { get; set; } = default!;
-     public DbSet<ShippingEntity> Shipping { get; set; } = default!;
-
-     public DbSet<RawKafkaMessageEntity> RawKafkaMessages { get; set; } = default!;
-     public DbSet<TodayMessageEntity> TodayMessages { get; set; } = default!;
+     public DbSet<OrderEntity> Orders { get; set; }
+     public DbSet<OrderItemEntity> CartItems { get; set; }
+     public DbSet<ShippingEntity> Shipping { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
-         // Use a SQL Server connection string:
-         // e.g. "Server=tcp:host,1433;Database=MyDb;User Id=sa;Password=...;Encrypt=True;TrustServerCertificate=True"
        var connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING");
-         optionsBuilder.UseSqlServer(connectionString);
-     }
-
-     protected override void OnModelCreating(ModelBuilder modelBuilder)
-     {
-         modelBuilder.Entity<OrderEntity>(b =>
-         {
-             b.ToTable("orders");
-             b.HasKey(x => x.Id);
-         });
-
-         modelBuilder.Entity<OrderItemEntity>(b =>
-         {
-             b.ToTable("orderitem");                        // match attribute (or drop this line & let attribute win)
-             b.HasKey(x => new { x.ProductId, x.OrderId });  // composite key
-             b.Property(x => x.ProductId).IsRequired();
-             b.Property(x => x.OrderId).IsRequired();
-         });
-
-         modelBuilder.Entity<ShippingEntity>(b =>
-         {
-             b.ToTable("shipping");                          // matches attribute
-             b.HasKey(x => x.ShippingTrackingId);
-         });
-
-         modelBuilder.Entity<RawKafkaMessageEntity>(b =>
-         {
-             b.ToTable("RawKafkaMessages");
-             b.HasKey(x => x.Id);
-             b.Property(x => x.Id).UseIdentityColumn();
-             b.Property(x => x.Value).HasColumnType("varbinary(max)").IsRequired();
-             b.Property(x => x.HeadersJson).HasColumnType("nvarchar(max)");
-             b.Property(x => x.ReceivedAtUtc).HasColumnType("datetime2").IsRequired();
-             b.Property(x => x.Key).HasMaxLength(1024);
-             b.HasIndex(x => x.ReceivedAtUtc);
-         });
-
-         modelBuilder.Entity<TodayMessageEntity>(b =>
-         {
-             b.ToTable("TodayMessages");
-             b.HasKey(x => x.Id);
-             b.Property(x => x.Id).UseIdentityColumn();
-             b.Property(x => x.ReceivedAtUtc).HasColumnType("datetime2").IsRequired();
-             b.HasIndex(x => x.MessageId).IsUnique();        // keep 1-1 with raw row
-         });
-     }
- }
-
-
- /// <summary>
- /// Minimal, extensible SQL task runner. Add more tasks to the list to grow the pipeline.
- /// </summary>
- internal interface ISqlPostProcessor
- {
-     void Run(SqlConnection conn, SqlTransaction? tx = null);
- }
-
- internal class TodayMessagesSqlTask : ISqlPostProcessor
- {
-     public void Run(SqlConnection conn, SqlTransaction? tx = null)
-     {
-         // Insert "today's" messages (UTC) into TodayMessages, idempotently.
-         // CAST(... AS date) ignores the time component.
-         var cmd = conn.CreateCommand();
-         cmd.Transaction = tx;
-         cmd.CommandText = @"
-             INSERT INTO TodayMessages (MessageId, [Key], ReceivedAtUtc)
-             SELECT m.Id, m.[Key], m.ReceivedAtUtc
-             FROM RawKafkaMessages m
-             LEFT JOIN TodayMessages t ON t.MessageId = m.Id
-             WHERE t.MessageId IS NULL
-               AND CAST(m.ReceivedAtUtc AS date) = CAST(SYSUTCDATETIME() AS date);
-         ";
-         cmd.CommandType = CommandType.Text;
-         cmd.ExecuteNonQuery();
-     }
- }
-
- internal class SqlPostInsertPipeline
- {
-     private readonly List<ISqlPostProcessor> _tasks = new();
-
-     public SqlPostInsertPipeline Add(ISqlPostProcessor task)
-     {
-         _tasks.Add(task);
-         return this;
-     }

-     public void RunAll(SqlConnection conn, SqlTransaction? tx = null)
-     {
-         foreach (var t in _tasks)
-         {
-             t.Run(conn, tx);
-         }
+         optionsBuilder.UseNpgsql(connectionString).UseSnakeCaseNamingConvention();
    }
}

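For context on the new wiring: UseNpgsql comes from the Npgsql EF Core provider, and UseSnakeCaseNamingConvention() is assumed to come from the EFCore.NamingConventions package, which rewrites the default table and column names to snake_case. A minimal usage sketch with a placeholder connection string:

    // Placeholder value, e.g.
    // DB_CONNECTION_STRING=Host=postgres;Port=5432;Database=otel;Username=app;Password=secret
    using var db = new DBContext();
    db.Database.EnsureCreated();                        // demo-only; real deployments run migrations
    db.Orders.Add(new OrderEntity { Id = "order-1" });  // lands in table "orders", column "id"
    db.SaveChanges();
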
- internal static class KafkaHeaderSerializer
- {
-     public static string ToJson(Headers headers)
-     {
-         var dict = new Dictionary<string, List<string>>();
-         foreach (var h in headers)
-         {
-             if (!dict.TryGetValue(h.Key, out var list))
-             {
-                 list = new List<string>();
-                 dict[h.Key] = list;
-             }
-             list.Add(h.GetValueBytes() is { } bytes ? Encoding.UTF8.GetString(bytes) : "");
-         }
-         return JsonSerializer.Serialize(dict);
-     }
- }

internal class Consumer : IDisposable
{
    private const string TopicName = "orders";

-     private readonly ILogger _logger;
-     private readonly IConsumer<string, byte[]> _consumer;
+     private ILogger _logger;
+     private IConsumer<string, byte[]> _consumer;
    private bool _isListening;
-     private readonly DBContext? _dbContext;
-     private readonly SqlPostInsertPipeline _sqlPipeline;
+     private DBContext? _dbContext;
    private static readonly ActivitySource MyActivitySource = new("Accounting.Consumer");
-     private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

    public Consumer(ILogger<Consumer> logger)
    {
@@ -192,17 +43,9 @@ public Consumer(ILogger<Consumer> logger)

        _consumer = BuildConsumer(servers);
        _consumer.Subscribe(TopicName);
-         _logger.LogInformation($"Connecting to Kafka: {servers}");

-         var connStr = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING");
-         _dbContext = connStr == null ? null : new DBContext();
-
-         // Make sure DB exists for demo purposes (you likely want proper migrations)
-         _dbContext?.Database.EnsureCreated();
-
-         // Build SQL pipeline (add more tasks here later)
-         _sqlPipeline = new SqlPostInsertPipeline()
-             .Add(new TodayMessagesSqlTask());
+         _logger.LogInformation($"Connecting to Kafka: {servers}");
+         _dbContext = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING") == null ? null : new DBContext();
    }

    public void StartListening()
@@ -215,24 +58,8 @@ public void StartListening()
        {
            try
            {
+                 using var activity = MyActivitySource.StartActivity("order-consumed", ActivityKind.Internal);
                var consumeResult = _consumer.Consume();
-
-                 // Extract parent context from Kafka headers
-                 var parentContext = Propagator.Extract(default, consumeResult.Message.Headers,
-                     (headers, key) =>
-                     {
-                         return headers.TryGetLastBytes(key, out var value)
-                             ? new[] { Encoding.UTF8.GetString(value) }
-                             : Array.Empty<string>();
-                     });
-
-                 OpenTelemetry.Baggage.Current = parentContext.Baggage;
-
-                 using var activity = MyActivitySource.StartActivity(
-                     "order-consumed",
-                     ActivityKind.Consumer,
-                     parentContext.ActivityContext);
-
                ProcessMessage(consumeResult.Message);
            }
            catch (ConsumeException e)
@@ -244,6 +71,7 @@ public void StartListening()
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Closing consumer");
+
                _consumer.Close();
            }
        }
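One tracing consequence of this hunk: the deleted block parented the consumer span on the W3C trace context carried in the Kafka message headers, while the new code starts "order-consumed" as a local root of kind Internal, so the producer-to-consumer trace link is dropped. If that link were still wanted, a sketch reusing the same propagation APIs as the removed code:

    // Sketch: extract the upstream context after Consume(), then parent the span on it.
    var parentContext = Propagators.DefaultTextMapPropagator.Extract(default,
        consumeResult.Message.Headers,
        (headers, key) => headers.TryGetLastBytes(key, out var value)
            ? new[] { Encoding.UTF8.GetString(value) }
            : Array.Empty<string>());
    using var activity = MyActivitySource.StartActivity(
        "order-consumed", ActivityKind.Consumer, parentContext.ActivityContext);
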
@@ -257,16 +85,14 @@ private void ProcessMessage(Message<string, byte[]> message)

            if (_dbContext == null)
            {
-                 // DB not configured – just log and return
                return;
            }

-             var utcNow = DateTime.UtcNow;
-
-             // 1) Save your domain entities (existing behavior)
-             var orderEntity = new OrderEntity { Id = order.OrderId };
+             var orderEntity = new OrderEntity
+             {
+                 Id = order.OrderId
+             };
            _dbContext.Add(orderEntity);
-
            foreach (var item in order.Items)
            {
                var orderItem = new OrderItemEntity
@@ -278,6 +104,7 @@ private void ProcessMessage(Message<string, byte[]> message)
                    Quantity = item.Item.Quantity,
                    OrderId = order.OrderId
                };
+
                _dbContext.Add(orderItem);
            }

@@ -295,53 +122,32 @@ private void ProcessMessage(Message<string, byte[]> message)
                OrderId = order.OrderId
            };
            _dbContext.Add(shipping);
-
-             // 2) Also store the raw Kafka message (key, value, headers)
-             var raw = new RawKafkaMessageEntity
-             {
-                 Key = message.Key,
-                 Value = message.Value ?? Array.Empty<byte>(),
-                 HeadersJson = message.Headers != null ? KafkaHeaderSerializer.ToJson(message.Headers) : null,
-                 ReceivedAtUtc = utcNow
-             };
-             _dbContext.Add(raw);
-
-             // 3) Save everything via EF first
            _dbContext.SaveChanges();
-
-             // 4) Run the SQL pipeline (extensible hook for adding more SQL tasks).
-             //    Use a single connection & transaction for deterministic behavior.
-             var connStr = _dbContext.Database.GetConnectionString();
-             using var conn = new SqlConnection(connStr);
-             conn.Open();
-             using var tx = conn.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
-
-             _sqlPipeline.RunAll(conn, tx);
-
-             tx.Commit();
        }
        catch (Exception ex)
        {
-             _logger.LogError(ex, "Order processing failed:");
+             _logger.LogError(ex, "Order parsing failed:");
        }
    }

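One detail the deleted pipeline handled by hand: EF Core already wraps a single SaveChanges() call in one database transaction, so the order, item, and shipping rows above still commit atomically without the explicit SqlTransaction. An explicit transaction is only needed to make several operations atomic together; a minimal sketch:

    // Sketch: only needed when multiple SaveChanges() calls or raw SQL must share one transaction.
    using var tx = _dbContext.Database.BeginTransaction();
    _dbContext.SaveChanges();
    // ... further SaveChanges() or raw SQL on the same connection ...
    tx.Commit();
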
    private IConsumer<string, byte[]> BuildConsumer(string servers)
    {
        var conf = new ConsumerConfig
        {
-             GroupId = "accounting",
+             GroupId = $"accounting",
            BootstrapServers = servers,
+             // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true
        };

-         return new ConsumerBuilder<string, byte[]>(conf).Build();
+         return new ConsumerBuilder<string, byte[]>(conf)
+             .Build();
    }

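A behavioral note on the config above: with EnableAutoCommit = true, offsets are committed on a background timer, so a crash between an auto-commit and ProcessMessage() completing can skip messages. A sketch of an at-least-once variant using the standard Confluent.Kafka commit API (not what this commit does):

    // Sketch: commit the offset only after the message has been fully processed.
    var conf = new ConsumerConfig
    {
        GroupId = "accounting",
        BootstrapServers = servers,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false            // commit manually below
    };
    using var consumer = new ConsumerBuilder<string, byte[]>(conf).Build();
    consumer.Subscribe(TopicName);
    var result = consumer.Consume();
    ProcessMessage(result.Message);
    consumer.Commit(result);                // offset advances only once the work is done
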
    public void Dispose()
    {
        _isListening = false;
        _consumer?.Dispose();
    }
- }
+ }