Skip to content

Commit 697dd40

Browse files
authored
Overhaul partition handling (#82)
* GH-75 initial commit for overhauling partition handling * GH-75 initial commit for overhauling partition handling * GH-75 add optimization for locking and introduce partition lock table * GH-75 add more comments and fix tests in core module * GH-75 add outbox.rebalance-interval to spring-configuration-metadata * GH-75 add integration tests and improved partition handling * GH-75 fix tests * GH-75 remove lock entity and schemas * GH-75 separate integration tests * GH-75 add tests * GH-75 fix run configs [skip ci] * GH-75 cleanup and improve logging and introduce separate ThreadPoolTaskExecutors for heartbeat and rebalancing * GH-75 refactored partition handling * GH-75 cleanup and some more tests * GH-75 linting * GH-75 add flyway schema migrations to examples remove oracle support and add some tests * GH-75 remove pessimistic locking and add comments about potential SqlExceptionHelper messages in log --------- Authored-by: Roland Beisel <info@rolandbeisel.de>
1 parent d62217e commit 697dd40

File tree

110 files changed

+2795
-2075
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+2795
-2075
lines changed

.run/DemoApplication Oracle.run.xml renamed to .run/DemoApplication Multicaster.run.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<component name="ProjectRunConfigurationManager">
2-
<configuration default="false" name="DemoApplication Oracle" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
3-
<module name="namastack-outbox-example-oracle.main" />
2+
<configuration default="false" name="DemoApplication Multicaster" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
3+
<module name="namastack-outbox-example-multicaster.main" />
44
<option name="SPRING_BOOT_MAIN_CLASS" value="io.namastack.demo.DemoApplication" />
55
<method v="2">
66
<option name="Make" enabled="true" />

.run/OutboxPerformanceTestProcessorApplication 1.run.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<envs>
44
<env name="SERVER_PORT" value="9001" />
55
</envs>
6-
<module name="namastack-outbox-performance-test.namastack-outbox-performance-test-processor.main" />
6+
<module name="namastack-outbox-performance-tests.namastack-outbox-performance-test-processor.main" />
77
<option name="SPRING_BOOT_MAIN_CLASS" value="io.namastack.performance.OutboxPerformanceTestProcessorApplication" />
88
<extension name="coverage">
99
<pattern>

.run/OutboxPerformanceTestProcessorApplication 2.run.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<envs>
44
<env name="SERVER_PORT" value="9002" />
55
</envs>
6-
<module name="namastack-outbox-performance-test.namastack-outbox-performance-test-processor.main" />
6+
<module name="namastack-outbox-performance-tests.namastack-outbox-performance-test-processor.main" />
77
<option name="SPRING_BOOT_MAIN_CLASS" value="io.namastack.performance.OutboxPerformanceTestProcessorApplication" />
88
<extension name="coverage">
99
<pattern>

.run/OutboxPerformanceTestProcessorApplication 3.run.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<envs>
44
<env name="SERVER_PORT" value="9003" />
55
</envs>
6-
<module name="namastack-outbox-performance-test.namastack-outbox-performance-test-processor.main" />
6+
<module name="namastack-outbox-performance-tests.namastack-outbox-performance-test-processor.main" />
77
<option name="SPRING_BOOT_MAIN_CLASS" value="io.namastack.performance.OutboxPerformanceTestProcessorApplication" />
88
<extension name="coverage">
99
<pattern>

.run/OutboxPerformanceTestProducerApplication.run.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<component name="ProjectRunConfigurationManager">
22
<configuration default="false" name="OutboxPerformanceTestProducerApplication" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" nameIsGenerated="true">
3-
<module name="namastack-outbox-performance-test.namastack-outbox-performance-test-producer.main" />
3+
<module name="namastack-outbox-performance-tests.namastack-outbox-performance-test-producer.main" />
44
<option name="SPRING_BOOT_MAIN_CLASS" value="io.namastack.performance.OutboxPerformanceTestProducerApplication" />
55
<extension name="coverage">
66
<pattern>

namastack-outbox-core/src/main/kotlin/io/namastack/outbox/OutboxCoreAutoConfiguration.kt

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.namastack.outbox
22

3+
import io.namastack.outbox.instance.OutboxInstanceRegistry
4+
import io.namastack.outbox.instance.OutboxInstanceRepository
5+
import io.namastack.outbox.partition.PartitionAssignmentRepository
36
import io.namastack.outbox.partition.PartitionCoordinator
47
import io.namastack.outbox.retry.OutboxRetryPolicy
58
import io.namastack.outbox.retry.OutboxRetryPolicyFactory
@@ -14,6 +17,7 @@ import org.springframework.context.annotation.Bean
1417
import org.springframework.context.event.SimpleApplicationEventMulticaster
1518
import org.springframework.core.task.TaskExecutor
1619
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
20+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
1721
import java.time.Clock
1822

1923
/**
@@ -40,11 +44,10 @@ class OutboxCoreAutoConfiguration {
4044
fun clock(): Clock = Clock.systemDefaultZone()
4145

4246
/**
43-
* Provides a configurable ThreadPoolTaskExecutor for parallel processing of aggregateIds.
47+
* Provides a ThreadPoolTaskExecutor for parallel processing of aggregateIds.
4448
*
45-
* The pool size can be configured via OutboxProperties. This executor is used by the
46-
* OutboxProcessingScheduler to process multiple aggregateIds in parallel while maintaining
47-
* strict ordering per aggregateId.
49+
* The pool size is configurable via OutboxProperties. Used by OutboxProcessingScheduler
50+
* to process multiple aggregateIds in parallel while maintaining strict ordering per aggregateId.
4851
*
4952
* @param properties Outbox configuration properties
5053
* @return Configured ThreadPoolTaskExecutor
@@ -56,11 +59,46 @@ class OutboxCoreAutoConfiguration {
5659
executor.corePoolSize = properties.processing.executorCorePoolSize
5760
executor.maxPoolSize = properties.processing.executorMaxPoolSize
5861
executor.setThreadNamePrefix("outbox-proc-")
62+
executor.setWaitForTasksToCompleteOnShutdown(true)
5963
executor.initialize()
6064

6165
return executor
6266
}
6367

68+
/**
69+
* Scheduler for general outbox tasks (e.g. batch processing).
70+
*
71+
* Pool size is set to 5 by default. Only used internally by the outbox library.
72+
*
73+
* @return ThreadPoolTaskScheduler for outbox jobs
74+
*/
75+
@Bean
76+
@ConditionalOnMissingBean(name = ["outboxDefaultScheduler"])
77+
fun outboxDefaultScheduler(): ThreadPoolTaskScheduler =
78+
ThreadPoolTaskScheduler().apply {
79+
poolSize = 5
80+
threadNamePrefix = "outbox-scheduler-"
81+
setWaitForTasksToCompleteOnShutdown(true)
82+
initialize()
83+
}
84+
85+
/**
86+
* Scheduler for heartbeat and rebalance tasks.
87+
*
88+
* Pool size is set to 1. Used for periodic signals and partition rebalancing.
89+
*
90+
* @return ThreadPoolTaskScheduler for heartbeat/rebalance jobs
91+
*/
92+
@Bean
93+
@ConditionalOnMissingBean(name = ["outboxRebalancingScheduler"])
94+
fun outboxRebalancingScheduler(): ThreadPoolTaskScheduler =
95+
ThreadPoolTaskScheduler().apply {
96+
poolSize = 1
97+
threadNamePrefix = "outbox-rebalancing-"
98+
setWaitForTasksToCompleteOnShutdown(true)
99+
initialize()
100+
}
101+
64102
/**
65103
* Creates a retry policy based on configuration properties.
66104
*
@@ -91,21 +129,30 @@ class OutboxCoreAutoConfiguration {
91129
/**
92130
* Creates the partition coordinator for managing partition assignments.
93131
*
94-
* @param instanceRegistry Registry for managing instances
95-
* @return PartitionCoordinator bean
132+
* @param instanceRegistry Registry for active instances and current instance identification
133+
* @param partitionAssignmentRepository Repository for persisting partition assignments
134+
* @param clock Clock for timestamp generation
135+
* @return PartitionCoordinator bean for managing partition lifecycle
96136
*/
97137
@Bean
98138
@ConditionalOnMissingBean
99-
fun partitionCoordinator(instanceRegistry: OutboxInstanceRegistry): PartitionCoordinator =
100-
PartitionCoordinator(instanceRegistry)
139+
fun partitionCoordinator(
140+
instanceRegistry: OutboxInstanceRegistry,
141+
partitionAssignmentRepository: PartitionAssignmentRepository,
142+
clock: Clock,
143+
): PartitionCoordinator =
144+
PartitionCoordinator(
145+
instanceRegistry = instanceRegistry,
146+
partitionAssignmentRepository = partitionAssignmentRepository,
147+
clock = clock,
148+
)
101149

102150
/**
103151
* Creates the partition-aware outbox processing scheduler.
104152
*
105153
* @param recordRepository Repository for accessing outbox records
106154
* @param recordProcessor Processor for handling individual records
107155
* @param partitionCoordinator Coordinator for partition assignments
108-
* @param instanceRegistry Registry for instance management
109156
* @param taskExecutor TaskExecutor for parallel processing of aggregateIds
110157
* @param retryPolicy Policy for determining retry behavior
111158
* @param properties Configuration properties
@@ -118,18 +165,15 @@ class OutboxCoreAutoConfiguration {
118165
recordRepository: OutboxRecordRepository,
119166
recordProcessor: OutboxRecordProcessor,
120167
partitionCoordinator: PartitionCoordinator,
121-
instanceRegistry: OutboxInstanceRegistry,
122168
retryPolicy: OutboxRetryPolicy,
123169
properties: OutboxProperties,
124-
@Qualifier("outboxTaskExecutor")
125-
taskExecutor: TaskExecutor,
170+
@Qualifier("outboxTaskExecutor") taskExecutor: TaskExecutor,
126171
clock: Clock,
127172
): OutboxProcessingScheduler =
128173
OutboxProcessingScheduler(
129174
recordRepository = recordRepository,
130175
recordProcessor = recordProcessor,
131176
partitionCoordinator = partitionCoordinator,
132-
instanceRegistry = instanceRegistry,
133177
retryPolicy = retryPolicy,
134178
properties = properties,
135179
taskExecutor = taskExecutor,

namastack-outbox-core/src/main/kotlin/io/namastack/outbox/OutboxEventMulticaster.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,13 @@ class OutboxEventMulticaster(
6969
eventType,
7070
)
7171

72-
log.debug("Saving @OutboxEvent to outbox: ${payload::class.simpleName}")
72+
val classSimpleName = payload::class.simpleName
73+
74+
log.debug("Saving @OutboxEvent to outbox: $classSimpleName")
7375
saveOutboxRecord(payload, annotation)
7476

7577
if (outboxProperties.processing.publishAfterSave) {
76-
log.debug("Publishing @OutboxEvent to listeners: ${payload::class.simpleName}")
78+
log.debug("Publishing @OutboxEvent to listeners: $classSimpleName")
7779
delegateEventMulticaster.multicastEvent(event, eventType)
7880
}
7981
}

0 commit comments

Comments
 (0)