|
| 1 | +using System.Collections.Concurrent; |
| 2 | +using System.Text.Json.Serialization; |
| 3 | +using Blumchen.Configuration; |
| 4 | +using Blumchen.Serialization; |
| 5 | +using Blumchen.Subscriptions; |
| 6 | +using Blumchen.Subscriptions.Management; |
| 7 | +using Microsoft.Extensions.Hosting; |
| 8 | +using Microsoft.Extensions.Logging; |
| 9 | +using Polly; |
| 10 | + |
| 11 | + |
| 12 | +namespace Blumchen.Workers; |
| 13 | + |
| 14 | +public abstract class Worker<T>( |
| 15 | + DatabaseOptions databaseOptions, |
| 16 | + IHandler<T> handler, |
| 17 | + JsonSerializerContext jsonSerializerContext, |
| 18 | + IErrorProcessor errorProcessor, |
| 19 | + ResiliencePipeline pipeline, |
| 20 | + INamingPolicy namingPolicy, |
| 21 | + PublicationManagement.PublicationSetupOptions publicationSetupOptions, |
| 22 | + ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions, |
| 23 | + ILoggerFactory loggerFactory): BackgroundService where T : class |
| 24 | +{ |
| 25 | + private readonly ILogger<Worker<T>> _logger = loggerFactory.CreateLogger<Worker<T>>(); |
| 26 | + private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>"; |
| 27 | + private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> _actions = new(StringComparer.OrdinalIgnoreCase); |
| 28 | + private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) |
| 29 | + { |
| 30 | + static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled) => |
| 31 | + (ll, enabled) switch |
| 32 | + { |
| 33 | + (LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters), |
| 34 | + (LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), |
| 35 | + (_, _) => (_, __, ___) => { } |
| 36 | + }; |
| 37 | + _actions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); |
| 38 | + } |
| 39 | + |
| 40 | + protected override async Task ExecuteAsync(CancellationToken stoppingToken) |
| 41 | + { |
| 42 | + await pipeline.ExecuteAsync(async token => |
| 43 | + { |
| 44 | + await using var subscription = new Subscription(); |
| 45 | + await using var cursor = subscription.Subscribe(builder => |
| 46 | + builder |
| 47 | + .ConnectionString(databaseOptions.ConnectionString) |
| 48 | + .WithErrorProcessor(errorProcessor) |
| 49 | + .Handles<T, IHandler<T>>(handler) |
| 50 | + .NamingPolicy(namingPolicy) |
| 51 | + .JsonContext(jsonSerializerContext) |
| 52 | + .WithPublicationOptions(publicationSetupOptions) |
| 53 | + .WithReplicationOptions(replicationSlotSetupOptions) |
| 54 | + , ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token); |
| 55 | + Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName); |
| 56 | + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) |
| 57 | + Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); |
| 58 | + |
| 59 | + }, stoppingToken).ConfigureAwait(false); |
| 60 | + Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); |
| 61 | + return; |
| 62 | + } |
| 63 | + |
| 64 | +} |
0 commit comments