Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ plugins {
dependencies {
jacocoAggregation(project(":namastack-outbox-actuator"))
jacocoAggregation(project(":namastack-outbox-core"))
jacocoAggregation(project(":namastack-outbox-kafka"))
jacocoAggregation(project(":namastack-outbox-jackson"))
jacocoAggregation(project(":namastack-outbox-jpa"))
jacocoAggregation(project(":namastack-outbox-metrics"))
jacocoAggregation(project(":namastack-outbox-starter-jpa"))
Expand All @@ -29,7 +31,7 @@ val isRelease = project.hasProperty("release") && project.property("release") ==

allprojects {
group = "io.namastack"
version = "0.3.0" + if (!isRelease) "-SNAPSHOT" else ""
version = "0.4.0" + if (!isRelease) "-SNAPSHOT" else ""

repositories {
mavenLocal()
Expand Down
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ publish = "0.35.0"
hibernate = "7.1.7.Final"
springOrm = "6.2.12"
springTx = "6.2.12"
springKafka = "4.0.0"
h2 = "2.4.240"
mockk = "1.14.6"
assertj = "3.27.6"
Expand All @@ -28,6 +29,8 @@ spring-boot-starter-logging = { module = "org.springframework.boot:spring-boot-s
spring-boot-starter-test = { module = "org.springframework.boot:spring-boot-starter-test", version.ref = "springBoot" }
spring-boot-starter-data-jpa = { module = "org.springframework.boot:spring-boot-starter-data-jpa", version.ref = "springBoot" }
spring-boot-actuator = { module = "org.springframework.boot:spring-boot-actuator", version.ref = "springBoot" }
spring-kafka = { module = "org.springframework.kafka:spring-kafka", version.ref = "springKafka" }
spring-kafka-test = { module = "org.springframework.kafka:spring-kafka-test", version.ref = "springKafka" }
spring-tx = { module = "org.springframework:spring-tx", version.ref = "springTx" }

# Jakarta dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.namastack.outbox
import io.namastack.outbox.partition.PartitionCoordinator
import io.namastack.outbox.retry.OutboxRetryPolicy
import io.namastack.outbox.retry.OutboxRetryPolicyFactory
import io.namastack.outbox.routing.RoutingConfiguration
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.AutoConfiguration
Expand Down Expand Up @@ -162,4 +163,16 @@ class OutboxCoreAutoConfiguration {
outboxProperties = outboxProperties,
clock = clock,
)

/**
* Creates a default routing configuration if none is provided.
*
* The default configuration routes all events using their eventType as the target
* and the aggregateId as the partition key to maintain strict ordering per aggregateId.
*
* @return Default RoutingConfiguration bean
*/
@Bean
@ConditionalOnMissingBean
fun routingConfiguration(): RoutingConfiguration = RoutingConfiguration.default()
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ class OutboxRecord internal constructor(
}

companion object {
/**
* Creates a new builder for constructing OutboxRecord instances.
*
* @return A new Builder instance
*/
@JvmStatic
fun builder() = Builder()

/**
* Restores an OutboxRecord from persisted data.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.namastack.outbox.routing

/**
* Configuration for event routing in the outbox pattern.
*
* Defines routing rules for different event types. Each event type can have:
* - A specific routing rule (via .route())
* - A wildcard routing rule as fallback (via .routeAll())
*
* @author Roland Beisel
* @since 0.4.0
*/
class RoutingConfiguration private constructor(
internal val routes: Map<String, RoutingRule>,
) {
/**
* Gets the routing rule for the given event type.
*
* Returns the specific route for the event type, or falls back to the wildcard route.
* If neither exists, throws an IllegalArgumentException.
*
* @param eventType The event type to get the routing rule for
* @return The matching RoutingRule
* @throws IllegalArgumentException if no route found for event type and no wildcard route configured
*/
fun getRoute(eventType: String): RoutingRule =
routes[eventType]
?: routes[WILDCARD_ROUTE_KEY]
?: throw IllegalArgumentException(
"No routing rule found for event type '$eventType'. " +
"Configure with .route(\"$eventType\") { ... } or use .routeAll() as fallback",
)

/**
* Builder for constructing RoutingConfiguration instances with fluent API.
*
* @author Roland Beisel
* @since 0.4.0
*/
class Builder {
private val routes = mutableMapOf<String, RoutingRule>()

/**
* Defines a routing rule for a specific event type.
*
* If called multiple times with the same event type, the last definition wins.
*
* @param eventType The event type to configure (e.g., "order.created")
* @param builder Lambda to configure the routing rule
* @return This builder for method chaining
*/
fun route(
eventType: String,
builder: RoutingRule.Builder.() -> Unit,
): Builder {
val rule = RoutingRule.Builder().apply(builder).build()
routes[eventType] = rule

return this
}

/**
* Defines a wildcard routing rule that applies to all event types without a specific rule.
*
* This acts as a fallback/default route. If called multiple times, the last definition wins.
*
* @param builder Lambda to configure the routing rule
* @return This builder for method chaining
*/
fun routeAll(builder: RoutingRule.Builder.() -> Unit): Builder {
val rule = RoutingRule.Builder().apply(builder).build()
routes[WILDCARD_ROUTE_KEY] = rule

return this
}

/**
* Builds the RoutingConfiguration.
*
* @return A new immutable RoutingConfiguration instance
*/
fun build(): RoutingConfiguration = RoutingConfiguration(routes)
}

/**
* Companion object providing factory methods and constants.
*
* @author Roland Beisel
* @since 0.4.0
*/
companion object {
/**
* Key used to store the wildcard routing rule.
*
* The wildcard key "*" is used as a fallback when no specific route is configured
* for an event type.
*/
const val WILDCARD_ROUTE_KEY = "*"

/**
* Creates a new builder for configuring RoutingConfiguration.
*
* @return A new Builder instance for fluent configuration
*/
@JvmStatic
fun builder() = Builder()

/**
* Creates a default RoutingConfiguration with sensible defaults.
*
* The default configuration:
* - Routes all events using their eventType as the target/topic
* - Sets the aggregateId as the partition key to maintain ordering
* - Uses the raw payload without transformation
*
* @return A new RoutingConfiguration with default routing rules
*/
fun default(): RoutingConfiguration =
builder()
.routeAll {
target { record ->
RoutingTarget
.forTarget(record.eventType)
.withKey(record.aggregateId)
}
mapper { record -> record.payload }
}.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.namastack.outbox.routing

import io.namastack.outbox.OutboxRecord

/**
* Represents a routing rule for an outbox record.
*
* A routing rule defines:
* - How to map/transform the event payload
* - Which headers to attach to the message
* - Where to route the event (target and optional key)
*
* @author Roland Beisel
* @since 0.4.0
*/
class RoutingRule private constructor(
val mapper: (OutboxRecord) -> Any,
val headers: (OutboxRecord) -> Map<String, String>,
val target: (OutboxRecord) -> RoutingTarget,
) {
/**
* Builder for creating RoutingRule instances with fluent API.
*/
class Builder {
private var mapperFn: (OutboxRecord) -> Any = { it.payload }
private var headersFn: (OutboxRecord) -> Map<String, String> = { emptyMap() }
private var targetFn: ((OutboxRecord) -> RoutingTarget)? = null

/**
* Sets the mapper function to transform the outbox record payload.
*
* @param fn Function to transform OutboxRecord to Any
* @return This builder for chaining
*/
fun mapper(fn: (OutboxRecord) -> Any) =
apply {
this.mapperFn = fn
}

/**
* Adds headers to the routing rule.
*
* Can be called multiple times. Headers are merged (later calls override earlier ones).
*
* @param fn Function to extract headers from OutboxRecord
* @return This builder for chaining
*/
fun headers(fn: (OutboxRecord) -> Map<String, String>) =
apply {
val previous = headersFn
headersFn = { record ->
previous.invoke(record) + fn.invoke(record)
}
}

/**
* Adds a single header to the routing rule.
*
* Can be called multiple times. Later calls override earlier ones for same key.
*
* @param key The header key
* @param value The header value
* @return This builder for chaining
*/
fun header(
key: String,
value: String,
) = apply {
val previous = headersFn
headersFn = { record ->
previous.invoke(record) + (key to value)
}
}

/**
* Sets the routing target using a custom function.
*
* @param fn Function to determine RoutingTarget from OutboxRecord
* @return This builder for chaining
*/
fun target(fn: (OutboxRecord) -> RoutingTarget) =
apply {
targetFn = fn
}

/**
* Sets the routing target with a fixed destination and optional key.
*
* @param targetValue The target destination (e.g., "kafka:orders")
* @param key Optional partition key
* @return This builder for chaining
*/
fun target(
targetValue: String,
key: String? = null,
) = apply {
targetFn = { RoutingTarget(targetValue, key) }
}

/**
* Builds the RoutingRule.
*
* @return A new RoutingRule instance
* @throws IllegalStateException if target is not configured
*/
fun build(): RoutingRule {
val targetFunction =
targetFn
?: error("RoutingRule configuration incomplete: target must be set via .target() method")

return RoutingRule(
mapper = mapperFn,
headers = headersFn,
target = targetFunction,
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.namastack.outbox.routing

/**
* Represents a routing destination for an outbox record.
*
* A routing target specifies where an event should be published, including:
* - The target destination (e.g., "kafka:orders-topic" or "amqp:user.events.queue")
* - An optional key/partition key for message ordering or partitioning
*
* Headers are configured separately in EventExternalizationConfiguration.
*
* @param target The destination identifier (e.g., topic name, queue name)
* @param key Optional partition/message key for ordering or distribution
*
* @author Roland Beisel
* @since 0.4.0
*/
data class RoutingTarget(
val target: String,
val key: String? = null,
) {
/**
* Creates a new RoutingTarget with the given key.
*
* @param key The partition/message key
* @return A new RoutingTarget instance with updated key
*/
fun withKey(key: String): RoutingTarget = copy(key = key)

companion object {
/**
* Creates a RoutingTarget with only a target.
*/
@JvmStatic
fun forTarget(target: String): RoutingTarget = RoutingTarget(target = target)
}
}
Loading
Loading