Skip to content

Commit a8d2be2

Browse files
MnatsakanMargaryanMnatsakanMargaryan
authored andcommitted
first working commit
1 parent 3817d20 commit a8d2be2

File tree

6 files changed

+218
-5
lines changed

6 files changed

+218
-5
lines changed

src/PandaNuGet/Abstractions/InboxConsumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
using EFCore.PostgresExtensions.Enums;
22
using EFCore.PostgresExtensions.Extensions;
3-
using FinHub.Mock1.Box.Entities;
4-
using FinHub.Mock1.Box.Enums;
5-
using FinHub.Mock1.Box.Implementations;
63
using MassTransit;
4+
using MassTransit.PostgresOutbox;
75
using MassTransit.PostgresOutbox.Abstractions;
6+
using MassTransit.PostgresOutbox.Entities;
7+
using MassTransit.PostgresOutbox.Enums;
88
using Microsoft.EntityFrameworkCore;
99
using Microsoft.Extensions.DependencyInjection;
1010
using Microsoft.Extensions.Logging;

src/PandaNuGet/Extensions/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using FinHub.Mock1.Box.Abstractions;
2-
using FinHub.Mock1.Box.Implementations;
3-
using FinHub.Mock1.Box.Jobs;
42
using MassTransit.PostgresOutbox.Abstractions;
3+
using MassTransit.PostgresOutbox.Jobs;
54
using Microsoft.EntityFrameworkCore;
65
using Microsoft.Extensions.DependencyInjection;
76

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using MassTransit.PostgresOutbox.Abstractions;
2+
using Microsoft.EntityFrameworkCore;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Microsoft.Extensions.Hosting;
5+
using Microsoft.Extensions.Logging;
6+
7+
namespace MassTransit.PostgresOutbox.Jobs
8+
{
9+
internal class InboxMessageRemovalService<TDbContext> : BackgroundService
10+
where TDbContext : DbContext, IInboxDbContext
11+
{
12+
private readonly int _beforeInDays;
13+
private readonly PeriodicTimer _timer;
14+
private readonly ILogger<InboxMessageRemovalService<TDbContext>> _logger;
15+
private readonly IServiceScopeFactory _serviceScopeFactory;
16+
17+
public InboxMessageRemovalService(IServiceScopeFactory serviceScopeFactory, ILogger<InboxMessageRemovalService<TDbContext>> logger, Settings settings)
18+
{
19+
_serviceScopeFactory = serviceScopeFactory;
20+
_logger = logger;
21+
_timer = new(settings.InboxRemovalTimerPeriod);
22+
_beforeInDays = settings.InboxRemovalBeforeInDays;
23+
}
24+
25+
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
26+
{
27+
while (await _timer.WaitForNextTickAsync(cancellationToken))
28+
{
29+
_logger.LogInformation($"{nameof(InboxMessageRemovalService<TDbContext>)} started iteration");
30+
31+
using var scope = _serviceScopeFactory.CreateScope();
32+
using var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
33+
34+
try
35+
{
36+
var daysBefore = DateTime.UtcNow.AddDays(-_beforeInDays);
37+
38+
await dbContext.InboxMessages
39+
.Where(x => x.State == Enums.MessageState.Done)
40+
.Where(x => x.UpdatedAt < daysBefore)
41+
.ExecuteDeleteAsync(cancellationToken);
42+
}
43+
catch (Exception ex)
44+
{
45+
_logger.LogError(ex, ex.Message);
46+
}
47+
48+
_logger.LogInformation($"{nameof(InboxMessageRemovalService<TDbContext>)} finished iteration");
49+
}
50+
}
51+
}
52+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
using EFCore.PostgresExtensions.Enums;
2+
using EFCore.PostgresExtensions.Extensions;
3+
using FinHub.Mock1.Box.Abstractions;
4+
using MassTransit;
5+
using MassTransit.PostgresOutbox.Enums;
6+
using Microsoft.EntityFrameworkCore;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Hosting;
9+
using Microsoft.Extensions.Logging;
10+
using Newtonsoft.Json;
11+
12+
namespace MassTransit.PostgresOutbox.Jobs
13+
{
14+
internal class OutboxMessagePublisherService<TDbContext> : BackgroundService
15+
where TDbContext : DbContext, IOutboxDbContext
16+
{
17+
private readonly int _batchCount;
18+
private readonly PeriodicTimer _timer;
19+
private readonly ILogger<OutboxMessagePublisherService<TDbContext>> _logger;
20+
private readonly IServiceScopeFactory _serviceScopeFactory;
21+
private readonly IPublishEndpoint _publishEndpoint;
22+
23+
public OutboxMessagePublisherService(IPublishEndpoint publishEndpoint,
24+
IServiceScopeFactory serviceScopeFactory,
25+
ILogger<OutboxMessagePublisherService<TDbContext>> logger, Settings settings)
26+
{
27+
_publishEndpoint = publishEndpoint;
28+
_serviceScopeFactory = serviceScopeFactory;
29+
_logger = logger;
30+
_timer = new(settings.PublisherTimerPeriod);
31+
_batchCount = settings.PublisherBatchCount;
32+
}
33+
34+
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
35+
{
36+
while (await _timer.WaitForNextTickAsync(cancellationToken))
37+
{
38+
_logger.LogInformation($"{nameof(OutboxMessagePublisherService<TDbContext>)} started iteration");
39+
40+
using var scope = _serviceScopeFactory.CreateScope();
41+
using var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
42+
using var transactionScope = await dbContext.Database.BeginTransactionAsync(System.Data.IsolationLevel.ReadCommitted, cancellationToken);
43+
44+
try
45+
{
46+
var messages = await dbContext.OutboxMessages
47+
.Where(x => x.State == MessageState.New)
48+
.OrderBy(x => x.CreatedAt)
49+
.ForUpdate(LockBehavior.SkipLocked)
50+
.Take(_batchCount)
51+
.ToListAsync(cancellationToken);
52+
53+
if (messages.Count == 0)
54+
{
55+
continue;
56+
}
57+
58+
var publishedMessageIds = new List<Guid>(messages.Count);
59+
60+
foreach (var message in messages)
61+
{
62+
try
63+
{
64+
var type = Type.GetType(message.Type);
65+
66+
var messageObject = JsonConvert.DeserializeObject(message.Payload, type!);
67+
68+
await _publishEndpoint.Publish(messageObject!, type!, x => x.Headers.Set(Constants.OutboxMessageId, message.Id), cancellationToken);
69+
70+
publishedMessageIds.Add(message.Id);
71+
}
72+
catch (Exception ex)
73+
{
74+
_logger.LogError(ex, ex.Message);
75+
}
76+
}
77+
78+
if (publishedMessageIds.Count == 0)
79+
{
80+
continue;
81+
}
82+
83+
var utcNow = DateTime.UtcNow;
84+
85+
await dbContext.OutboxMessages
86+
.Where(b => publishedMessageIds.Contains(b.Id))
87+
.ExecuteUpdateAsync(x => x.SetProperty(m => m.State, MessageState.Done)
88+
.SetProperty(m => m.UpdatedAt, utcNow), cancellationToken: cancellationToken);
89+
90+
await dbContext.SaveChangesAsync();
91+
await transactionScope.CommitAsync();
92+
93+
}
94+
catch (Exception ex)
95+
{
96+
_logger.LogError(ex, ex.Message);
97+
await transactionScope.RollbackAsync(cancellationToken);
98+
}
99+
100+
_logger.LogInformation($"{nameof(OutboxMessagePublisherService<TDbContext>)} finished iteration");
101+
}
102+
}
103+
}
104+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using FinHub.Mock1.Box.Abstractions;
2+
using MassTransit.PostgresOutbox;
3+
using MassTransit.PostgresOutbox.Enums;
4+
using Microsoft.EntityFrameworkCore;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Hosting;
7+
using Microsoft.Extensions.Logging;
8+
9+
namespace MassTransit.PostgresOutbox.Jobs
10+
{
11+
internal class OutboxMessageRemovalService<TDbContext> : BackgroundService
12+
where TDbContext : DbContext, IOutboxDbContext
13+
{
14+
private readonly int _beforeInDays;
15+
private readonly PeriodicTimer _timer;
16+
private readonly ILogger<OutboxMessageRemovalService<TDbContext>> _logger;
17+
private readonly IServiceScopeFactory _serviceScopeFactory;
18+
19+
public OutboxMessageRemovalService(IServiceScopeFactory serviceScopeFactory, ILogger<OutboxMessageRemovalService<TDbContext>> logger, Settings settings)
20+
{
21+
_serviceScopeFactory = serviceScopeFactory;
22+
_logger = logger;
23+
_timer = new(settings.OutboxRemovalTimerPeriod);
24+
_beforeInDays = settings.OutboxRemovalBeforeInDays;
25+
}
26+
27+
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
28+
{
29+
while (await _timer.WaitForNextTickAsync(cancellationToken))
30+
{
31+
_logger.LogInformation($"{nameof(OutboxMessageRemovalService<TDbContext>)} started iteration");
32+
33+
using var scope = _serviceScopeFactory.CreateScope();
34+
using var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
35+
36+
try
37+
{
38+
var daysBefore = DateTime.UtcNow.AddDays(-_beforeInDays);
39+
40+
await dbContext.OutboxMessages
41+
.Where(x => x.State == MessageState.Done)
42+
.Where(x => x.UpdatedAt < daysBefore)
43+
.ExecuteDeleteAsync(cancellationToken);
44+
}
45+
catch (Exception ex)
46+
{
47+
_logger.LogError(ex, ex.Message);
48+
}
49+
50+
_logger.LogInformation($"{nameof(OutboxMessageRemovalService<TDbContext>)} finished iteration");
51+
}
52+
}
53+
}
54+
}

src/PandaNuGet/MassTransit.PostgresOutbox.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
</ItemGroup>
2424

2525
<ItemGroup>
26+
<PackageReference Include="MassTransit" Version="8.2.0" />
27+
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.1" />
2628
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.3" />
29+
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
30+
<PackageReference Include="Pandatech.EFCore.PostgresExtensions" Version="1.0.0" />
2731
</ItemGroup>
2832

2933
</Project>

0 commit comments

Comments
 (0)