-
-
Notifications
You must be signed in to change notification settings - Fork 14
MVP proposal #28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lsfera
wants to merge
80
commits into
event-driven-io:master
Choose a base branch
from
lsfera:serialization
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
MVP proposal #28
Changes from all commits
Commits
Show all changes
80 commits
Select commit
Hold shift + click to select a range
ab10cc1
FS clean up
977f203
suppress CS1591 (Missing XML comment for publicly visible type or me…
ad74d3c
made UseTable() optional
b59d386
enforce AOT
71c1f14
rename IConsumes to IHandles
86656ec
Use double quote
438cf21
use ILKE. Replication slot name is forced to lcase
b24c01f
unused directive
b024c6f
remove static variables to enable multiple instances on same process
8cc1fed
Added DependencyIbjection project
380d06a
Added demo projrct for DI
acf5f4d
Enhanced logging
e918dfa
bumped version to 0.1.1
2055fed
allow tableDescriptor access
eadba73
renamed vars
c097047
expose NpgsqlDataSource along with connection string - close #16
ef9ac95
rename extension method
345cad6
move to files
645ec1a
simplified di registration
ca28054
collapse project
f4cfe0c
rename folder
f9bf48f
mark as implicit usage
7fff1b4
switch to prepared statement
f0c4160
dispose resources
8db4187
explicit defaults
c446721
add PublisherOptions
e71999b
Expose shortcut for table validation when bootstrapping publisher
2351af1
explain different available to publisher for validating table
9d3249a
file renamed
ea6f7fe
renamed IHandler to IMessageHandler
3887f14
first working version
0cf7595
removed unused
6809b02
files reorg
e20bfa9
marked classes as sealed
9656b3b
Reviewed Atrtibutes class allowed only, not inherit
06bd0a1
unused file
41a96de
unused
52b5567
expose singleton
253d4d4
reify memoization
678b2c8
formatting stuff
c60bc89
class renamed to PublisherOptions and SubscriberOptions. Move classes…
7729fcd
Ensure subscriber default options
5ae673e
rename `PublicationSetupOptions` to `PublicationOptions` and `Replica…
85f520b
move classes to files
20bc581
renamed files
a95cf77
table creation can be enforced when bootstrapping in both processes(p…
d3c5825
typo
f1d0961
Enforcing invariants on publisher/subscriber options builder
910902a
updated satellite packages to latest
5126bb9
simplify test
b236c91
additional examples
49408fa
consumer lookup logic must fallback on willdcard
376d748
use select pg_advisory_xact_lock to serialize access to message table…
8794c4f
mime type is internally exposed for future extension towards binary d…
fa05588
renamed nethod Name => Named for table name
7205446
tested table creation
ddbd7f6
simplified raw urn
964b883
Provide untyped append method
1de0a77
avoid usage checking on public method
575c2b5
provide additional usage patterns
44953bb
provide minimal dsl on typed consumer
2726be9
move methodInfo registration at configuration time
c777721
enable processed data trace only on trace enabled logging level
b3d4d17
corrected IErrorProcessor signature to accept KoEnvelope Id
8fd8041
added more publishing options
e56c3c1
embed ConsumeOptions with typed Consumes
cb460d1
Added EnableSubscriptionAutoHeal - see https://github.com/event-drive…
80ee62c
narrowed scope
35bc4ff
enabling cli invocation
0cf3e48
set container name
7e16bbb
added auto heal use case
414538e
enforce single INamingPolicy instance per subscription
cebdd98
Added minimal ServiceWorker configuatrion with tests
6cbcf1e
Added string routing capability and renamed to ([Message|Raw])RoutedB…
be963fb
move to source generated logger
4741c1a
use string interpolation to enable automatic renaming on refactory
321cc8b
use evtension method
cea26de
default on not found
0db1a31
added test clause
7968c6d
added ImTools
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| #usage: .\scripts\autoheal.ps1 -SolutionFile .\Blumchen.sln -ComposeFile .\docker-compose.yml | ||
| param( | ||
| [string]$SolutionFile, | ||
| [string]$ComposeFile | ||
| ) | ||
|
|
||
|
|
||
| $env:DOCKER_CLI_HINTS=$false #disable docker hints | ||
| Write-Host "Setup infrastrucure" | ||
|
|
||
| try { | ||
|
|
||
| start powershell { | ||
| docker compose up; | ||
| Read-Host; | ||
|
|
||
| } | ||
|
|
||
| Write-Host "Waiting for container readiness..." | ||
| do | ||
| { | ||
| Start-Sleep -s 5 | ||
| $state=$(docker inspect db|ConvertFrom-Json).State | ||
| $status=$state.Status | ||
| $exitCode=$state.ExitCode | ||
| $restart=$state.Restarting | ||
| }Until(($status -eq "running") -and ($exitCode -eq 0) -and ($restart -eq $false)) | ||
|
|
||
| Write-Host "...Done" | ||
|
|
||
| Write-Host "Start subscriber" | ||
| start powershell { | ||
| dotnet run --project ./src/SubscriberWorker/SubscriberWorker.csproj | ||
| Read-Host; | ||
| } | ||
|
|
||
| Write-Host "Publishing 10 messages to test the subscriptions are working properly: hit ENTER when done!" | ||
|
|
||
| start powershell { | ||
| dotnet run --project ./src/Publisher/Publisher.csproj -- -c 10 -t \"UserCreated|UserDeleted|UserModified\"; | ||
| } | ||
|
|
||
| Read-Host; | ||
|
|
||
| Write-Host "Start massive insert to force wal segment creation..." | ||
| start powershell { | ||
| dotnet run --project ./src/Publisher/Publisher.csproj -- -c 800000 -t "UserSubscribed" | ||
| } | ||
|
|
||
| Write-Host "Wait for subscribers to auto heal on error...reporting on row insert" | ||
|
|
||
| Start-Sleep -s 15 | ||
| do | ||
| { | ||
| docker exec -it db psql -h localhost -U postgres -w -c "select count(*) from outbox;" | ||
| }Until(Read-Host "Enter to report on counting rows(another key to proceed when done)" "") | ||
|
|
||
| Write-Host "Subscribers resiliency tested :-)" | ||
| Write-Host "Publishing 10 messages to test the subscriptions are still working properly: hit ENTER when done!" | ||
|
|
||
| start powershell { | ||
| dotnet run --project ./src/Publisher/Publisher.csproj -- -c 10 -t \"UserCreated|UserDeleted|UserModified\" | ||
| } | ||
| Read-Host; | ||
|
|
||
| Write-Host "We're done...: hit ENTER to shut down!" | ||
|
|
||
| Read-Host; | ||
|
|
||
| }catch { | ||
| Write-Host "An error occurred:" | ||
| Write-Host $_ | ||
| } | ||
| finally{ | ||
| docker compose -f $ComposeFile down --rmi local | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| namespace Blumchen; | ||
|
|
||
| public class ConfigurationException(string message): Exception(message); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| using Blumchen.Subscriptions.Replication; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace Blumchen.DependencyInjection; | ||
|
|
||
| internal static partial class LoggerExtensions | ||
|
|
||
| { | ||
| [LoggerMessage(Message = "{workerName} started", Level = LogLevel.Information)] | ||
| public static partial void ServiceStarted(this ILogger logger, string workerName); | ||
|
|
||
| [LoggerMessage(Message = "{workerName} sopped", Level = LogLevel.Information)] | ||
| public static partial void ServiceStopped(this ILogger logger, string workerName); | ||
|
|
||
| [LoggerMessage(Message = "{message} processed", Level = LogLevel.Trace)] | ||
| public static partial void MessageProcessed(this ILogger logger, IEnvelope message); | ||
|
|
||
| } |
41 changes: 41 additions & 0 deletions
41
src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| using Blumchen.Subscriber; | ||
| using Blumchen.Subscriptions.Replication; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Logging; | ||
| using Microsoft.Extensions.Logging.Abstractions; | ||
| using Npgsql; | ||
|
|
||
| #pragma warning disable IL2091 | ||
|
|
||
| namespace Blumchen.DependencyInjection; | ||
|
|
||
| public static class ServiceCollectionExtensions | ||
| { | ||
|
|
||
| public static IServiceCollection AddBlumchen<T>( | ||
| this IServiceCollection service, | ||
| Func<IServiceProvider, IWorkerOptionsBuilder, IWorkerOptionsBuilder> workerOptions) | ||
| where T : class, IMessageHandler => | ||
| service | ||
| .AddHostedService(provider => | ||
| new Worker<T>(workerOptions(provider, new WorkerOptionsBuilder()).Build(), | ||
| provider.GetRequiredService<ILogger<Worker<T>>>())); | ||
|
|
||
| public static IServiceCollection AddBlumchen<T>( | ||
| this IServiceCollection service, | ||
| string connectionString, | ||
| Func<IServiceProvider, IConsumes, OptionsBuilder> consumerFn) where T : class, IMessageHandler { | ||
| return service | ||
| .AddHostedService(provider => | ||
| new Worker<T>(MinimalWorkerOptions(provider, new WorkerOptionsBuilder()).Build(), | ||
| provider.GetService<ILogger<Worker<T>>>() ?? new NullLogger<Worker<T>>())); | ||
|
|
||
| IWorkerOptionsBuilder MinimalWorkerOptions(IServiceProvider provider, IWorkerOptionsBuilder builder) | ||
| => builder.Subscription(optionsBuilder => consumerFn(provider, optionsBuilder) | ||
| .ConnectionString(connectionString) | ||
| .DataSource(new NpgsqlDataSourceBuilder(connectionString) | ||
| .UseLoggerFactory(provider.GetService<ILoggerFactory>()).Build())); | ||
|
|
||
|
|
||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| using Blumchen.Subscriptions; | ||
| using Blumchen.Subscriptions.Replication; | ||
| using Microsoft.Extensions.Hosting; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace Blumchen.DependencyInjection; | ||
|
|
||
| public class Worker<T>( | ||
| WorkerOptions options, | ||
| ILogger<Worker<T>> logger): BackgroundService where T : class, IMessageHandler | ||
| { | ||
| private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>"; | ||
| protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
| { | ||
| await options.OuterPipeline.ExecuteAsync(async token => | ||
| await options.InnerPipeline.ExecuteAsync(async ct => | ||
| { | ||
| await using var subscription = new Subscription(); | ||
| await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct) | ||
| .GetAsyncEnumerator(ct); | ||
| logger.ServiceStarted(WorkerName); | ||
| while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested) | ||
| logger.MessageProcessed(cursor.Current); | ||
| }, token).ConfigureAwait(false), stoppingToken).ConfigureAwait(false); | ||
| logger.ServiceStopped(WorkerName); | ||
| } | ||
|
|
||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.