Skip to content

Commit 2fbf892

Browse files
committed
#17 Implement EmbeddedRedisShardedCluster
1 parent d029933 commit 2fbf892

File tree

15 files changed

+561
-35
lines changed

15 files changed

+561
-35
lines changed
Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package io.github.tobi.laa.spring.boot.embedded.redis
22

3-
import io.github.tobi.laa.spring.boot.embedded.redis.conf.RedisConf
43
import org.springframework.context.ApplicationContext
5-
import redis.clients.jedis.JedisPooled
4+
import redis.clients.jedis.UnifiedJedis
65
import redis.embedded.Redis
76
import java.util.concurrent.ConcurrentHashMap
87

@@ -11,11 +10,11 @@ import java.util.concurrent.ConcurrentHashMap
1110
*/
1211
internal object RedisStore {
1312

14-
private val internalStore = ConcurrentHashMap<ApplicationContext, Triple<Redis, RedisConf, JedisPooled>>()
13+
private val internalStore = ConcurrentHashMap<ApplicationContext, Pair<Redis, UnifiedJedis>>()
1514

1615
internal fun computeIfAbsent(
1716
context: ApplicationContext,
18-
supplier: () -> Triple<Redis, RedisConf, JedisPooled>
17+
supplier: () -> Pair<Redis, UnifiedJedis>
1918
) {
2019
internalStore.computeIfAbsent(context) { _ -> supplier.invoke() }
2120
}
@@ -24,11 +23,7 @@ internal object RedisStore {
2423
return internalStore[context]?.first
2524
}
2625

27-
internal fun conf(context: ApplicationContext): RedisConf? {
26+
internal fun client(context: ApplicationContext): UnifiedJedis? {
2827
return internalStore[context]?.second
2928
}
30-
31-
internal fun client(context: ApplicationContext): JedisPooled? {
32-
return internalStore[context]?.third
33-
}
3429
}

src/main/kotlin/io/github/tobi/laa/spring/boot/embedded/redis/junit/extension/RedisFlushAllExtension.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import org.junit.jupiter.api.extension.AfterAllCallback
88
import org.junit.jupiter.api.extension.AfterEachCallback
99
import org.junit.jupiter.api.extension.ExtensionContext
1010
import org.springframework.test.context.junit.jupiter.SpringExtension.getApplicationContext
11+
import redis.clients.jedis.JedisCluster
12+
import redis.clients.jedis.Protocol.Command.FLUSHALL
13+
import redis.clients.jedis.UnifiedJedis
1114

1215
/**
1316
* JUnit 5 extension to flush all Redis data after each test method or after all test methods of a test class.
@@ -33,6 +36,13 @@ internal class RedisFlushAllExtension : AfterEachCallback, AfterAllCallback {
3336

3437
private fun flushAll(extensionContext: ExtensionContext?) {
3538
val applicationContext = getApplicationContext(extensionContext!!)
36-
RedisStore.client(applicationContext)!!.flushAll()
39+
flushAll(RedisStore.client(applicationContext)!!)
40+
}
41+
42+
private fun flushAll(client: UnifiedJedis) {
43+
when (client) {
44+
is JedisCluster -> client.clusterNodes.values.forEach { it.resource.sendCommand(FLUSHALL) }
45+
else -> client.flushAll()
46+
}
3747
}
3848
}

src/main/kotlin/io/github/tobi/laa/spring/boot/embedded/redis/server/RedisServerContextCustomizer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ internal class RedisServerContextCustomizer(
3030
val client = createClient(server, conf)
3131
setSpringProperties(context, server, conf)
3232
addShutdownListener(context, server, client)
33-
Triple(server, conf, client)
33+
Pair(server, client)
3434
}
3535
}
3636

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package io.github.tobi.laa.spring.boot.embedded.redis.shardedcluster
22

33
import io.github.tobi.laa.spring.boot.embedded.redis.junit.extension.RedisFlushAllExtension
4-
import io.github.tobi.laa.spring.boot.embedded.redis.server.RedisServerContextCustomizerFactory
54
import org.junit.jupiter.api.extension.ExtendWith
65
import org.springframework.test.context.ContextCustomizerFactories
6+
import kotlin.reflect.KClass
77

88
/**
99
* Annotation to enable an [embedded Redis sharded cluster][redis.embedded.RedisShardedCluster] for tests.
@@ -12,5 +12,65 @@ import org.springframework.test.context.ContextCustomizerFactories
1212
@Retention(AnnotationRetention.RUNTIME)
1313
@MustBeDocumented
1414
@ExtendWith(RedisFlushAllExtension::class)
15-
@ContextCustomizerFactories(RedisServerContextCustomizerFactory::class)
16-
internal annotation class EmbeddedRedisShardedCluster
15+
@ContextCustomizerFactories(RedisShardedClusterContextCustomizerFactory::class)
16+
annotation class EmbeddedRedisShardedCluster(
17+
18+
/**
19+
* The shards of the Redis cluster. Must contain at least one shard.
20+
*
21+
* @see redis.embedded.core.RedisShardedClusterBuilder.shard
22+
*/
23+
val shards: Array<Shard> = [Shard()],
24+
25+
/**
26+
* The ports to start the nodes of the embedded Redis cluster on. Can be left empty, in which case free ports
27+
* upwards from `6379` will be automatically used.
28+
*
29+
* > **Warning**
30+
* If specified, the number of ports must be equal to the number of nodes in the cluster, that is, the sum of the
31+
* number of master and replica nodes.
32+
*
33+
* @see redis.embedded.core.RedisShardedClusterBuilder.serverPorts
34+
*/
35+
val ports: IntArray = [],
36+
37+
/**
38+
* The time in seconds to wait for the Redis cluster to be initialized. Must be greater than 0.
39+
*
40+
* @see redis.embedded.core.RedisShardedClusterBuilder.initializationTimeout
41+
*/
42+
val initializationTimeout: Long = 20,
43+
44+
/**
45+
* The path to the directory to execute the Redis server nodes in. If set, the Redis executable will be executed in
46+
* the given directory. Applies to all nodes.
47+
*
48+
* @see redis.embedded.core.ExecutableProvider.newJarResourceProvider
49+
*/
50+
val executeInDirectory: String = "",
51+
52+
/**
53+
* Customizes how the shards of the Redis cluster are built. Customizers are executed by their order in this array.
54+
* Each customizer must have no-arg constructor.
55+
*
56+
* @see RedisShardCustomizer
57+
*/
58+
val customizer: Array<KClass<out RedisShardCustomizer>> = []
59+
) {
60+
61+
/**
62+
* A single shard of the Redis cluster.
63+
*/
64+
annotation class Shard(
65+
/**
66+
* The name of the shard. Only relevant for differentiating between shards when supplying customizers. If
67+
* nothing is set, the shard will be given the common english name of a random bird.
68+
*/
69+
val name: String = "",
70+
71+
/**
72+
* The number of replicas for the shard. Must be greater than 0.
73+
*/
74+
val replicas: Int = 2
75+
)
76+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.github.tobi.laa.spring.boot.embedded.redis.shardedcluster
2+
3+
import redis.embedded.core.RedisServerBuilder
4+
5+
/**
6+
* Can be implemented to customize how the shards of the Redis cluster are built.
7+
*
8+
* Implementations _must_ have a no-arg constructor.
9+
*
10+
* @see EmbeddedRedisShardedCluster.customizer
11+
*/
12+
interface RedisShardCustomizer {
13+
14+
/**
15+
* Customizes how the main node of [shard] is built.
16+
*
17+
* @param builder The builder of the main node to customize.
18+
* @param config The configuration of the Redis cluster.
19+
* @param shard The name of the shard whose main node is being built.
20+
*/
21+
fun customizeMainNode(
22+
builder: RedisServerBuilder,
23+
config: EmbeddedRedisShardedCluster,
24+
shard: String
25+
)
26+
27+
/**
28+
* Customizes how the replicase of [shard] are built.
29+
*
30+
* @param builder The builders of the replicas to customize.
31+
* @param config The configuration of the Redis cluster.
32+
* @param shard The name of the shard whose replicas are being built.
33+
*/
34+
fun customizeReplicas(
35+
builder: List<RedisServerBuilder>,
36+
config: EmbeddedRedisShardedCluster,
37+
shard: String
38+
)
39+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.github.tobi.laa.spring.boot.embedded.redis.shardedcluster
2+
3+
import io.github.tobi.laa.spring.boot.embedded.redis.RedisStore
4+
import io.github.tobi.laa.spring.boot.embedded.redis.birds.BirdNameProvider
5+
import io.github.tobi.laa.spring.boot.embedded.redis.ports.PortProvider
6+
import org.springframework.boot.test.util.TestPropertyValues
7+
import org.springframework.context.ConfigurableApplicationContext
8+
import org.springframework.context.event.ContextClosedEvent
9+
import org.springframework.test.context.ContextCustomizer
10+
import org.springframework.test.context.MergedContextConfiguration
11+
import redis.clients.jedis.HostAndPort
12+
import redis.clients.jedis.JedisCluster
13+
import redis.clients.jedis.UnifiedJedis
14+
import redis.embedded.Redis
15+
import redis.embedded.RedisServer
16+
import redis.embedded.RedisServer.newRedisServer
17+
import redis.embedded.RedisShardedCluster
18+
import redis.embedded.core.ExecutableProvider
19+
import redis.embedded.core.RedisServerBuilder
20+
import java.io.File
21+
import java.time.Duration
22+
import java.util.stream.IntStream.range
23+
import kotlin.reflect.full.createInstance
24+
import kotlin.streams.toList
25+
26+
private const val CLUSTER_IP = "127.0.0.1"
27+
28+
internal class RedisShardedClusterContextCustomizer(
29+
private val config: EmbeddedRedisShardedCluster,
30+
private val portProvider: PortProvider = PortProvider()
31+
) : ContextCustomizer {
32+
33+
override fun customizeContext(context: ConfigurableApplicationContext, mergedConfig: MergedContextConfiguration) {
34+
RedisStore.computeIfAbsent(context) {
35+
val cluster = createAndStartCluster()
36+
val addresses = parseAddresses(cluster)
37+
val client = createClient(addresses)
38+
setSpringProperties(context, addresses)
39+
addShutdownListener(context, cluster, client)
40+
Pair(cluster, client)
41+
}
42+
}
43+
44+
private fun createAndStartCluster(): RedisShardedCluster {
45+
val cluster = createCluster()
46+
cluster.start()
47+
return cluster
48+
}
49+
50+
private fun createCluster(): RedisShardedCluster {
51+
val ports = ports().iterator()
52+
val shards = config.shards.map { createShard(it, ports) }
53+
val nodes = shards.map { it.second + it.first }.flatten().toList()
54+
val replicasPortsByMainNodePort =
55+
shards.associate { s -> s.first.ports().first() to s.second.map { it.ports() }.flatten().toSet() }
56+
val cluster =
57+
RedisShardedCluster(nodes, replicasPortsByMainNodePort, Duration.ofSeconds(config.initializationTimeout))
58+
return cluster
59+
}
60+
61+
private fun ports(): List<Int> {
62+
return if (config.ports.isEmpty()) {
63+
val nOfNodes = config.shards.map { it.replicas + 1 }.sum()
64+
range(0, nOfNodes).map { _ -> portProvider.next() }.toList()
65+
} else {
66+
config.ports.asList()
67+
}
68+
}
69+
70+
private fun createShard(
71+
shard: EmbeddedRedisShardedCluster.Shard,
72+
ports: Iterator<Int>
73+
): Pair<RedisServer, List<RedisServer>> {
74+
val name = shard.name.ifEmpty { BirdNameProvider.next() }
75+
76+
val mainNodeBuilder = createNodeBuilder(ports.next())
77+
config.customizer.forEach { c -> c.createInstance().customizeMainNode(mainNodeBuilder, config, name) }
78+
79+
val replicaBuilders = range(0, shard.replicas).mapToObj { _ -> createNodeBuilder(ports.next()) }.toList()
80+
config.customizer.forEach { c -> c.createInstance().customizeReplicas(replicaBuilders, config, name) }
81+
82+
return mainNodeBuilder.build() to replicaBuilders.map { it.build() }
83+
}
84+
85+
private fun createNodeBuilder(port: Int): RedisServerBuilder {
86+
val builder = newRedisServer()
87+
.bind(CLUSTER_IP)
88+
.port(port)
89+
.setting("cluster-enabled yes")
90+
.setting("cluster-config-file nodes-replica-$port.conf")
91+
.setting("cluster-node-timeout 5000")
92+
.setting("appendonly no")
93+
if (config.executeInDirectory.isNotEmpty()) {
94+
builder.executableProvider(ExecutableProvider.newJarResourceProvider(File(config.executeInDirectory)))
95+
}
96+
return builder
97+
}
98+
99+
private fun parseAddresses(cluster: RedisShardedCluster): List<Pair<String, Int>> =
100+
cluster.servers()
101+
.map { CLUSTER_IP to it.ports().first() }
102+
.toList()
103+
104+
private fun createClient(addresses: List<Pair<String, Int>>): JedisCluster {
105+
return JedisCluster(addresses.map { HostAndPort(it.first, it.second) }.first())
106+
}
107+
108+
private fun setSpringProperties(context: ConfigurableApplicationContext, addresses: List<Pair<String, Int>>) {
109+
TestPropertyValues.of(
110+
mapOf(
111+
"spring.data.redis.cluster.nodes" to addresses.joinToString(",") { "${it.first}:${it.second}" }
112+
)
113+
).applyTo(context.environment)
114+
}
115+
116+
private fun addShutdownListener(context: ConfigurableApplicationContext, server: Redis, client: UnifiedJedis) {
117+
context.addApplicationListener { event ->
118+
if (event is ContextClosedEvent) {
119+
client.close()
120+
server.stop()
121+
}
122+
}
123+
}
124+
125+
override fun equals(other: Any?): Boolean {
126+
if (this === other) return true
127+
if (javaClass != other?.javaClass) return false
128+
129+
other as RedisShardedClusterContextCustomizer
130+
131+
return config == other.config
132+
}
133+
134+
override fun hashCode(): Int {
135+
return config.hashCode()
136+
}
137+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.github.tobi.laa.spring.boot.embedded.redis.shardedcluster
2+
3+
import io.github.tobi.laa.spring.boot.embedded.redis.findTestClassAnnotation
4+
import org.springframework.test.context.ContextConfigurationAttributes
5+
import org.springframework.test.context.ContextCustomizer
6+
import org.springframework.test.context.ContextCustomizerFactory
7+
8+
internal class RedisShardedClusterContextCustomizerFactory : ContextCustomizerFactory {
9+
10+
override fun createContextCustomizer(
11+
testClass: Class<*>,
12+
configAttributes: MutableList<ContextConfigurationAttributes>
13+
): ContextCustomizer {
14+
val embeddedRedisShardedCluster = findTestClassAnnotation(testClass, EmbeddedRedisShardedCluster::class.java)
15+
return RedisShardedClusterContextCustomizer(embeddedRedisShardedCluster!!)
16+
}
17+
}

src/test/kotlin/io/github/tobi/laa/spring/boot/embedded/redis/IntegrationTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.github.tobi.laa.spring.boot.embedded.redis
22

33
import org.springframework.boot.test.context.SpringBootTest
4+
import org.springframework.test.annotation.DirtiesContext
45

56
/**
67
* Common annotation shared by all integration tests
@@ -9,4 +10,5 @@ import org.springframework.boot.test.context.SpringBootTest
910
@Retention(AnnotationRetention.RUNTIME)
1011
@MustBeDocumented
1112
@SpringBootTest
13+
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
1214
annotation class IntegrationTest

src/test/kotlin/io/github/tobi/laa/spring/boot/embedded/redis/RedisStoreTest.kt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ internal class RedisStoreTest {
2121
Assertions.assertThat(RedisStore.server(context)).isNull()
2222
}
2323

24-
@Test
25-
@DisplayName("Unknown application context should yield null conf")
26-
fun unknownAppContext_confIsNull() {
27-
Assertions.assertThat(RedisStore.conf(context)).isNull()
28-
}
29-
3024
@Test
3125
@DisplayName("Unknown application context should yield null client")
3226
fun unknownAppContext_clientIsNull() {

0 commit comments

Comments
 (0)