@@ -412,4 +412,105 @@ describe('Integration Tests', () => {
412412 expect ( transportEvents [ 0 ] . expireAt ) . toBeGreaterThanOrEqual ( beforeEmit + 60000 ) ;
413413 } ) ;
414414 } ) ;
415+
416+ describe ( 'immediateProcessing configuration' , ( ) => {
417+ it ( 'should not process event immediately when immediateProcessing is false, but process via poller' , async ( ) => {
418+ context = await createTestApp ( {
419+ events : [
420+ {
421+ name : 'UserCreated' ,
422+ listeners : {
423+ expiresAtTTL : 60000 ,
424+ readyToRetryAfterTTL : 50 ,
425+ maxExecutionTimeTTL : 30000 ,
426+ } ,
427+ immediateProcessing : false ,
428+ } ,
429+ ] ,
430+ additionalEntities : [ User ] ,
431+ retryEveryMilliseconds : 100 ,
432+ maxInboxOutboxTransportEventPerRetry : 10 ,
433+ } ) ;
434+
435+ const emitter = context . module . get ( TransactionalEventEmitter ) ;
436+ const orm = context . orm ;
437+
438+ const handledEvents : UserCreatedEvent [ ] = [ ] ;
439+ const listener : IListener < UserCreatedEvent > = {
440+ getName : ( ) => 'ImmediateProcessingListener' ,
441+ handle : async ( event : UserCreatedEvent ) => {
442+ handledEvents . push ( event ) ;
443+ } ,
444+ } ;
445+ emitter . addListener ( 'UserCreated' , listener ) ;
446+
447+ const user = new User ( ) ;
448+ user . email = 'deferred@example.com' ;
449+ user . name = 'Deferred User' ;
450+
451+ const event = new UserCreatedEvent ( 1 , 'deferred@example.com' ) ;
452+
453+ await emitter . emitAsync ( event , [ { operation : 'persist' as const , entity : user } ] ) ;
454+
455+ expect ( handledEvents ) . toHaveLength ( 0 ) ;
456+
457+ const em = orm . em . fork ( ) ;
458+ const transportEvents = await em . find ( MikroOrmInboxOutboxTransportEvent , { eventName : 'UserCreated' } ) ;
459+ expect ( transportEvents ) . toHaveLength ( 1 ) ;
460+
461+ await new Promise ( resolve => setTimeout ( resolve , 300 ) ) ;
462+
463+ expect ( handledEvents ) . toHaveLength ( 1 ) ;
464+ expect ( handledEvents [ 0 ] ) . toMatchObject ( {
465+ name : 'UserCreated' ,
466+ userId : 1 ,
467+ email : 'deferred@example.com' ,
468+ } ) ;
469+ } ) ;
470+
471+ it ( 'should process event immediately when immediateProcessing is true (default)' , async ( ) => {
472+ context = await createTestApp ( {
473+ events : [
474+ {
475+ name : 'UserCreated' ,
476+ listeners : {
477+ expiresAtTTL : 60000 ,
478+ readyToRetryAfterTTL : 5000 ,
479+ maxExecutionTimeTTL : 30000 ,
480+ } ,
481+ immediateProcessing : true ,
482+ } ,
483+ ] ,
484+ additionalEntities : [ User ] ,
485+ retryEveryMilliseconds : 10000 ,
486+ maxInboxOutboxTransportEventPerRetry : 10 ,
487+ } ) ;
488+
489+ const emitter = context . module . get ( TransactionalEventEmitter ) ;
490+
491+ const handledEvents : UserCreatedEvent [ ] = [ ] ;
492+ const listener : IListener < UserCreatedEvent > = {
493+ getName : ( ) => 'ImmediateListener' ,
494+ handle : async ( event : UserCreatedEvent ) => {
495+ handledEvents . push ( event ) ;
496+ } ,
497+ } ;
498+ emitter . addListener ( 'UserCreated' , listener ) ;
499+
500+ const user = new User ( ) ;
501+ user . email = 'immediate@example.com' ;
502+ user . name = 'Immediate User' ;
503+
504+ const event = new UserCreatedEvent ( 1 , 'immediate@example.com' ) ;
505+
506+ await emitter . emitAsync ( event , [ { operation : 'persist' as const , entity : user } ] ) ;
507+
508+ expect ( handledEvents ) . toHaveLength ( 1 ) ;
509+ expect ( handledEvents [ 0 ] ) . toMatchObject ( {
510+ name : 'UserCreated' ,
511+ userId : 1 ,
512+ email : 'immediate@example.com' ,
513+ } ) ;
514+ } ) ;
515+ } ) ;
415516} ) ;
0 commit comments