99namespace MassTransit . PostgresOutbox . Abstractions ;
1010
1111public abstract class InboxConsumer < TMessage , TDbContext > : IConsumer < TMessage >
12- where TMessage : class
13- where TDbContext : DbContext , IInboxDbContext
12+ where TMessage : class
13+ where TDbContext : DbContext , IInboxDbContext
1414{
15- private readonly string _consumerId ;
16- private readonly IServiceScopeFactory _serviceScopeFactory ;
15+ private readonly string _consumerId ;
16+ private readonly IServiceScopeFactory _serviceScopeFactory ;
1717
18- protected InboxConsumer ( IServiceScopeFactory serviceScopeFactory )
19- {
20- _consumerId = GetType ( ) . ToString ( ) ;
21- _serviceScopeFactory = serviceScopeFactory ;
22- }
18+ protected InboxConsumer ( IServiceScopeFactory serviceScopeFactory )
19+ {
20+ _consumerId = GetType ( ) . ToString ( ) ;
21+ _serviceScopeFactory = serviceScopeFactory ;
22+ }
2323
24- public async Task Consume ( ConsumeContext < TMessage > context )
25- {
26- using var scope = _serviceScopeFactory . CreateScope ( ) ;
27- var messageId = context . Headers . Get < Guid > ( Constants . OutboxMessageId ) ;
24+ public async Task Consume ( ConsumeContext < TMessage > context )
25+ {
26+ var messageId = context . Headers . Get < Guid > ( Constants . OutboxMessageId ) ;
2827
29- var dbContext = scope . ServiceProvider . GetRequiredService < TDbContext > ( ) ;
30- var logger = scope . ServiceProvider . GetRequiredService < ILogger < InboxConsumer < TMessage , TDbContext > > > ( ) ;
28+ if ( messageId is null )
29+ {
30+ await Consume ( context . Message ) ;
31+ return ;
32+ }
3133
32- var exists = await dbContext . InboxMessages . AnyAsync ( x => x . MessageId == messageId && x . ConsumerId == _consumerId ) ;
34+ using var scope = _serviceScopeFactory . CreateScope ( ) ;
3335
34- if ( ! exists )
35- {
36- dbContext . InboxMessages . Add ( new InboxMessage
37- {
38- MessageId = messageId ! . Value ,
39- CreatedAt = DateTime . UtcNow ,
40- State = MessageState . New ,
41- ConsumerId = _consumerId ,
42- } ) ;
36+ var dbContext = scope . ServiceProvider . GetRequiredService < TDbContext > ( ) ;
37+ var logger = scope . ServiceProvider . GetRequiredService < ILogger < InboxConsumer < TMessage , TDbContext > > > ( ) ;
4338
44- await dbContext . SaveChangesAsync ( ) ;
45- }
39+ var exists =
40+ await dbContext . InboxMessages . AnyAsync ( x => x . MessageId == messageId && x . ConsumerId == _consumerId ) ;
4641
47- using var transactionScope = await dbContext . Database . BeginTransactionAsync ( System . Data . IsolationLevel . ReadCommitted ) ;
42+ if ( ! exists )
43+ {
44+ dbContext . InboxMessages . Add ( new InboxMessage
45+ {
46+ MessageId = messageId . Value ,
47+ CreatedAt = DateTime . UtcNow ,
48+ State = MessageState . New ,
49+ ConsumerId = _consumerId ,
50+ } ) ;
4851
49- var inboxMessage = await dbContext . InboxMessages
50- . Where ( x => x . MessageId == messageId )
51- . Where ( x => x . ConsumerId == _consumerId )
52- . Where ( x => x . State == MessageState . New )
53- . ForUpdate ( LockBehavior . SkipLocked )
54- . FirstOrDefaultAsync ( ) ;
52+ await dbContext . SaveChangesAsync ( ) ;
53+ }
5554
56- if ( inboxMessage == null )
57- {
58- return ;
59- }
55+ await using var transactionScope =
56+ await dbContext . Database . BeginTransactionAsync ( System . Data . IsolationLevel . ReadCommitted ) ;
6057
61- try
62- {
63- await Consume ( context . Message ) ;
64- inboxMessage . State = MessageState . Done ;
65- }
66- catch ( Exception ex )
67- {
68- logger . LogError ( ex , "Exception thrown while consuming message" ) ;
69- throw ;
70- }
71- finally
72- {
73- inboxMessage ! . UpdatedAt = DateTime . UtcNow ;
74- await dbContext . SaveChangesAsync ( ) ;
75- await transactionScope . CommitAsync ( ) ;
76- }
77- }
58+ var inboxMessage = await dbContext . InboxMessages
59+ . Where ( x => x . MessageId == messageId )
60+ . Where ( x => x . ConsumerId == _consumerId )
61+ . Where ( x => x . State == MessageState . New )
62+ . ForUpdate ( LockBehavior . SkipLocked )
63+ . FirstOrDefaultAsync ( ) ;
7864
79- public abstract Task Consume ( TMessage message ) ;
65+ if ( inboxMessage == null )
66+ {
67+ return ;
68+ }
69+
70+ try
71+ {
72+ await Consume ( context . Message ) ;
73+ inboxMessage . State = MessageState . Done ;
74+ }
75+ catch ( Exception ex )
76+ {
77+ logger . LogError ( ex , "Exception thrown while consuming message" ) ;
78+ throw ;
79+ }
80+ finally
81+ {
82+ inboxMessage . UpdatedAt = DateTime . UtcNow ;
83+ await dbContext . SaveChangesAsync ( ) ;
84+ await transactionScope . CommitAsync ( ) ;
85+ }
86+ }
87+
88+ protected abstract Task Consume ( TMessage message ) ;
8089}
0 commit comments