Skip to content

Commit 0d1b7dc

Browse files
authored
Merge pull request #287 from AxonIQ/dead-letter-example-code
Update Dead-Letter Queue Code samples
2 parents 22addc2 + b4d7c16 commit 0d1b7dc

File tree

2 files changed

+120
-42
lines changed

2 files changed

+120
-42
lines changed

axon-framework/events/event-processors/README.md

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,15 @@ To that end, the supported dead-letter queue is a so-called `SequencedDeadLetter
277277
Integral to its design is to allow for queueing failed events and events that belong to a faulty sequence.
278278
It does so by maintaining a sequence identifier for each event, determined by the [sequencing policy](/axon-framework/events/event-processors/streaming.md#sequential-processing).
279279

280+
> **Is there support for Sagas?**
281+
>
282+
> Currently, there is *no* support for using a dead-letter queue for [sagas](/axon-framework/sagas/README.md).
283+
> We've taken this decision as we cannot support a sequenced dead lettering approach as we do for regular event handling.
284+
>
285+
> Furthermore, we cannot do this, as a saga's associations can vary widely between events.
286+
> Due to this, the sequence of events may change, breaking this level of support.
287+
> Hence, there's no way of knowing whether a next event in the stream does or does not belong to a saga.
288+
280289
Note that you *cannot* share a dead-letter queue between different processing groups.
281290
Hence, each processing group you want to enable this behavior for should receive a unique dead-letter queue instance.
282291

@@ -305,21 +314,20 @@ A `JpaSequencedDeadLetterQueue` configuration example:
305314
{% tabs %}
306315
{% tab title="Axon Configuration API" %}
307316
```java
308-
public class DeadLetterQueueExampleConfig {
309-
310-
public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) {
311-
return configurer -> configurer.eventProcessing(
312-
eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue(
313-
processingGroup,
314-
configuration -> JpaSequencedDeadLetterQueue.builder()
315-
.processingGroup(processingGroup)
316-
.maxSequences(256)
317-
.maxSequenceSize(256)
318-
.entityManagerProvider(configuration.getComponent(EntityManagerProvider.class))
319-
.transactionManager(configuration.getComponent(TransactionManager.class))
320-
.serializer(configuration.serializer())
321-
.build()
322-
)
317+
public class AxonConfig {
318+
// ...
319+
public void configureDeadLetterQueue(EventProcessingConfigurer processingConfigurer) {
320+
// Replace "my-processing-group" for the processing group you want to configure the DLQ on.
321+
processingConfigurer.registerDeadLetterQueue(
322+
"my-processing-group",
323+
config -> JpaSequencedDeadLetterQueue.builder()
324+
.processingGroup("my-processing-group")
325+
.maxSequences(256)
326+
.maxSequenceSize(256)
327+
.entityManagerProvider(config.getComponent(EntityManagerProvider.class))
328+
.transactionManager(config.getComponent(TransactionManager.class))
329+
.serializer(config.serializer())
330+
.build()
323331
);
324332
}
325333
}
@@ -328,22 +336,21 @@ public class DeadLetterQueueExampleConfig {
328336
{% tab title="Spring Boot AutoConfiguration" %}
329337
```java
330338
@Configuration
331-
public class DeadLetterQueueExampleConfig {
332-
333-
@Autowired
334-
public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) {
335-
return configurer -> configurer.eventProcessing(
336-
eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue(
337-
processingGroup,
338-
configuration -> JpaSequencedDeadLetterQueue.builder()
339-
.processingGroup(processingGroup)
340-
.maxSequences(256)
341-
.maxSequenceSize(256)
342-
.entityManagerProvider(configuration.getComponent(EntityManagerProvider.class))
343-
.transactionManager(configuration.getComponent(TransactionManager.class))
344-
.serializer(configuration.serializer())
345-
.build()
346-
)
339+
public class AxonConfig {
340+
// ...
341+
@Bean
342+
public ConfigurerModule deadLetterQueueConfigurerModule() {
343+
// Replace "my-processing-group" for the processing group you want to configure the DLQ on.
344+
return configurer -> configurer.eventProcessing().registerDeadLetterQueue(
345+
"my-processing-group",
346+
config -> JpaSequencedDeadLetterQueue.builder()
347+
.processingGroup("my-processing-group")
348+
.maxSequences(256)
349+
.maxSequenceSize(256)
350+
.entityManagerProvider(config.getComponent(EntityManagerProvider.class))
351+
.transactionManager(config.getComponent(TransactionManager.class))
352+
.serializer(config.serializer())
353+
.build()
347354
);
348355
}
349356
}
@@ -505,23 +512,25 @@ See the following example for configuring our custom policy:
505512
{% tabs %}
506513
{% tab title="Axon Configuration API" %}
507514
```java
508-
public class EnqueuePolicyConfigurer {
509-
510-
public void configureEnqueuePolicy(EventProcessingConfigurer configurer, String processingGroup) {
511-
configurer.registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy());
515+
public class AxonConfig {
516+
// ...
517+
public void configureEnqueuePolicy(EventProcessingConfigurer configurer) {
518+
// Replace "my-processing-group" for the processing group you want to configure the policy on.
519+
configurer.registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy());
512520
}
513521
}
514522
```
515523
{% endtab %}
516524
{% tab title="Spring Boot AutoConfiguration" %}
517525
```java
518526
@Configuration
519-
public class EnqueuePolicyConfigurer {
527+
public class AxonConfig {
520528

521529
@Bean
522-
public ConfigurerModule configureEnqueuePolicy(String processingGroup) {
530+
public ConfigurerModule enqueuePolicyConfigurerModule() {
531+
// Replace "my-processing-group" for the processing group you want to configure the policy on.
523532
return configurer -> configurer.eventProcessing()
524-
.registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy());
533+
.registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy());
525534
}
526535
}
527536
```

axon-framework/events/event-processors/streaming.md

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,13 +377,13 @@ To be able to reopen the stream at a later point, we should keep the progress so
377377
The progress is kept by updating and saving the `TrackingToken` after handling batches of events.
378378
Keeping the progress requires CRUD operation, for which the Streaming Processor uses the [`TokenStore`](#token-store).
379379

380-
For a Streaming Processor to process any events, it needs "a claim" on a `TrackingToken`.
380+
For a Streaming Processor to process any events, it needs ["a claim"](#token-claims) on a `TrackingToken`.
381381
The processor will update this claim every time it has finished handling a batch of events.
382382
This so-called "claim extension" is, just as updating and saving of tokens, delegated to the Token Store.
383383
Hence, the Streaming Processors achieves collaboration among instances/threads through token claims.
384384

385385
In the absence of a claim, a processor will actively try to retrieve one.
386-
If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim.
386+
If a token claim is not extended for a configurable amount of time, other processor threads can ["steal"](#token-stealing) the claim.
387387
Token stealing can, for example, happen if event processing is slow or encountered some exceptions.
388388

389389
### Initial Tracking Token
@@ -419,7 +419,7 @@ In those cases, we recommend to validate if any of the above situations occurred
419419
There are a couple of things we can configure when it comes to tokens.
420420
We can separate these options in "initial token" and "token claim" configuration, as described in the following sections:
421421

422-
**Initial Token**
422+
#### Initial Token
423423

424424
The [initial token](#initial-tracking-token) for a `StreamingEventProcessor` is configurable for every processor instance.
425425
When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`.
@@ -501,7 +501,7 @@ public class AxonConfig {
501501
{% endtab %}
502502
{% endtabs %}
503503

504-
**Token Claims**
504+
#### Token Claims
505505

506506
As described [here](#tracking-tokens), a streaming processor should claim a token before it is allowed to perform any processing work.
507507
There are several scenarios where a processor may keep the claim for too long.
@@ -587,6 +587,75 @@ public class AxonConfig {
587587
{% endtab %}
588588
{% endtabs %}
589589

590+
#### Token Stealing
591+
592+
As described at the [start](#tracking-tokens), streaming processor threads can "steal" tokens from one another.
593+
A token is "stolen" when a thread loses a [token claim](#token-claims).
594+
Situations like this internally result in an `UnableToClaimTokenException,` caught by both streaming event processor implementations and translated into warn- or info-level log statements.
595+
596+
Where the framework uses token claims to ensure that a single thread is processing a sequence of events, it supports token stealing to guarantee event processing is not blocked forever.
597+
In short, the framework uses token stealing to unblock your streaming processor threads when processing takes too long.
598+
Examples may include literal slow processing, blocking exceptional scenarios, and deadlocks.
599+
600+
However, token stealing may occur as a surprise for some applications, making it an unwanted side effect.
601+
As such, it is good to be aware of why tokens get stolen (as described above), but also when this happens and what the consequences are.
602+
603+
##### When is a Token stolen?
604+
605+
In practical terms, a token is stolen whenever the _claim timeout_ is exceeded.
606+
607+
This timeout is met whenever the token's timestamp (e.g., the `timestamp` column of your `token_entry` table) exceeds the `claimTimeout` of the `TokenStore`.
608+
By default, the `claimTimeout` value equals 10 seconds.
609+
To adjust it, you must configure a `TokenStore` instance through its builder, as shown in the [Token Store](#token-store) section.
610+
611+
The token's timestamp is equally crucial in deciding when the timeout is met.
612+
The streaming processor thread holding the claim is in charge of updating the token timestamp.
613+
This timestamp is updated whenever the thread finishes a batch of events or whenever the processor extends the claim.
614+
When to extend a claim differs between the Tracking and Pooled Streaming processor.
615+
You should check out the [token claim](#token-claims) section if you want to know how to configure these values.
616+
617+
To further clarify, a streaming processor's thread needs to be able to update the token claim and, by extension, the timestamp to ensure it won't get stolen.
618+
Hence, a staling processor thread will, one way or another, eventually lose the claim.
619+
620+
Examples of when a thread may get its token stolen are:
621+
- Overall slow event handling
622+
- Too large event batch size
623+
- Blocking operations inside event handlers
624+
- Blocking exceptions inside event handlers
625+
626+
##### What are the consequences of Token stealing?
627+
628+
The consequence of token stealing is that an event may be handled twice (or more).
629+
630+
When a thread steals a token, the original thread was _already_ processing events from the token's position.
631+
To protect against doubling event handling, Axon Framework will combine committing the event handling task with updating the token.
632+
As the token claim is required to update the token, the original thread will fail the update.
633+
Following this, a rollback occurs on the [Unit of Work](/axon-framework/messaging-concepts/unit-of-work.md), resolving most issues arising from token stealing.
634+
635+
The ability to rollback event handling tasks sheds light on the consequences of token stealing.
636+
Most event processors project events into a projection stored within a database.
637+
Furthermore, if you store the projection in the same database as the token, the rollback will ensure the change is not persisted.
638+
Thus, the consequence of token stealing is limited to wasting processor cycles.
639+
This scenario is why we recommend storing tokens and projections in the same database.
640+
641+
If a rollback is out of the question for an event handling task, we strongly recommend making the task idempotent.
642+
You may have this scenario when, for example, the projection and tokens do not reside in the same database.
643+
or when the event handler dispatches an operation (e.g., through the `CommandGateway`).
644+
In making the invoked operation idempotent, you ensure that whenever the thread stealing a token handles an event twice (or more), the outcome will be identical.
645+
646+
Without idempotency, the consequences of token stealing can be manyfold:
647+
- Your projection (stored in a different database than your tokens!) may incorrectly project the state.
648+
- An event handler putting messages on a queue will put a message on the queue again.
649+
- A Saga Event Handler invoking a third-party service will invoke that service again.
650+
- An event handler sending an email will send that email again.
651+
652+
In short, any operation introducing a side effect that isn't handled in an idempotent fashion will occur again when a token is stolen.
653+
654+
Concluding, we can separate the consequence of token stealing into roughly three scenarios:
655+
1. We can rollback the operation. In this case, the only consequence is wasted processor cycles.
656+
2. The operation is idempotent. In this case, the only consequence is wasted processor cycles.
657+
3. When the task cannot be rolled back nor performed in an idempotent fashion, compensating actions may be the way out.
658+
590659
### Token Store
591660

592661
The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingTokens`.

0 commit comments

Comments
 (0)