Skip to content

Commit 81e0c63

Browse files
committed
Merge remote-tracking branch 'origin/4.6' into dead-letter-example-code
2 parents 1852df8 + 1e9bc8d commit 81e0c63

File tree

12 files changed

+260
-12
lines changed

12 files changed

+260
-12
lines changed

SUMMARY.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* [Major Releases](release-notes/rn-extensions/rn-cdi/rn-cdi-major-releases.md)
2222
* [JGroups](release-notes/rn-extensions/rn-jgroups/README.md)
2323
* [Major Releases](release-notes/rn-extensions/rn-jgroups/rn-jgroups-major-releases.md)
24+
* [Minor Releases](release-notes/rn-extensions/rn-jgroups/rn-jgroups-minor-releases.md)
2425
* [Kafka](release-notes/rn-extensions/rn-kafka/README.md)
2526
* [Major Releases](release-notes/rn-extensions/rn-kafka/rn-kafka-major-releases.md)
2627
* [Minor Releases](release-notes/rn-extensions/rn-kafka/rn-kafka-minor-releases.md)
@@ -32,6 +33,7 @@
3233
* [Minor Releases](release-notes/rn-extensions/rn-mongo/rn-mongo-minor-releases.md)
3334
* [Multi-Tenancy](release-notes/rn-extensions/rn-multi-tenancy/README.md)
3435
* [Major Releases](release-notes/rn-extensions/rn-multi-tenancy/rn-multi-tenancy-major-releases.md)
36+
* [Minor Releases](release-notes/rn-extensions/rn-multi-tenancy/rn-multi-tenancy-minor-releases.md)
3537
* [Reactor](release-notes/rn-extensions/rn-reactor/README.md)
3638
* [Major Releases](release-notes/rn-extensions/rn-reactor/rn-reactor-major-releases.md)
3739
* [Minor Releases](release-notes/rn-extensions/rn-reactor/rn-reactor-minor-releases.md)

axon-framework/events/event-processors/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,15 @@ To that end, the supported dead-letter queue is a so-called `SequencedDeadLetter
277277
Integral to its design is to allow for queueing failed events and events that belong to a faulty sequence.
278278
It does so by maintaining a sequence identifier for each event, determined by the [sequencing policy](/axon-framework/events/event-processors/streaming.md#sequential-processing).
279279

280+
> **Is there support for Sagas?**
281+
>
282+
> Currently, there is *no* support for using a dead-letter queue for [sagas](/axon-framework/sagas/README.md).
283+
> We've taken this decision as we cannot support a sequenced dead lettering approach as we do for regular event handling.
284+
>
285+
> Furthermore, we cannot do this, as a saga's associations can vary widely between events.
286+
> Due to this, the sequence of events may change, breaking this level of support.
287+
> Hence, there's no way of knowing whether a next event in the stream does or does not belong to a saga.
288+
280289
Note that you *cannot* share a dead-letter queue between different processing groups.
281290
Hence, each processing group you want to enable this behavior for should receive a unique dead-letter queue instance.
282291

axon-framework/events/event-processors/streaming.md

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,15 +201,15 @@ public class AxonConfig {
201201

202202
### Configuring a Pooled Streaming Processor
203203

204-
Firstly, to specify that every new processors should default to a `PooledStreamingEventProcessor`, you can invoke the `usingPooledStreamingProcessors` method:
204+
Firstly, to specify that every new processors should default to a `PooledStreamingEventProcessor`, you can invoke the `usingPooledStreamingEventProcessors` method:
205205

206206
{% tabs %}
207207
{% tab title="Axon Configuration API" %}
208208
```java
209209
public class AxonConfig {
210210
// ...
211211
public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
212-
processingConfigurer.usingPooledStreamingProcessors();
212+
processingConfigurer.usingPooledStreamingEventProcessors();
213213
}
214214
}
215215
```
@@ -222,7 +222,7 @@ public class AxonConfig {
222222
// ...
223223
@Autowired
224224
public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
225-
processingConfigurer.usingPooledStreamingProcessors();
225+
processingConfigurer.usingPooledStreamingEventProcessors();
226226
}
227227
}
228228
```
@@ -377,13 +377,13 @@ To be able to reopen the stream at a later point, we should keep the progress so
377377
The progress is kept by updating and saving the `TrackingToken` after handling batches of events.
378378
Keeping the progress requires CRUD operation, for which the Streaming Processor uses the [`TokenStore`](#token-store).
379379

380-
For a Streaming Processor to process any events, it needs "a claim" on a `TrackingToken`.
380+
For a Streaming Processor to process any events, it needs ["a claim"](#token-claims) on a `TrackingToken`.
381381
The processor will update this claim every time it has finished handling a batch of events.
382382
This so-called "claim extension" is, just as updating and saving of tokens, delegated to the Token Store.
383383
Hence, the Streaming Processors achieves collaboration among instances/threads through token claims.
384384

385385
In the absence of a claim, a processor will actively try to retrieve one.
386-
If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim.
386+
If a token claim is not extended for a configurable amount of time, other processor threads can ["steal"](#token-stealing) the claim.
387387
Token stealing can, for example, happen if event processing is slow or encountered some exceptions.
388388

389389
### Initial Tracking Token
@@ -419,7 +419,7 @@ In those cases, we recommend to validate if any of the above situations occurred
419419
There are a couple of things we can configure when it comes to tokens.
420420
We can separate these options in "initial token" and "token claim" configuration, as described in the following sections:
421421

422-
**Initial Token**
422+
#### Initial Token
423423

424424
The [initial token](#initial-tracking-token) for a `StreamingEventProcessor` is configurable for every processor instance.
425425
When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`.
@@ -501,7 +501,7 @@ public class AxonConfig {
501501
{% endtab %}
502502
{% endtabs %}
503503

504-
**Token Claims**
504+
#### Token Claims
505505

506506
As described [here](#tracking-tokens), a streaming processor should claim a token before it is allowed to perform any processing work.
507507
There are several scenarios where a processor may keep the claim for too long.
@@ -587,6 +587,75 @@ public class AxonConfig {
587587
{% endtab %}
588588
{% endtabs %}
589589

590+
#### Token Stealing
591+
592+
As described at the [start](#tracking-tokens), streaming processor threads can "steal" tokens from one another.
593+
A token is "stolen" when a thread loses a [token claim](#token-claims).
594+
Situations like this internally result in an `UnableToClaimTokenException,` caught by both streaming event processor implementations and translated into warn- or info-level log statements.
595+
596+
Where the framework uses token claims to ensure that a single thread is processing a sequence of events, it supports token stealing to guarantee event processing is not blocked forever.
597+
In short, the framework uses token stealing to unblock your streaming processor threads when processing takes too long.
598+
Examples may include literal slow processing, blocking exceptional scenarios, and deadlocks.
599+
600+
However, token stealing may occur as a surprise for some applications, making it an unwanted side effect.
601+
As such, it is good to be aware of why tokens get stolen (as described above), but also when this happens and what the consequences are.
602+
603+
##### When is a Token stolen?
604+
605+
In practical terms, a token is stolen whenever the _claim timeout_ is exceeded.
606+
607+
This timeout is met whenever the token's timestamp (e.g., the `timestamp` column of your `token_entry` table) exceeds the `claimTimeout` of the `TokenStore`.
608+
By default, the `claimTimeout` value equals 10 seconds.
609+
To adjust it, you must configure a `TokenStore` instance through its builder, as shown in the [Token Store](#token-store) section.
610+
611+
The token's timestamp is equally crucial in deciding when the timeout is met.
612+
The streaming processor thread holding the claim is in charge of updating the token timestamp.
613+
This timestamp is updated whenever the thread finishes a batch of events or whenever the processor extends the claim.
614+
When to extend a claim differs between the Tracking and Pooled Streaming processor.
615+
You should check out the [token claim](#token-claims) section if you want to know how to configure these values.
616+
617+
To further clarify, a streaming processor's thread needs to be able to update the token claim and, by extension, the timestamp to ensure it won't get stolen.
618+
Hence, a staling processor thread will, one way or another, eventually lose the claim.
619+
620+
Examples of when a thread may get its token stolen are:
621+
- Overall slow event handling
622+
- Too large event batch size
623+
- Blocking operations inside event handlers
624+
- Blocking exceptions inside event handlers
625+
626+
##### What are the consequences of Token stealing?
627+
628+
The consequence of token stealing is that an event may be handled twice (or more).
629+
630+
When a thread steals a token, the original thread was _already_ processing events from the token's position.
631+
To protect against doubling event handling, Axon Framework will combine committing the event handling task with updating the token.
632+
As the token claim is required to update the token, the original thread will fail the update.
633+
Following this, a rollback occurs on the [Unit of Work](/axon-framework/messaging-concepts/unit-of-work.md), resolving most issues arising from token stealing.
634+
635+
The ability to rollback event handling tasks sheds light on the consequences of token stealing.
636+
Most event processors project events into a projection stored within a database.
637+
Furthermore, if you store the projection in the same database as the token, the rollback will ensure the change is not persisted.
638+
Thus, the consequence of token stealing is limited to wasting processor cycles.
639+
This scenario is why we recommend storing tokens and projections in the same database.
640+
641+
If a rollback is out of the question for an event handling task, we strongly recommend making the task idempotent.
642+
You may have this scenario when, for example, the projection and tokens do not reside in the same database.
643+
or when the event handler dispatches an operation (e.g., through the `CommandGateway`).
644+
In making the invoked operation idempotent, you ensure that whenever the thread stealing a token handles an event twice (or more), the outcome will be identical.
645+
646+
Without idempotency, the consequences of token stealing can be manyfold:
647+
- Your projection (stored in a different database than your tokens!) may incorrectly project the state.
648+
- An event handler putting messages on a queue will put a message on the queue again.
649+
- A Saga Event Handler invoking a third-party service will invoke that service again.
650+
- An event handler sending an email will send that email again.
651+
652+
In short, any operation introducing a side effect that isn't handled in an idempotent fashion will occur again when a token is stolen.
653+
654+
Concluding, we can separate the consequence of token stealing into roughly three scenarios:
655+
1. We can rollback the operation. In this case, the only consequence is wasted processor cycles.
656+
2. The operation is idempotent. In this case, the only consequence is wasted processor cycles.
657+
3. When the task cannot be rolled back nor performed in an idempotent fashion, compensating actions may be the way out.
658+
590659
### Token Store
591660

592661
The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingTokens`.
@@ -1037,7 +1106,7 @@ public class AxonConfig {
10371106

10381107
EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
10391108
(config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder)
1040-
.workerExecutorService(workerExecutorBuilder)
1109+
.workerExecutor(workerExecutorBuilder)
10411110
.initialSegmentCount(32);
10421111

10431112
processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
@@ -1062,7 +1131,7 @@ public class AxonConfig {
10621131

10631132
EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
10641133
(config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder)
1065-
.workerExecutorService(workerExecutorBuilder)
1134+
.workerExecutor(workerExecutorBuilder)
10661135
.initialSegmentCount(32);
10671136

10681137
processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);

axon-framework/tuning/event-snapshots.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ You could take the stance of dropping all the snapshots which are stored (for a
109109
It is also possible to filter out snapshot events when reading your Aggregate from the event store.
110110
To that end, a `SnapshotFilter` can be defined per Aggregate type or for the entire `EventStore`.
111111

112-
113112
The `SnapshotFilter` is a functional interface, providing two main operations: `allow(DomainEventData<?)` and `combine(SnapshotFilter)`.
114113
The former provides the `DomainEventData` which reflects the snapshot events.
115114
The latter allows combining several `SnapshotFilter`s together.
@@ -147,7 +146,24 @@ public class GiftCard {...}
147146
{% endtab %}
148147
{% endtabs %}
149148

149+
The above snippet would be feasible to follow _if_ fine-grained control is required when filtering snapshots from the store.
150+
For example, when your snapshots are not based on the Aggregate class (which is the default).
151+
When this is not required, you can base yourself on the default `SnapshotFilter` - the `RevisionSnapshotFilter`.
152+
153+
To configure this `SnapshotFilter`, all you have to do is use the `@Revision` annotation on your Aggregate class.
154+
In doing so, the `RevisionSnapshotFilter` is set, filtering non-matching snapshots from the `Repository`'s loading process, based on the value maintained within the `@Revision` annotation.
150155

156+
Through this, with every new production deployment of your application that adjusts the Aggregate state, you would only have to adjust the revision value in the annotation.
157+
Check out the following example for how to set this up:
158+
159+
```java
160+
// "1" is an example revision value.
161+
// You're free to choose whatever value that fits your application's versioning scheme.
162+
@Revision("1")
163+
public class GiftCard {
164+
// Omitted aggregate internals for simplicity.
165+
}
166+
```
151167

152168
### Initializing an Aggregate based on a Snapshot Event
153169

release-notes/rn-axon-framework/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ The release notes section for the Axon framework for all major/minor releases.
1111
| | [4.2](rn-af-major-releases.md#release-42) |
1212
| | [4.1](rn-af-major-releases.md#release-41) |
1313
| | [4.0](rn-af-major-releases.md#release-40) |
14-
| _**Minor**_ | [4.5](rn-af-minor-releases.md#release-45) |
14+
| _**Minor**_ | [4.6](rn-af-minor-releases.md#release-46) |
15+
| | [4.5](rn-af-minor-releases.md#release-45) |
1516
| | [4.4](rn-af-minor-releases.md#release-44) |
1617
| | [4.3](rn-af-minor-releases.md#release-43) |
1718
| | [4.2](rn-af-minor-releases.md#release-42) |

0 commit comments

Comments
 (0)