|
| 1 | +:linkattrs: |
| 2 | +:project-owner: redis-field-engineering |
| 3 | +:project-name: redis-streams-java |
| 4 | +:project-group: com.redis |
| 5 | +:project-version: 0.3.5 |
| 6 | +:dist-repo-name: redis-streams-java-dist |
| 7 | +:name: Redis Streams Java |
| 8 | +:toc: |
| 9 | +:toc-title: |
| 10 | +:toc-placement!: |
| 11 | + |
| 12 | += {name} |
| 13 | + |
| 14 | +{name} is a Java library that provides a topic abstraction over Redis streams. |
| 15 | +Each topic is backed by one or more Redis streams. As you publish messages to a topic, a new stream |
| 16 | +is automatically created as soon as the current stream grows beyond a predefined size. This allows |
| 17 | +streams to scale linearly across the shards of your Redis cluster. |
| 18 | + |
| 19 | +The library provides core functionality for writing to a distributed Redis Stream as well as reading |
| 20 | +from a distributed Redis Stream in Redis. This works in all Redis deployment scenarios: |
| 21 | + |
| 22 | +* A Pair of Redis Enterprise or Redis Cloud Active-Active Clusters |
| 23 | +* Redis Enterprise or Redis Cloud Instance. |
| 24 | +* Single Instance of Redis Stack |
| 25 | + |
| 26 | +In fact the library takes special care to ensure that reading from a Consumer Group of a single topic across two Active-Active clusters |
| 27 | +is done so in a way that honors the ordering and pending entry list of the consumer group consistently. |
| 28 | + |
| 29 | +[discrete] |
| 30 | +== Table of Contents |
| 31 | +toc::[] |
| 32 | + |
| 33 | + |
| 34 | +== Requirements |
| 35 | + |
| 36 | +{name} requires a https://javadoc.io/doc/redis.clients/jedis/5.0.1/redis/clients/jedis/JedisPooled.html[`JedisPooled`] connection object. You may want to tune your `JedisPooled` instance for your own needs. |
| 37 | + |
| 38 | +== Quick start |
| 39 | + |
| 40 | +== Installation |
| 41 | + |
| 42 | +To run {name} in production, you'll need one of the following: |
| 43 | + |
| 44 | +* A https://redis.io/docs/stack/[Redis Stack] instance |
| 45 | +* https://redis.com/redis-enterprise-cloud/overview/[Redis Cloud] |
| 46 | +* https://redis.com/redis-enterprise-software/overview/[Redis Enterprise] deployment |
| 47 | + |
| 48 | +=== {name} Connector |
| 49 | + |
| 50 | +Next, you'll need to install the {name} plugin and configure it. |
| 51 | + |
| 52 | +=== Redis installation |
| 53 | + |
| 54 | +For a self-managed deployment, or for testing locally, install https://redis.io/docs/stack/[Redis Stack] or spin up a free https://redis.com/try-free/[Redis Cloud] instance. |
| 55 | +If you need a fully-managed, cloud-based deployment of Redis on AWS, GCP, or Azure, see all the https://redis.com/redis-enterprise-cloud/overview/[Redis Cloud] offerings. |
| 56 | +For deployment in your own private cloud or data center, consider https://redis.com/redis-enterprise-software/overview/[Redis Enterprise]. |
| 57 | + |
| 58 | +== Documentation |
| 59 | + |
| 60 | +== Add the library to your project |
| 61 | + |
| 62 | +To install the library simply add the following: |
| 63 | + |
| 64 | +=== Maven |
| 65 | +Add the following to your `pom.xml` file: |
| 66 | + |
| 67 | +[source,xml] |
| 68 | +[subs="verbatim,attributes"] |
| 69 | +.pom.xml |
| 70 | +---- |
| 71 | +<dependency> |
| 72 | + <groupId>{project-group}</groupId> |
| 73 | + <artifactId>{project-name}</artifactId> |
| 74 | + <version>{project-version}</version> |
| 75 | +</dependency> |
| 76 | +---- |
| 77 | + |
| 78 | +=== Gradle |
| 79 | + |
| 80 | +Add the following to your `build.gradle` file |
| 81 | + |
| 82 | +[source,groovy] |
| 83 | +[subs="attributes"] |
| 84 | +.build.gradle |
| 85 | +---- |
| 86 | +dependencies { |
| 87 | + implementation '{project-group}:{project-name}:{project-version}' |
| 88 | +} |
| 89 | +---- |
| 90 | + |
| 91 | +== Usage |
| 92 | + |
| 93 | +=== Components |
| 94 | + |
| 95 | +Generically, there are three primary components of the streams library |
| 96 | + |
| 97 | +1. `TopicManager` - Oversees the topics, tracks the status of consumers within the library. |
| 98 | +2. `ConsumerGroup` - Manages and coordinates consumption from topics. |
| 99 | +3. `TopicProducer` - Produces messages for the topic. |
| 100 | + |
| 101 | + |
| 102 | +==== Jedis Connection |
| 103 | +To connect any of these key components you will need a `JedisPooled` connection to Redis. |
| 104 | + |
| 105 | +==== TopicManager |
| 106 | + |
| 107 | +To initialize the `TopicManager`, simply pass in your Jedis Connection along with a topic manager configuration to `createTopic`. |
| 108 | + |
| 109 | + |
| 110 | +```java |
| 111 | +JedisPooled jedis = new JedisPooled("redis://localhost:6379"); |
| 112 | +SerialTopicConfig config = new SerialTopicConfig("my-topic"); |
| 113 | +TopicManager topicManager = TopicManager.createTopic(jedis, config); |
| 114 | +``` |
| 115 | + |
| 116 | +This will create the topic manager, as well as creating the items in Redis required to support the topic. |
| 117 | + |
| 118 | +==== TopicProducer |
| 119 | + |
| 120 | +The next step is to create a `TopicProducer`, to create one, simply pass in your Jedis Connection along with the topic name to the `TopicProducer` constructor. |
| 121 | + |
| 122 | +```java |
| 123 | +Producer producer = new TopicProducer(jedis, "my-topic"); |
| 124 | +``` |
| 125 | + |
| 126 | +==== ConsumerGroup |
| 127 | + |
| 128 | +The final item to create is a `ConsumerGroup`, the consumer group is responsible for coordinating consumption of the topic. To create a `ConsumerGroup`, pass your Jedis Connection along with the topic name and your consumer name into the `ConsumerGroup` constructor |
| 129 | + |
| 130 | +```java |
| 131 | +ConsumerGroup consumerGroup = new ConsumerGroup(jedis, "my-topic", "test-group"); |
| 132 | +``` |
| 133 | + |
| 134 | +== Producing messages |
| 135 | + |
| 136 | +To add a message to the topic, simply pass a `Map<String,String>` into the `TopicProducer` |
| 137 | + |
| 138 | +``` |
| 139 | +Map<String, String> msg = Map.of("temp", "81", "humidity", "0.92", "city", "Satellite Beach"); |
| 140 | +producer.produce(msg); |
| 141 | +``` |
| 142 | + |
| 143 | +== Consuming messages |
| 144 | + |
| 145 | +To consume messages, simply call `consume` on you consumer group, passing in your consumer name: |
| 146 | + |
| 147 | +TopicEntry entry = consumerGroup.consume("my-consumer"); |
| 148 | + |
| 149 | +The message contains: |
| 150 | +1. The message id (the monotonic id created by Redis when the message was produced) |
| 151 | +2. The Stream the message was read from |
| 152 | +3. The message itself |
| 153 | + |
| 154 | +=== Acknowledge Messages |
| 155 | + |
| 156 | +After you have consumed a message, you must then acknowledge it, to do so, simply call `acknowledge` passing in the `AckMessage` constructed from the `TopicEntry` received from consuming a message. |
| 157 | + |
| 158 | +```java |
| 159 | +TopicEntry entry = consumerGroup.consume("test-consumer"); |
| 160 | +// Some extra processing |
| 161 | +// ... |
| 162 | +consumerGroup.acknowledge(new AckMessage(entry)); |
| 163 | +``` |
| 164 | + |
| 165 | +== Get Pending Messages |
| 166 | + |
| 167 | +If your application is unable to acknowledge the message (for example if the process died during processing), the messages remain in a pending state, you can acquire any pending messages using the `TopicManager`. |
| 168 | +Then you can acknowledge those messages using the consumerGroup: |
| 169 | + |
| 170 | +```java |
| 171 | +List<PendingEntry> pendingEntryList = topicManager.getPendingEntries("my-group", query); |
| 172 | +consumerGroup.acknowledge(new AckMessage(pendingEntryList.get(0))); |
| 173 | +``` |
| 174 | + |
| 175 | +== Checking Consumer Stats |
| 176 | + |
| 177 | +If you want to keep an eye on what is going on with your topic, and the consumer groups within the topic, you can use the use the `TopicManager`'s `getConsumerGroupStats` method: |
| 178 | + |
| 179 | +```java |
| 180 | +ConsumerGroupStatus stats = topicManager.getConsumerGroupStatus("my-group"); |
| 181 | +System.out.printf("Consumer Group Name: %s%n", stats.getGroupName()); |
| 182 | +System.out.printf("Consumer Group Topic Size: %d%n", stats.getTopicEntryCount()); |
| 183 | +System.out.printf("Consumer Group Pending Entries: %d%n", stats.getPendingEntryCount()); |
| 184 | +System.out.printf("Consumer Group Lag: %d%n", stats.getConsumerLag()); |
| 185 | +``` |
| 186 | + |
| 187 | +== NOACK Consumer Group |
| 188 | + |
| 189 | +A `NoAckConsumerGroup` implementation exists which allows you to read from a stream in the context of a |
| 190 | +consumer group with no need to acknowledge any messages that you retrieved from the stream. This is useful when |
| 191 | +you want to ensure "exactly once" delivery semantics and are comfortable losing a message if something |
| 192 | +happens after the entry is delivered. To utilize this, just initialize the `NoAckConsumerGroup`, and consume |
| 193 | +as you would with the normal `ConsumerGroup` the key difference is that there is no need to acknowledge any |
| 194 | + |
| 195 | +```java |
| 196 | +NoAckConsumerGroup noack = new NoAckConsumerGroup(jedis, "my-topic", "no-ack-group"); |
| 197 | +TopicEntry entry = noack.consume("my-consumer"); |
| 198 | +// your apps processing |
| 199 | +``` |
| 200 | + |
| 201 | +== Single Cluster PEL |
| 202 | + |
| 203 | +There is also a "Single Cluster PEL" topic manager and consumer group. This implementation does not replicate |
| 204 | +the Pending Entries List (PEL) across Cluster in an Active-Active configuration, making it more performant than its |
| 205 | +standard counterpart for those Active Active deployments. The caveat is that your consumer group PEL will not be synchronized |
| 206 | +across clusters, so you will not be able to claim any entries dropped outside of the original region of consumption. |
| 207 | + |
| 208 | +To read without replicating the PEL, simply initialize the `SingleClusterPelConsumer` group and use it as you would with any |
| 209 | +other consumer group: |
| 210 | + |
| 211 | +```java |
| 212 | +SingleClusterPelConsumerGroup singleClusterPel = new SingleClusterPelConsumerGroup(jedis, "my-topic", "pel-group"); |
| 213 | +TopicEntry entry = singleClusterPel.consume("my-consumer"); |
| 214 | +// your apps processing |
| 215 | +singleClusterPel.acknowledge(new AckMessage(entry)); |
| 216 | +``` |
| 217 | + |
| 218 | +=== Consumer Group Stats and Pending Entries with Single Cluster PEL |
| 219 | + |
| 220 | +The method for gather Consumer group stats and getting pending entries is naturally different with the Single Cluster PEL |
| 221 | +implementation. You must therefore use a specialized `SingleCLusterPelTopicManager` to retrieve these e.g.: |
| 222 | + |
| 223 | +```java |
| 224 | +SingleClusterPelTopicManager singleClusterPelTopicManager = new SingleClusterPelTopicManager(jedis, config); |
| 225 | +PendingEntryQuery query = new PendingEntryQuery(); |
| 226 | +query.setCount(1); |
| 227 | +List<PendingEntry> pendingEntriesSingleCLuster = singleClusterPelTopicManager.getPendingEntries("pel-group", query); |
| 228 | +ConsumerGroupStatus consumerGroupStatsSingleCluster = singleClusterPelTopicManager.getConsumerGroupStatus("pel-group"); |
| 229 | +``` |
| 230 | + |
| 231 | + |
| 232 | +== Support |
| 233 | + |
| 234 | +{name} is supported by Redis, Inc. on a good faith effort basis. |
| 235 | +To report bugs, request features, or receive assistance, please https://github.com/{project-owner}/{dist-repo-name}/issues[file an issue]. |
0 commit comments