11using System . Collections . Concurrent ;
22using System . Text . Json . Serialization ;
3- using Blumchen . Configuration ;
43using Blumchen . Serialization ;
54using Blumchen . Subscriptions ;
65using Blumchen . Subscriptions . Management ;
76using Microsoft . Extensions . Hosting ;
87using Microsoft . Extensions . Logging ;
8+ using Npgsql ;
99using Polly ;
1010
1111
1212namespace Blumchen . Workers ;
1313
1414public abstract class Worker < T > (
15- DatabaseOptions databaseOptions ,
15+ NpgsqlDataSource dataSource ,
1616 IHandler < T > handler ,
1717 JsonSerializerContext jsonSerializerContext ,
1818 IErrorProcessor errorProcessor ,
@@ -21,9 +21,8 @@ public abstract class Worker<T>(
2121 PublicationManagement . PublicationSetupOptions publicationSetupOptions ,
2222 ReplicationSlotManagement . ReplicationSlotSetupOptions replicationSlotSetupOptions ,
2323 Func < TableDescriptorBuilder , TableDescriptorBuilder > tableDescriptorBuilder ,
24- ILoggerFactory loggerFactory ) : BackgroundService where T : class
24+ ILogger logger ) : BackgroundService where T : class
2525{
26- private readonly ILogger < Worker < T > > _logger = loggerFactory . CreateLogger < Worker < T > > ( ) ;
2726 private string WorkerName { get ; } = $ "{ nameof ( Worker < T > ) } <{ typeof ( T ) . Name } >";
2827 private static readonly ConcurrentDictionary < string , Action < ILogger , string , object [ ] > > LoggingActions = new ( StringComparer . OrdinalIgnoreCase ) ;
2928 private static void Notify ( ILogger logger , LogLevel level , string template , params object [ ] parameters )
@@ -33,9 +32,9 @@ static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled)
3332 {
3433 ( LogLevel . Information , true ) => ( logger , template , parameters ) => logger . LogInformation ( template , parameters ) ,
3534 ( LogLevel . Debug , true ) => ( logger , template , parameters ) => logger . LogDebug ( template , parameters ) ,
36- ( _, _) => ( _ , __ , ___ ) => { }
35+ ( _, _) => ( _ , _ , _ ) => { }
3736 } ;
38- LoggingActions . GetOrAdd ( template , s => LoggerAction ( level , logger . IsEnabled ( level ) ) ) ( logger , template , parameters ) ;
37+ LoggingActions . GetOrAdd ( template , _ => LoggerAction ( level , logger . IsEnabled ( level ) ) ) ( logger , template , parameters ) ;
3938 }
4039
4140 protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
@@ -45,21 +44,21 @@ await pipeline.ExecuteAsync(async token =>
4544 await using var subscription = new Subscription ( ) ;
4645 await using var cursor = subscription . Subscribe ( builder =>
4746 builder
48- . ConnectionString ( databaseOptions . ConnectionString )
47+ . DataSource ( dataSource )
4948 . WithTable ( tableDescriptorBuilder )
5049 . WithErrorProcessor ( errorProcessor )
5150 . Handles < T , IHandler < T > > ( handler )
5251 . NamingPolicy ( namingPolicy )
5352 . JsonContext ( jsonSerializerContext )
5453 . WithPublicationOptions ( publicationSetupOptions )
5554 . WithReplicationOptions ( replicationSlotSetupOptions )
56- , ct : token , loggerFactory : loggerFactory ) . GetAsyncEnumerator ( token ) ;
57- Notify ( _logger , LogLevel . Information , "{WorkerName} started" , WorkerName ) ;
55+ , ct : token ) . GetAsyncEnumerator ( token ) ;
56+ Notify ( logger , LogLevel . Information , "{WorkerName} started" , WorkerName ) ;
5857 while ( await cursor . MoveNextAsync ( ) . ConfigureAwait ( false ) && ! token . IsCancellationRequested )
59- Notify ( _logger , LogLevel . Debug , "{cursor.Current} processed" , cursor . Current ) ;
58+ Notify ( logger , LogLevel . Debug , "{cursor.Current} processed" , cursor . Current ) ;
6059
6160 } , stoppingToken ) . ConfigureAwait ( false ) ;
62- Notify ( _logger , LogLevel . Information , "{WorkerName} stopped" , WorkerName ) ;
61+ Notify ( logger , LogLevel . Information , "{WorkerName} stopped" , WorkerName ) ;
6362 return ;
6463 }
6564
0 commit comments