diff --git a/pom.xml b/pom.xml
index 0e3024e099..0dfebdef85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.dataspring-data-redis
- 4.0.0-SNAPSHOT
+ 4.0.0-3232-SNAPSHOTSpring Data RedisSpring Data module for Redis
diff --git a/src/main/antora/modules/ROOT/pages/appendix.adoc b/src/main/antora/modules/ROOT/pages/appendix.adoc
index 669bf82204..8c9addf6ec 100644
--- a/src/main/antora/modules/ROOT/pages/appendix.adoc
+++ b/src/main/antora/modules/ROOT/pages/appendix.adoc
@@ -184,6 +184,22 @@ link:https://www.springframework.org/schema/redis/spring-redis-1.0.xsd[Spring Da
|UNSUBSCRIBE |X
|UNWATCH |X
|WATCH |X
+|XACK |X
+|XACKDEL |X
+|XADD |X
+|XAUTOCLAIM |X
+|XCLAIM |X
+|XDEL |X
+|XDELEX |X
+|XGROUP |X
+|XINFO |X
+|XLEN |X
+|XPENDING |X
+|XRANGE |X
+|XREAD |X
+|XREADGROUP |X
+|XREVRANGE |X
+|XTRIM |X
|ZADD |X
|ZCARD |X
|ZCOUNT |X
diff --git a/src/main/java/org/springframework/data/redis/connection/ClusterTopology.java b/src/main/java/org/springframework/data/redis/connection/ClusterTopology.java
index cf3c9fc2b1..22072db5af 100644
--- a/src/main/java/org/springframework/data/redis/connection/ClusterTopology.java
+++ b/src/main/java/org/springframework/data/redis/connection/ClusterTopology.java
@@ -186,7 +186,7 @@ public RedisClusterNode lookup(String nodeId) {
}
/**
- * Get the {@link RedisClusterNode} matching matching either {@link RedisClusterNode#getHost() host} and
+ * Get the {@link RedisClusterNode} matching either {@link RedisClusterNode#getHost() host} and
* {@link RedisClusterNode#getPort() port} or {@link RedisClusterNode#getId() nodeId}
*
* @param node must not be {@literal null}
diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
index ad35115b73..ba8db75508 100644
--- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
@@ -2905,6 +2905,18 @@ public Long xDel(String key, RecordId... recordIds) {
return convertAndReturn(delegate.xDel(serialize(key), recordIds), Converters.identityConverter());
}
+ @Override
+ public List xDelEx(String key, XDelOptions options, RecordId... recordIds) {
+ return convertAndReturn(delegate.xDelEx(serialize(key), options, recordIds),
+ Converters.identityConverter());
+ }
+
+ @Override
+ public List xAckDel(String key, String group, XDelOptions options, RecordId... recordIds) {
+ return convertAndReturn(delegate.xAckDel(serialize(key), group, options, recordIds),
+ Converters.identityConverter());
+ }
+
@Override
public String xGroupCreate(String key, ReadOffset readOffset, String group) {
return convertAndReturn(delegate.xGroupCreate(serialize(key), group, readOffset), Converters.identityConverter());
@@ -3021,6 +3033,11 @@ public Long xTrim(String key, long count, boolean approximateTrimming) {
return convertAndReturn(delegate.xTrim(serialize(key), count, approximateTrimming), Converters.identityConverter());
}
+ @Override
+ public Long xTrim(String key, XTrimOptions options) {
+ return convertAndReturn(delegate.xTrim(serialize(key), options), Converters.identityConverter());
+ }
+
@Override
public Long xAck(byte[] key, String group, RecordId... recordIds) {
return delegate.xAck(key, group, recordIds);
@@ -3046,6 +3063,16 @@ public Long xDel(byte[] key, RecordId... recordIds) {
return delegate.xDel(key, recordIds);
}
+ @Override
+ public List xDelEx(byte[] key, XDelOptions options, RecordId... recordIds) {
+ return delegate.xDelEx(key, options, recordIds);
+ }
+
+ @Override
+ public List xAckDel(byte[] key, String group, XDelOptions options, RecordId... recordIds) {
+ return delegate.xAckDel(key, group, options, recordIds);
+ }
+
@Override
public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) {
return delegate.xGroupCreate(key, groupName, readOffset);
@@ -3129,6 +3156,11 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) {
return delegate.xTrim(key, count, approximateTrimming);
}
+ @Override
+ public Long xTrim(byte[] key, XTrimOptions options) {
+ return delegate.xTrim(key, options);
+ }
+
/**
* Specifies if pipelined and tx results should be deserialized to Strings. If false, results of
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the underlying connection
diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java
index dd02f85661..4ba0292d2f 100644
--- a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java
@@ -550,6 +550,20 @@ default Long xDel(byte[] key, RecordId... recordIds) {
return streamCommands().xDel(key, recordIds);
}
+ /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
+ @Override
+ @Deprecated
+ default List xDelEx(byte[] key, XDelOptions options, RecordId... recordIds) {
+ return streamCommands().xDelEx(key, options, recordIds);
+ }
+
+ /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
+ @Override
+ @Deprecated
+ default List xAckDel(byte[] key, String group, XDelOptions options, RecordId... recordIds) {
+ return streamCommands().xAckDel(key, group, options, recordIds);
+ }
+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
@Override
@Deprecated
@@ -686,12 +700,20 @@ default Long xTrim(byte[] key, long count) {
return xTrim(key, count, false);
}
+ /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
@Override
@Deprecated
default Long xTrim(byte[] key, long count, boolean approximateTrimming) {
return streamCommands().xTrim(key, count, approximateTrimming);
}
+ /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
+ @Override
+ @Deprecated
+ default Long xTrim(byte[] key, XTrimOptions options) {
+ return streamCommands().xTrim(key, options);
+ }
+
// LIST COMMANDS
/** @deprecated in favor of {@link RedisConnection#listCommands()}}. */
diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
index 9796288f50..30b0b31ff5 100644
--- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
@@ -33,9 +33,16 @@
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
+import org.springframework.data.redis.connection.RedisStreamCommands.MaxLenTrimStrategy;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOperator;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimStrategy;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessage;
@@ -200,20 +207,13 @@ default Mono xAck(ByteBuffer key, String group, RecordId... recordIds) {
class AddStreamRecord extends KeyCommand {
private final ByteBufferRecord record;
- private final boolean nomkstream;
- private final @Nullable Long maxlen;
- private final boolean approximateTrimming;
- private final @Nullable RecordId minId;
+ private final XAddOptions options;
- private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream,
- boolean approximateTrimming, @Nullable RecordId minId) {
+ private AddStreamRecord(ByteBufferRecord record, XAddOptions options) {
super(record.getStream());
this.record = record;
- this.maxlen = maxlen;
- this.nomkstream = nomkstream;
- this.approximateTrimming = approximateTrimming;
- this.minId = minId;
+ this.options = options;
}
/**
@@ -226,7 +226,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
Assert.notNull(record, "Record must not be null");
- return new AddStreamRecord(record, null, false, false, null);
+ return new AddStreamRecord(record, XAddOptions.none());
}
/**
@@ -239,7 +239,7 @@ public static AddStreamRecord body(Map body) {
Assert.notNull(body, "Body must not be null");
- return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false, null);
+ return new AddStreamRecord(StreamRecords.rawBuffer(body), XAddOptions.none());
}
/**
@@ -249,7 +249,7 @@ public static AddStreamRecord body(Map body) {
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
*/
public AddStreamRecord to(ByteBuffer key) {
- return new AddStreamRecord(record.withStreamKey(key), maxlen, nomkstream, approximateTrimming, minId);
+ return new AddStreamRecord(record.withStreamKey(key), options);
}
/**
@@ -259,7 +259,7 @@ public AddStreamRecord to(ByteBuffer key) {
* @since 2.6
*/
public AddStreamRecord makeNoStream() {
- return new AddStreamRecord(record, maxlen, true, approximateTrimming, minId);
+ return new AddStreamRecord(record, XAddOptions.makeNoStream());
}
/**
@@ -270,7 +270,7 @@ public AddStreamRecord makeNoStream() {
* @since 2.6
*/
public AddStreamRecord makeNoStream(boolean makeNoStream) {
- return new AddStreamRecord(record, maxlen, makeNoStream, approximateTrimming, minId);
+ return new AddStreamRecord(record, XAddOptions.makeNoStream(makeNoStream));
}
/**
@@ -279,7 +279,7 @@ public AddStreamRecord makeNoStream(boolean makeNoStream) {
* @return new instance of {@link AddStreamRecord}.
*/
public AddStreamRecord maxlen(long maxlen) {
- return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
+ return new AddStreamRecord(record, XAddOptions.maxlen(maxlen));
}
/**
@@ -290,7 +290,7 @@ public AddStreamRecord maxlen(long maxlen) {
* @since 2.7
*/
public AddStreamRecord minId(RecordId minId) {
- return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
+ return new AddStreamRecord(record, options.minId(minId));
}
/**
@@ -299,7 +299,23 @@ public AddStreamRecord minId(RecordId minId) {
* @return new instance of {@link AddStreamRecord}.
*/
public AddStreamRecord approximateTrimming(boolean approximateTrimming) {
- return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
+ return new AddStreamRecord(record, options.approximateTrimming(approximateTrimming));
+ }
+
+ /**
+ * Apply the given {@link XAddOptions} to configure the {@literal XADD} command.
+ *
+ * This method allows setting all XADD options at once, including trimming strategies
+ * ({@literal MAXLEN}, {@literal MINID}), stream creation behavior ({@literal NOMKSTREAM}),
+ * and other parameters. Constructs a new command instance with all previously configured
+ * properties except the options, which are replaced by the provided {@link XAddOptions}.
+ *
+ * @param options the {@link XAddOptions} to apply. Must not be {@literal null}.
+ * @return a new {@link AddStreamRecord} with the specified options applied.
+ * @since 4.0
+ */
+ public AddStreamRecord withOptions(XAddOptions options) {
+ return new AddStreamRecord(record, options);
}
/**
@@ -318,7 +334,7 @@ public ByteBufferRecord getRecord() {
* @since 2.6
*/
public boolean isNoMkStream() {
- return nomkstream;
+ return options.isNoMkStream();
}
/**
@@ -328,23 +344,21 @@ public boolean isNoMkStream() {
* @since 2.3
*/
public @Nullable Long getMaxlen() {
- return maxlen;
+ return options.getMaxlen();
}
/**
* @return {@literal true} if {@literal MAXLEN} is set.
* @since 2.3
*/
- public boolean hasMaxlen() {
- return maxlen != null;
- }
+ public boolean hasMaxlen() { return options.hasMaxlen(); }
/**
* @return {@literal true} if {@literal approximateTrimming} is set.
* @since 2.7
*/
public boolean isApproximateTrimming() {
- return approximateTrimming;
+ return options.isApproximateTrimming();
}
/**
@@ -352,7 +366,7 @@ public boolean isApproximateTrimming() {
* @since 2.7
*/
public @Nullable RecordId getMinId() {
- return minId;
+ return options.getMinId();
}
/**
@@ -360,7 +374,15 @@ public boolean isApproximateTrimming() {
* @since 2.7
*/
public boolean hasMinId() {
- return minId != null;
+ return options.hasMinId();
+ }
+
+ /**
+ * @return the XAddOptions options.
+ * @since 4.0
+ */
+ public XAddOptions getOptions() {
+ return options;
}
}
@@ -409,18 +431,8 @@ default Mono xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
Assert.notNull(record, "Record must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");
- AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
- .approximateTrimming(xAddOptions.isApproximateTrimming()).makeNoStream(xAddOptions.isNoMkStream());
-
- if (xAddOptions.hasMaxlen()) {
- addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
- }
-
- if (xAddOptions.hasMinId()) {
- addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
- }
-
- return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
+ return xAdd(Mono.just(AddStreamRecord.of(record).withOptions(xAddOptions))).next()
+ .map(CommandResponse::getOutput);
}
/**
@@ -602,6 +614,194 @@ public List getRecordIds() {
}
}
+ /**
+ * {@code XDELEX} command parameters.
+ *
+ * @author Viktoriya Kutsarova
+ * @since 4.0
+ * @see Redis Documentation: XDELEX
+ */
+ class DeleteExCommand extends KeyCommand {
+
+ private final List recordIds;
+ private final XDelOptions options;
+
+ private DeleteExCommand(@Nullable ByteBuffer key, List recordIds, XDelOptions options) {
+
+ super(key);
+ this.recordIds = recordIds;
+ this.options = options;
+ }
+
+ /**
+ * Creates a new {@link DeleteExCommand} given a {@link ByteBuffer key}.
+ *
+ * @param key must not be {@literal null}.
+ * @return a new {@link DeleteExCommand} for {@link ByteBuffer key}.
+ */
+ public static DeleteExCommand stream(ByteBuffer key) {
+
+ Assert.notNull(key, "Key must not be null");
+
+ return new DeleteExCommand(key, Collections.emptyList(), XDelOptions.defaults());
+ }
+
+ /**
+ * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param recordIds must not be {@literal null}.
+ * @return a new {@link DeleteExCommand} with {@literal recordIds} applied.
+ */
+ public DeleteExCommand records(String... recordIds) {
+
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return records(Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
+ }
+
+ /**
+ * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param recordIds must not be {@literal null}.
+ * @return a new {@link DeleteExCommand} with {@literal recordIds} applied.
+ */
+ public DeleteExCommand records(RecordId... recordIds) {
+
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ List newRecordIds = new ArrayList<>(getRecordIds().size() + recordIds.length);
+ newRecordIds.addAll(getRecordIds());
+ newRecordIds.addAll(Arrays.asList(recordIds));
+
+ return new DeleteExCommand(getKey(), newRecordIds, options);
+ }
+
+ /**
+ * Applies the {@link XDelOptions}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param options must not be {@literal null}.
+ * @return a new {@link DeleteExCommand} with {@link XDelOptions} applied.
+ */
+ public DeleteExCommand withOptions(XDelOptions options) {
+
+ Assert.notNull(options, "XDelOptions must not be null");
+
+ return new DeleteExCommand(getKey(), recordIds, options);
+ }
+
+ public List getRecordIds() {
+ return recordIds;
+ }
+
+ public XDelOptions getOptions() {
+ return options;
+ }
+ }
+
+ /**
+ * {@code XACKDEL} command parameters.
+ *
+ * @author Viktoriya Kutsarova
+ * @since 4.0
+ * @see Redis Documentation: XACKDEL
+ */
+ class AcknowledgeDeleteCommand extends KeyCommand {
+
+ private final @Nullable String group;
+ private final List recordIds;
+ private final XDelOptions options;
+
+ private AcknowledgeDeleteCommand(@Nullable ByteBuffer key, @Nullable String group, List recordIds,
+ XDelOptions options) {
+
+ super(key);
+ this.group = group;
+ this.recordIds = recordIds;
+ this.options = options;
+ }
+
+ /**
+ * Creates a new {@link AcknowledgeDeleteCommand} given a {@link ByteBuffer key}.
+ *
+ * @param key must not be {@literal null}.
+ * @return a new {@link AcknowledgeDeleteCommand} for {@link ByteBuffer key}.
+ */
+ public static AcknowledgeDeleteCommand stream(ByteBuffer key) {
+
+ Assert.notNull(key, "Key must not be null");
+
+ return new AcknowledgeDeleteCommand(key, null, Collections.emptyList(), XDelOptions.defaults());
+ }
+
+ /**
+ * Applies the {@literal group}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param group must not be {@literal null}.
+ * @return a new {@link AcknowledgeDeleteCommand} with {@literal group} applied.
+ */
+ public AcknowledgeDeleteCommand group(String group) {
+
+ Assert.notNull(group, "Group must not be null");
+
+ return new AcknowledgeDeleteCommand(getKey(), group, recordIds, options);
+ }
+
+ /**
+ * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param recordIds must not be {@literal null}.
+ * @return a new {@link AcknowledgeDeleteCommand} with {@literal recordIds} applied.
+ */
+ public AcknowledgeDeleteCommand records(String... recordIds) {
+
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return records(Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
+ }
+
+ /**
+ * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param recordIds must not be {@literal null}.
+ * @return a new {@link AcknowledgeDeleteCommand} with {@literal recordIds} applied.
+ */
+ public AcknowledgeDeleteCommand records(RecordId... recordIds) {
+
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ List newRecordIds = new ArrayList<>(getRecordIds().size() + recordIds.length);
+ newRecordIds.addAll(getRecordIds());
+ newRecordIds.addAll(Arrays.asList(recordIds));
+
+ return new AcknowledgeDeleteCommand(getKey(), group, newRecordIds, options);
+ }
+
+ /**
+ * Applies the {@link XDelOptions}. Constructs a new command instance with all previously configured properties.
+ *
+ * @param options must not be {@literal null}.
+ * @return a new {@link AcknowledgeDeleteCommand} with {@link XDelOptions} applied.
+ */
+ public AcknowledgeDeleteCommand withOptions(XDelOptions options) {
+
+ Assert.notNull(options, "XDelOptions must not be null");
+
+ return new AcknowledgeDeleteCommand(getKey(), group, recordIds, options);
+ }
+
+ public @Nullable String getGroup() {
+ return group;
+ }
+
+ public List getRecordIds() {
+ return recordIds;
+ }
+
+ public XDelOptions getOptions() {
+ return options;
+ }
+ }
+
/**
* Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the
* number of IDs passed in case certain IDs do not exist.
@@ -646,6 +846,128 @@ default Mono xDel(ByteBuffer key, RecordId... recordIds) {
*/
Flux> xDel(Publisher commands);
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries
+ * are deleted concerning consumer groups.
+ *
+ * @param key the stream key.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ default Flux xDelEx(ByteBuffer key,
+ XDelOptions options, String... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "XDelOptions must not be null");
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return xDelEx(Mono.just(DeleteExCommand.stream(key).withOptions(options).records(recordIds)))
+ .flatMap(response -> Flux.fromIterable(response.getOutput()));
+ }
+
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries
+ * are deleted concerning consumer groups.
+ *
+ * @param key the stream key.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ default Flux xDelEx(ByteBuffer key,
+ XDelOptions options, RecordId... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "XDelOptions must not be null");
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return xDelEx(Mono.just(DeleteExCommand.stream(key).withOptions(options).records(recordIds)))
+ .flatMap(response -> Flux.fromIterable(response.getOutput()));
+ }
+
+ /**
+ * Deletes one or multiple entries from the stream with extended options.
+ *
+ * @param commands must not be {@literal null}.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} per {@link DeleteExCommand}.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ Flux>> xDelEx(
+ Publisher commands);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the stream key.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ default Flux xAckDel(ByteBuffer key, String group,
+ XDelOptions options, String... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(group, "Group must not be null");
+ Assert.notNull(options, "XDelOptions must not be null");
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return xAckDel(Mono.just(AcknowledgeDeleteCommand.stream(key).group(group).withOptions(options).records(recordIds)))
+ .flatMap(response -> Flux.fromIterable(response.getOutput()));
+ }
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the stream key.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ default Flux xAckDel(ByteBuffer key, String group,
+ XDelOptions options, RecordId... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(group, "Group must not be null");
+ Assert.notNull(options, "XDelOptions must not be null");
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return xAckDel(Mono.just(AcknowledgeDeleteCommand.stream(key).group(group).withOptions(options).records(recordIds)))
+ .flatMap(response -> Flux.fromIterable(response.getOutput()));
+ }
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group.
+ *
+ * @param commands must not be {@literal null}.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} per {@link AcknowledgeDeleteCommand}.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ Flux>> xAckDel(
+ Publisher commands);
+
/**
* Get the size of the stream stored at {@literal key}.
*
@@ -1565,13 +1887,11 @@ default Flux xRevRange(ByteBuffer key, Range range, Li
*/
class TrimCommand extends KeyCommand {
- private @Nullable Long count;
- private boolean approximateTrimming;
+ private final XTrimOptions options;
- private TrimCommand(@Nullable ByteBuffer key, @Nullable Long count, boolean approximateTrimming) {
+ private TrimCommand(@Nullable ByteBuffer key, XTrimOptions options) {
super(key);
- this.count = count;
- this.approximateTrimming = approximateTrimming;
+ this.options = options;
}
/**
@@ -1579,23 +1899,43 @@ private TrimCommand(@Nullable ByteBuffer key, @Nullable Long count, boolean appr
*
* @param key must not be {@literal null}.
* @return a new {@link TrimCommand} for {@link ByteBuffer key}.
+ * @since 4.0
+ * @deprecated since 4.0, prefer {@link #stream(ByteBuffer, XTrimOptions)} instead.
*/
+ @Deprecated(since = "4.0", forRemoval = false)
public static TrimCommand stream(ByteBuffer key) {
Assert.notNull(key, "Key must not be null");
- return new TrimCommand(key, null, false);
+ return new TrimCommand(key, XTrimOptions.trim(TrimOptions.maxLen(0)));
}
/**
- * Applies the numeric {@literal count}. Constructs a new command instance with all previously configured
+ * Creates a new {@link TrimCommand} given a {@link ByteBuffer key} and {@link XTrimOptions}.
+ *
+ * @param key must not be {@literal null}.
+ * @param options must not be {@literal null}.
+ * @return a new {@link TrimCommand} for {@link ByteBuffer key}.
+ * @since 4.0
+ */
+ public static TrimCommand stream(ByteBuffer key, XTrimOptions options) {
+ return new TrimCommand(key, options);
+ }
+
+ /**
+ * Applies the numeric {@literal threshold}. Constructs a new command instance with all previously configured
* properties.
*
- * @param count
- * @return a new {@link TrimCommand} with {@literal count} applied.
+ * @param threshold
+ * @return a new {@link TrimCommand} with {@literal threshold} applied.
+ * @deprecated since 4.0: specify a concrete trim strategy (MAXLEN or MINID) via {@link XTrimOptions}
+ * and {@link TrimOptions} instead of using this method. Prefer
+ * {@code options(XTrimOptions.trim(TrimOptions.maxLen(threshold)))} or construct with
+ * {@code stream(key, XTrimOptions.trim(TrimOptions.maxLen(threshold)))}.
*/
- public TrimCommand to(long count) {
- return new TrimCommand(getKey(), count, approximateTrimming);
+ @Deprecated(since = "4.0", forRemoval = false)
+ public TrimCommand to(long threshold) {
+ return new TrimCommand(getKey(), XTrimOptions.trim(TrimOptions.maxLen(threshold)));
}
/**
@@ -1603,7 +1943,11 @@ public TrimCommand to(long count) {
*
* @return a new {@link TrimCommand} with {@literal approximateTrimming} applied.
* @since 2.4
+ * @deprecated since 4.0: do not toggle the trim operator in isolation. Specify a concrete trim
+ * strategy (MAXLEN or MINID) and operator via {@link XTrimOptions} and {@link TrimOptions}, e.g.
+ * {@code options(XTrimOptions.trim(TrimOptions.maxLen(n).approximate()))}.
*/
+ @Deprecated(since = "4.0", forRemoval = false)
public TrimCommand approximate() {
return approximate(true);
}
@@ -1614,20 +1958,59 @@ public TrimCommand approximate() {
* @param approximateTrimming
* @return a new {@link TrimCommand} with {@literal approximateTrimming} applied.
* @since 2.4
+ * @deprecated since 4.0: do not toggle the trim operator in isolation. Specify a concrete trim
+ * strategy (MAXLEN or MINID) and operator via {@link XTrimOptions} and {@link TrimOptions}, e.g.
+ * {@code options(XTrimOptions.trim(TrimOptions.maxLen(n).approximate()))} or
+ * {@code options(XTrimOptions.trim(TrimOptions.minId(id).exact()))}.
*/
+ @Deprecated(since = "4.0", forRemoval = false)
public TrimCommand approximate(boolean approximateTrimming) {
- return new TrimCommand(getKey(), count, approximateTrimming);
+ if (approximateTrimming) {
+ return new TrimCommand(getKey(), XTrimOptions.trim(options.getTrimOptions().approximate()));
+ }
+ return new TrimCommand(getKey(), XTrimOptions.trim(options.getTrimOptions().exact()));
+ }
+
+ /**
+ * Apply the given {@link XTrimOptions} to configure the {@literal XTRIM} command.
+ *
+ * This method allows setting all XTRIM options at once, including trimming strategies
+ * ({@literal MAXLEN}, {@literal MINID}) and other parameters. Constructs a new command instance with all
+ * previously configured properties except the options, which are replaced by the provided {@link XTrimOptions}.
+ *
+ * @param options the {@link XTrimOptions} to apply. Must not be {@literal null}.
+ * @return a new {@link TrimCommand} with the specified options applied.
+ * @since 4.0
+ */
+ public TrimCommand options(XTrimOptions options) {
+ return new TrimCommand(getKey(), options);
}
/**
+ * Returns the MAXLEN threshold if the active trim strategy is {@literal MAXLEN}; otherwise {@literal null}.
+ *
* @return can be {@literal null}.
+ * @deprecated since 4.0: Inspect {@link #getOptions()} -> {@link XTrimOptions#getTrimOptions()} ->
+ * {@link TrimOptions#getTrimStrategy()} and obtain the threshold from the concrete strategy instead. For example:
+ * {@code if (strategy instanceof MaxLenTrimStrategy m) { m.threshold(); }} or
+ * {@code if (strategy instanceof MinIdTrimStrategy i) { i.threshold(); }}.
*/
+ @Deprecated(since = "4.0", forRemoval = false)
public @Nullable Long getCount() {
- return count;
+ TrimStrategy strategy = options.getTrimOptions().getTrimStrategy();
+ if (strategy instanceof MaxLenTrimStrategy maxLen) {
+ return maxLen.threshold();
+ }
+ return null;
}
+
public boolean isApproximateTrimming() {
- return approximateTrimming;
+ return options.getTrimOptions().getTrimOperator() == TrimOperator.APPROXIMATE;
+ }
+
+ public XTrimOptions getOptions() {
+ return options;
}
}
@@ -1661,6 +2044,14 @@ default Mono xTrim(ByteBuffer key, long count, boolean approximateTrimming
.map(NumericResponse::getOutput);
}
+ default Mono xTrim(ByteBuffer key, XTrimOptions options) {
+
+ Assert.notNull(key, "Key must not be null");
+
+ return xTrim(Mono.just(TrimCommand.stream(key).options(options))).next()
+ .map(NumericResponse::getOutput);
+ }
+
/**
* Trims the stream to {@code count} elements.
*
diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
index 2204513eda..3d9d78a5bd 100644
--- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
@@ -46,6 +46,7 @@
* @author Dengliming
* @author Mark John Moreno
* @author Jeonggyu Choi
+ * @author Viktoriya Kutsarova
* @since 2.2
* @see RedisCommands
* @see Redis Documentation - Streams
@@ -105,13 +106,252 @@ default RecordId xAdd(@NonNull MapRecord record) {
* assignment over server generated ones make sure to provide an id via {@code Record#withId}.
*
* @param record the {@link MapRecord record} to append.
- * @param options additional options (eg. {@literal MAXLEN}). Must not be {@literal null}, use
+ * @param options additional options (e.g. {@literal MAXLEN}). Must not be {@literal null}, use
* {@link XAddOptions#none()} instead.
* @return the {@link RecordId id} after save. {@literal null} when used in pipeline / transaction.
* @since 2.3
*/
RecordId xAdd(MapRecord record, @NonNull XAddOptions options);
+
+ sealed interface TrimStrategy permits MaxLenTrimStrategy, MinIdTrimStrategy {
+ }
+
+ final class MaxLenTrimStrategy implements TrimStrategy {
+ private final long threshold;
+
+ private MaxLenTrimStrategy(long threshold) {
+ this.threshold = threshold;
+ }
+
+ public long threshold() {
+ return threshold;
+ }
+
+ }
+
+ final class MinIdTrimStrategy implements TrimStrategy {
+ private final RecordId threshold;
+
+ private MinIdTrimStrategy(RecordId threshold) {
+ this.threshold = threshold;
+ }
+
+ public RecordId threshold() {
+ return threshold;
+ }
+ }
+
+ enum TrimOperator {
+ EXACT,
+ APPROXIMATE
+ }
+
+ @NullMarked
+ class TrimOptions {
+
+ private final TrimStrategy trimStrategy;
+ private final TrimOperator trimOperator;
+ private final @Nullable Long limit;
+ private final @Nullable StreamDeletionPolicy pendingReferences;
+
+ private TrimOptions(TrimStrategy trimStrategy, TrimOperator trimOperator, @Nullable Long limit, @Nullable StreamDeletionPolicy pendingReferences) {
+ Assert.notNull(trimStrategy, "Trim strategy must not be null");
+ this.trimStrategy = trimStrategy;
+ this.trimOperator = trimOperator;
+ this.limit = limit;
+ this.pendingReferences = pendingReferences;
+ }
+
+
+ /**
+ * Create trim options using the MAXLEN strategy with the given threshold.
+ *
+ * Produces {@link TrimOptions} with the exact ("=") operator by default; call {@link #approximate()} to use
+ * approximate ("~") trimming.
+ *
+ * @param maxLen maximum number of entries to retain in the stream
+ * @return new {@link TrimOptions} configured with the MAXLEN strategy
+ * @since 4.0
+ */
+ public static TrimOptions maxLen(long maxLen) {
+ return new TrimOptions(new MaxLenTrimStrategy(maxLen), TrimOperator.EXACT, null, null);
+ }
+
+
+ /**
+ * Create trim options using the MINID strategy with the given minimum id.
+ *
+ * Produces {@link TrimOptions} with the exact ("=") operator by default; call {@link #approximate()} to use
+ * approximate ("~") trimming.
+ *
+ * @param minId minimum id; entries with an id lower than this value are eligible for trimming
+ * @return new {@link TrimOptions} configured with the MINID strategy
+ * @since 4.0
+ */
+ public static TrimOptions minId(RecordId minId) {
+ return new TrimOptions(new MinIdTrimStrategy(minId), TrimOperator.EXACT, null, null);
+ }
+
+ /**
+ * Apply specified trim operator.
+ *
+ * This is a member method that preserves all other options.
+ *
+ * @param trimOperator the operator to use when trimming
+ * @return new instance of {@link XTrimOptions}.
+ */
+ public TrimOptions trim(TrimOperator trimOperator) {
+ return new TrimOptions(trimStrategy, trimOperator, limit, pendingReferences);
+ }
+
+ /**
+ * Use approximate trimming ("~").
+ *
+ * This is a member method that preserves all other options.
+ *
+ * @return new instance of {@link TrimOptions} with {@link TrimOperator#APPROXIMATE}.
+ */
+ public TrimOptions approximate() {
+ return new TrimOptions(trimStrategy, TrimOperator.APPROXIMATE, limit, pendingReferences);
+ }
+
+ /**
+ * Use exact trimming ("=").
+ *
+ * This is a member method that preserves all other options.
+ *
+ * @return new instance of {@link TrimOptions} with {@link TrimOperator#EXACT}.
+ */
+ public TrimOptions exact() {
+ return new TrimOptions(trimStrategy, TrimOperator.EXACT, limit, pendingReferences);
+ }
+
+
+ /**
+ * Limit the maximum number of entries considered when trimming.
+ *
+ * This is a member method that preserves all other options.
+ *
+ * @param limit the maximum number of entries to examine for trimming.
+ * @return new instance of {@link XTrimOptions}.
+ */
+ public TrimOptions limit(long limit) {
+ return new TrimOptions(trimStrategy, trimOperator, limit, pendingReferences);
+ }
+
+ /**
+ * Set the deletion policy for trimming.
+ *
+ * This is a member method that preserves all other options.
+ *
+ * @param pendingReferences the deletion policy to apply.
+ * @return new instance of {@link XTrimOptions}.
+ */
+ public TrimOptions pendingReferences(StreamDeletionPolicy pendingReferences) {
+ return new TrimOptions(trimStrategy, trimOperator, limit, pendingReferences);
+ }
+
+ public TrimStrategy getTrimStrategy() {
+ return trimStrategy;
+ }
+
+ /**
+ * @return strategy to use when trimming entries
+ */
+ public TrimOperator getTrimOperator() {
+ return trimOperator;
+ }
+
+ /**
+ * @return the limit to retain during trimming.
+ * @since 4.0
+ */
+ public @Nullable Long getLimit() {
+ return limit;
+ }
+
+ /**
+ * @return {@literal true} if {@literal LIMIT} is set.
+ * @since 4.0
+ */
+ public boolean hasLimit() {
+ return limit != null;
+ }
+
+ /**
+ * @return the deletion policy.
+ * @since 4.0
+ */
+ public @Nullable StreamDeletionPolicy getPendingReferences() {
+ return pendingReferences;
+ }
+
+ /**
+ * @return {@literal true} if {@literal DELETION_POLICY} is set.
+ * @since 4.0
+ */
+ public boolean hasDeletionPolicy() {
+ return pendingReferences != null;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TrimOptions that)) {
+ return false;
+ }
+ if (this.trimStrategy.equals(that.trimStrategy)) {
+ return false;
+ }
+ if (this.trimOperator.equals(that.trimOperator)) {
+ return false;
+ }
+ return ObjectUtils.nullSafeEquals(pendingReferences, that.pendingReferences);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = trimStrategy.hashCode();
+ result = 31 * result + trimOperator.hashCode();
+ result = 31 * result + ObjectUtils.nullSafeHashCode(limit);
+ result = 31 * result + ObjectUtils.nullSafeHashCode(pendingReferences);
+ return result;
+ }
+ }
+
+ @NullMarked
+ class XTrimOptions {
+
+ private final TrimOptions trimOptions;
+
+ private XTrimOptions(TrimOptions trimOptions) {
+ this.trimOptions = trimOptions;
+ }
+
+ public static XTrimOptions trim(TrimOptions trimOptions) {
+ return new XTrimOptions(trimOptions);
+ }
+
+ /**
+ * Backward-compatible factory alias for creating {@link XTrimOptions} from {@link TrimOptions}.
+ *
+ * @param trimOptions the trim options to apply for XTRIM
+ * @return new {@link XTrimOptions}
+ * @since 4.0
+ */
+ public static XTrimOptions of(TrimOptions trimOptions) {
+ return trim(trimOptions);
+ }
+
+
+ public TrimOptions getTrimOptions() {
+ return trimOptions;
+ }
+ }
+
/**
* Additional options applicable for {@literal XADD} command.
*
@@ -123,28 +363,30 @@ default RecordId xAdd(@NonNull MapRecord record) {
@NullMarked
class XAddOptions {
- private static final XAddOptions NONE = new XAddOptions(null, false, false, null);
+ public static XAddOptions NONE = new XAddOptions(false, null);
- private final @Nullable Long maxlen;
private final boolean nomkstream;
- private final boolean approximateTrimming;
- private final @Nullable RecordId minId;
+ private final @Nullable TrimOptions trimOptions;
- private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming,
- @Nullable RecordId minId) {
- this.maxlen = maxlen;
+ private XAddOptions(boolean nomkstream, @Nullable TrimOptions trimOptions) {
this.nomkstream = nomkstream;
- this.approximateTrimming = approximateTrimming;
- this.minId = minId;
+ this.trimOptions = trimOptions;
}
/**
- * @return
+ * Create default add options.
+ *
+ * @return new instance of {@link XAddOptions} with default values
+ * @since 2.6
*/
public static XAddOptions none() {
return NONE;
}
+ public static XAddOptions trim(@Nullable TrimOptions trimOptions) {
+ return new XAddOptions(false, trimOptions);
+ }
+
/**
* Disable creation of stream if it does not already exist.
*
@@ -152,7 +394,7 @@ public static XAddOptions none() {
* @since 2.6
*/
public static XAddOptions makeNoStream() {
- return new XAddOptions(null, true, false, null);
+ return new XAddOptions(true, null);
}
/**
@@ -163,7 +405,7 @@ public static XAddOptions makeNoStream() {
* @since 2.6
*/
public static XAddOptions makeNoStream(boolean makeNoStream) {
- return new XAddOptions(null, makeNoStream, false, null);
+ return new XAddOptions(makeNoStream, null);
}
/**
@@ -172,7 +414,7 @@ public static XAddOptions makeNoStream(boolean makeNoStream) {
* @return new instance of {@link XAddOptions}.
*/
public static XAddOptions maxlen(long maxlen) {
- return new XAddOptions(maxlen, false, false, null);
+ return new XAddOptions(false, TrimOptions.maxLen(maxlen));
}
/**
@@ -183,16 +425,26 @@ public static XAddOptions maxlen(long maxlen) {
* @since 2.7
*/
public XAddOptions minId(RecordId minId) {
- return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId);
+ return new XAddOptions(nomkstream, TrimOptions.minId(minId));
}
/**
* Apply efficient trimming for capped streams using the {@code ~} flag.
*
* @return new instance of {@link XAddOptions}.
+ * @deprecated since 4.0: callers must specify a concrete trim strategy (MAXLEN or MINID)
+ * via {@link TrimOptions}; do not use this method to only toggle approximate/exact.
+ * Prefer {@code XAddOptions.trim(TrimOptions.maxLen(n).approximate())} or
+ * {@code XAddOptions.trim(TrimOptions.minId(id).exact())}.
*/
+ @Deprecated(since = "4.0", forRemoval = false)
public XAddOptions approximateTrimming(boolean approximateTrimming) {
- return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId);
+ TrimOptions trimOptions = this.trimOptions != null ? this.trimOptions : TrimOptions.maxLen(0);
+ if (approximateTrimming) {
+ return new XAddOptions(nomkstream, trimOptions.approximate());
+ } else {
+ return new XAddOptions(nomkstream, trimOptions.exact());
+ }
}
/**
@@ -209,29 +461,31 @@ public boolean isNoMkStream() {
* @return can be {@literal null}.
*/
public @Nullable Long getMaxlen() {
- return maxlen;
+ return trimOptions != null && trimOptions.getTrimStrategy() instanceof MaxLenTrimStrategy maxLenTrimStrategy
+ ? maxLenTrimStrategy.threshold() : null;
}
/**
* @return {@literal true} if {@literal MAXLEN} is set.
*/
public boolean hasMaxlen() {
- return maxlen != null;
+ return trimOptions != null && trimOptions.getTrimStrategy() instanceof MaxLenTrimStrategy;
}
/**
* @return {@literal true} if {@literal approximateTrimming} is set.
*/
public boolean isApproximateTrimming() {
- return approximateTrimming;
+ return trimOptions != null && trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE;
}
/**
- * @return the minimum record Id to retain during trimming.
+ * @return the minimum record id to retain during trimming.
* @since 2.7
*/
public @Nullable RecordId getMinId() {
- return minId;
+ return trimOptions != null && trimOptions.getTrimStrategy() instanceof MinIdTrimStrategy minIdTrimStrategy
+ ? minIdTrimStrategy.threshold() : null;
}
/**
@@ -239,7 +493,20 @@ public boolean isApproximateTrimming() {
* @since 2.7
*/
public boolean hasMinId() {
- return minId != null;
+ return trimOptions != null && trimOptions.getTrimStrategy() instanceof MinIdTrimStrategy;
+ }
+
+ public XAddOptions nomkstream(boolean nomkstream) {
+ return new XAddOptions(nomkstream, trimOptions);
+ }
+
+
+ public boolean hasTrimOptions() {
+ return trimOptions != null;
+ }
+
+ public @Nullable TrimOptions getTrimOptions() {
+ return trimOptions;
}
@Override
@@ -250,28 +517,111 @@ public boolean equals(@Nullable Object o) {
if (!(o instanceof XAddOptions that)) {
return false;
}
- if (nomkstream != that.nomkstream) {
- return false;
- }
- if (approximateTrimming != that.approximateTrimming) {
- return false;
- }
- if (!ObjectUtils.nullSafeEquals(maxlen, that.maxlen)) {
+ if (!(ObjectUtils.nullSafeEquals(this.trimOptions, that.trimOptions))) {
return false;
}
- return ObjectUtils.nullSafeEquals(minId, that.minId);
+ return nomkstream == that.nomkstream;
}
@Override
public int hashCode() {
- int result = ObjectUtils.nullSafeHashCode(maxlen);
+ int result = ObjectUtils.nullSafeHashCode(this.trimOptions);
result = 31 * result + (nomkstream ? 1 : 0);
- result = 31 * result + (approximateTrimming ? 1 : 0);
- result = 31 * result + ObjectUtils.nullSafeHashCode(minId);
return result;
}
}
+ /**
+ * Deletion policy for stream entries.
+ *
+ * @author Viktoriya Kutsarova
+ * @since 4.0
+ */
+ enum StreamDeletionPolicy {
+ /**
+ * Remove entries according to the specified strategy, but preserve existing references.
+ */
+ KEEP_REFERENCES,
+ /**
+ * Remove entries according to the specified strategy and remove references.
+ */
+ DELETE_REFERENCES,
+ /**
+ * Remove entries that are read and acknowledged and remove references.
+ */
+ ACKNOWLEDGED;
+
+ /**
+ * Factory method for {@link #KEEP_REFERENCES}.
+ */
+ public static StreamDeletionPolicy keep() { return KEEP_REFERENCES; }
+
+ /**
+ * Factory method for {@link #DELETE_REFERENCES}.
+ */
+ public static StreamDeletionPolicy delete() { return DELETE_REFERENCES; }
+
+ /**
+ * Factory method for {@link #ACKNOWLEDGED}.
+ */
+ public static StreamDeletionPolicy removeAcknowledged() { return ACKNOWLEDGED; }
+ }
+
+ /**
+ * Result of a stream entry deletion operation for {@literal XDELEX} and {@literal XACKDEL} commands.
+ *
+ * @author Viktoriya Kutsarova
+ * @since 4.0
+ */
+ enum StreamEntryDeletionResult {
+
+ UNKNOWN(-2L),
+ /**
+ * The entry ID does not exist in the stream.
+ */
+ NOT_FOUND(-1L),
+ /**
+ * The entry was successfully deleted from the stream.
+ */
+ DELETED(1L),
+ /**
+ * The entry was acknowledged but not deleted (when using ACKED deletion policy with dangling references).
+ */
+ NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2L);
+
+ private final long code;
+
+ StreamEntryDeletionResult(long code) {
+ this.code = code;
+ }
+
+ /**
+ * Get the numeric code for this deletion result.
+ *
+ * @return the numeric code: -1 for NOT_FOUND, 1 for DELETED, 2 for NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED
+ */
+ public long getCode() {
+ return code;
+ }
+
+ /**
+ * Convert a numeric code to a {@link StreamEntryDeletionResult}.
+ *
+ * @param code the numeric code
+ * @return the corresponding {@link StreamEntryDeletionResult}
+ * @throws IllegalArgumentException if the code is not valid
+ */
+ public static StreamEntryDeletionResult fromCode(long code) {
+ return switch ((int) code) {
+ case -2 -> UNKNOWN;
+ case -1 -> NOT_FOUND;
+ case 1 -> DELETED;
+ case 2 -> NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED;
+ default -> throw new IllegalArgumentException("Invalid deletion result code: " + code);
+ };
+ }
+ }
+
/**
* Change the ownership of a pending message to the given new {@literal consumer} without increasing the delivered
* count.
@@ -543,6 +893,143 @@ default Long xDel(byte @NonNull [] key, @NonNull String @NonNull... recordIds) {
*/
Long xDel(byte @NonNull [] key, @NonNull RecordId @NonNull... recordIds);
+ /**
+ * Additional options applicable for {@literal XDELEX} and {@literal XACKDEL} commands.
+ *
+ * @author Viktoriya Kutsarova
+ * @since 4.0
+ */
+ class XDelOptions {
+
+ private static final XDelOptions DEFAULT = new XDelOptions(StreamDeletionPolicy.keep());
+
+ private final @NonNull StreamDeletionPolicy pendingReferences;
+
+ private XDelOptions(@NonNull StreamDeletionPolicy pendingReferences) {
+ this.pendingReferences = pendingReferences;
+ }
+
+ /**
+ * Create an {@link XDelOptions} instance with default options.
+ *
+ * This returns the default options for the {@literal XDELEX} and {@literal XACKDEL} commands
+ * with {@link StreamDeletionPolicy#KEEP_REFERENCES} as the deletion policy, which preserves
+ * existing references in consumer groups' PELs (similar to the behavior of {@literal XDEL}).
+ *
+ * @return a default {@link XDelOptions} instance with {@link StreamDeletionPolicy#KEEP_REFERENCES}.
+ */
+ public static XDelOptions defaults() {
+ return DEFAULT;
+ }
+
+ /**
+ * Set the deletion policy for the delete operation.
+ *
+ * @param deletionPolicy the deletion policy to apply.
+ * @return new instance of {@link XDelOptions}.
+ */
+ public static XDelOptions deletionPolicy(StreamDeletionPolicy deletionPolicy) {
+ return new XDelOptions(deletionPolicy);
+ }
+
+ /**
+ * @return the deletion policy.
+ */
+ @NonNull
+ public StreamDeletionPolicy getPendingReferences() {
+ return pendingReferences;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof XDelOptions that)) {
+ return false;
+ }
+ return pendingReferences.equals(that.pendingReferences);
+ }
+
+ @Override
+ public int hashCode() {
+ return pendingReferences.hashCode();
+ }
+ }
+
+ /**
+ * Deletes one or multiple entries from the stream at the specified key.
+ *
+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries
+ * are deleted concerning consumer groups.
+ *
+ * @param key the {@literal key} the stream is stored at.
+ * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior.
+ * @param recordIds the id's of the records to remove.
+ * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists,
+ * {@link StreamEntryDeletionResult#DELETED} if the entry was deleted, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED}
+ * if the entry was not deleted but there are still dangling references (ACKED deletion policy).
+ * Returns {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XDELEX
+ */
+ default List xDelEx(byte @NonNull [] key, XDelOptions options, @NonNull String @NonNull... recordIds) {
+ return xDelEx(key, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
+ }
+
+ /**
+ * Deletes one or multiple entries from the stream at the specified key.
+ *
+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries
+ * are deleted concerning consumer groups.
+ *
+ * @param key the {@literal key} the stream is stored at.
+ * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior.
+ * @param recordIds the id's of the records to remove.
+ * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists,
+ * {@link StreamEntryDeletionResult#DELETED} if the entry was deleted, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED}
+ * if the entry was not deleted but there are still dangling references (ACKED deletion policy).
+ * Returns {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XDELEX
+ */
+ List xDelEx(byte @NonNull [] key, XDelOptions options, @NonNull RecordId @NonNull... recordIds);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries (messages) for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the {@literal key} the stream is stored at.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior.
+ * @param recordIds the id's of the records to acknowledge and remove.
+ * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#DELETED} if the entry was acknowledged and deleted,
+ * {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED}
+ * if the entry was acknowledged but not deleted (when using ACKED deletion policy).
+ * Returns {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XACKDEL
+ */
+ default List xAckDel(byte @NonNull [] key, @NonNull String group, XDelOptions options, @NonNull String @NonNull... recordIds) {
+ return xAckDel(key, group, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
+ }
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries (messages) for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the {@literal key} the stream is stored at.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior.
+ * @param recordIds the id's of the records to acknowledge and remove.
+ * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#DELETED} if the entry was acknowledged and deleted,
+ * {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED}
+ * if the entry was acknowledged but not deleted (when using ACKED deletion policy).
+ * Returns {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XACKDEL
+ */
+ List xAckDel(byte @NonNull [] key, @NonNull String group, XDelOptions options, @NonNull RecordId @NonNull... recordIds);
/**
* Create a consumer group.
*
@@ -1061,4 +1548,14 @@ public boolean hasMinIdleTime() {
* @see Redis Documentation: XTRIM
*/
Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming);
+
+ /**
+ * Trims the stream to {@code count} elements.
+ *
+ * @param key the stream key.
+ * @param options the trimming options.
+ * @return number of removed entries. {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XTRIM
+ */
+ Long xTrim(byte @NonNull [] key, @NonNull XTrimOptions options);
}
diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
index 8b4a3c4302..6b40aa2703 100644
--- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
@@ -3095,6 +3095,53 @@ default Long xDel(@NonNull String key, @NonNull String @NonNull... entryIds) {
Long xDel(@NonNull String key, @NonNull RecordId @NonNull... recordIds);
+ /**
+ * Deletes one or multiple entries from the stream at the specified key.
+ *
+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries
+ * are deleted concerning consumer groups.
+ *
+ * @param key the {@literal key} the stream is stored at.
+ * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaultOptions()} for default behavior.
+ * @param recordIds the id's of the records to remove.
+ * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists,
+ * {@link StreamEntryDeletionResult#DELETED} if the entry was deleted,
+ * {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED}
+ * if the entry was not deleted but there are still dangling references (ACKED deletion policy).
+ * Returns {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XDELEX
+ */
+ default List xDelEx(@NonNull String key, XDelOptions options, @NonNull String @NonNull... recordIds) {
+ return xDelEx(key, options, entryIds(recordIds));
+ }
+
+ List xDelEx(@NonNull String key, XDelOptions options, @NonNull RecordId @NonNull... recordIds);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries (messages) for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the {@literal key} the stream is stored at.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaultOptions()} for default behavior.
+ * @param recordIds the id's of the records to acknowledge and remove.
+ * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#DELETED} if
+ * the entry was acknowledged and deleted, {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists,
+ * {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} if the entry was acknowledged
+ * but not deleted (when using ACKED deletion policy).
+ * Returns {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XACKDEL
+ */
+ default List xAckDel(@NonNull String key, @NonNull String group, XDelOptions options,
+ @NonNull String @NonNull... recordIds) {
+ return xAckDel(key, group, options, entryIds(recordIds));
+ }
+
+ List xAckDel(@NonNull String key, @NonNull String group, XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds);
+
/**
* Create a consumer group.
*
@@ -3522,4 +3569,14 @@ List xRevRange(@NonNull String key, org.springframework.data.domai
* @see Redis Documentation: XTRIM
*/
Long xTrim(@NonNull String key, long count, boolean approximateTrimming);
+
+ /**
+ * Trims the stream to {@code count} elements.
+ *
+ * @param key the stream key.
+ * @param options the trimming options.
+ * @return number of removed entries. {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XTRIM
+ */
+ Long xTrim(@NonNull String key, @NonNull XTrimOptions options);
}
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
index 5e832e9af1..00b3dcd8c1 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
@@ -33,6 +33,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -44,6 +45,7 @@
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.util.Assert;
+import redis.clients.jedis.params.XTrimParams;
/**
* @author Dengliming
@@ -144,6 +146,37 @@ public Long xDel(byte[] key, RecordId... recordIds) {
}
}
+ @Override
+ public List xDelEx(byte[] key, XDelOptions options, RecordId... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(recordIds, "recordIds must not be null");
+
+ try {
+ return StreamConverters.toStreamEntryDeletionResults(connection.getCluster().xdelex(key,
+ StreamConverters.toStreamDeletionPolicy(options),
+ entryIdsToBytes(Arrays.asList(recordIds))));
+ } catch (Exception ex) {
+ throw convertJedisAccessException(ex);
+ }
+ }
+
+ @Override
+ public List xAckDel(byte[] key, String group, XDelOptions options, RecordId... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(group, "Group must not be null");
+ Assert.notNull(recordIds, "recordIds must not be null");
+
+ try {
+ return StreamConverters.toStreamEntryDeletionResults(connection.getCluster().xackdel(key, JedisConverters.toBytes(group),
+ StreamConverters.toStreamDeletionPolicy(options),
+ entryIdsToBytes(Arrays.asList(recordIds))));
+ } catch (Exception ex) {
+ throw convertJedisAccessException(ex);
+ }
+ }
+
@Override
public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) {
return xGroupCreate(key, groupName, readOffset, false);
@@ -380,6 +413,20 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) {
}
}
+ @Override
+ public Long xTrim(byte[] key, XTrimOptions options) {
+
+ Assert.notNull(key, "Key must not be null");
+
+ XTrimParams xTrimParams = StreamConverters.toXTrimParams(options);
+
+ try {
+ return connection.getCluster().xtrim(key, xTrimParams);
+ } catch (Exception ex) {
+ throw convertJedisAccessException(ex);
+ }
+ }
+
private DataAccessException convertJedisAccessException(Exception ex) {
return connection.convertJedisAccessException(ex);
}
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java
index d6d6570743..f619068b6a 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java
@@ -24,6 +24,7 @@
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
+import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.resps.StreamConsumerInfo;
import redis.clients.jedis.resps.StreamGroupInfo;
@@ -132,6 +133,33 @@ public Long xDel(byte @NonNull [] key, @NonNull RecordId @NonNull... recordIds)
StreamConverters.entryIdsToBytes(Arrays.asList(recordIds)));
}
+ @Override
+ public List xDelEx(byte @NonNull [] key, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "Options must not be null");
+ Assert.notNull(recordIds, "recordIds must not be null");
+
+ return connection.invoke().from(Jedis::xdelex, ResponseCommands::xdelex, key,
+ StreamConverters.toStreamDeletionPolicy(options), StreamConverters.entryIdsToBytes(Arrays.asList(recordIds)))
+ .get(StreamConverters::toStreamEntryDeletionResults);
+ }
+
+ @Override
+ public List xAckDel(byte @NonNull [] key, @NonNull String group, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(group, "Group must not be null");
+ Assert.notNull(options, "Options must not be null");
+ Assert.notNull(recordIds, "recordIds must not be null");
+
+ return connection.invoke().from(Jedis::xackdel, ResponseCommands::xackdel, key, JedisConverters.toBytes(group),
+ StreamConverters.toStreamDeletionPolicy(options), StreamConverters.entryIdsToBytes(Arrays.asList(recordIds)))
+ .get(StreamConverters::toStreamEntryDeletionResults);
+ }
+
@Override
public String xGroupCreate(byte @NonNull [] key, @NonNull String groupName, @NonNull ReadOffset readOffset) {
return xGroupCreate(key, groupName, readOffset, false);
@@ -319,4 +347,15 @@ public Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming)
return connection.invoke().just(Jedis::xtrim, PipelineBinaryCommands::xtrim, key, count, approximateTrimming);
}
+ @Override
+ public Long xTrim(byte @NonNull [] key, @NonNull XTrimOptions options) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "XTrimOptions must not be null");
+
+ XTrimParams xTrimParams = StreamConverters.toXTrimParams(options);
+
+ return connection.invoke().just(Jedis::xtrim, PipelineBinaryCommands::xtrim, key, xTrimParams);
+ }
+
}
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java
index 5a534864d4..0c93c0973a 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java
@@ -15,13 +15,16 @@
*/
package org.springframework.data.redis.connection.jedis;
+import org.springframework.data.redis.connection.RedisStreamCommands;
import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.StreamEntryID;
+import redis.clients.jedis.args.StreamDeletionPolicy;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
+import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamPendingEntry;
@@ -37,7 +40,17 @@
import org.jspecify.annotations.Nullable;
import org.springframework.data.domain.Range;
-import org.springframework.data.redis.connection.RedisStreamCommands;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOperator;
+import org.springframework.data.redis.connection.RedisStreamCommands.MaxLenTrimStrategy;
+import org.springframework.data.redis.connection.RedisStreamCommands.MinIdTrimStrategy;
+
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessage;
@@ -57,6 +70,7 @@
* @author dengliming
* @author Mark Paluch
* @author Jeonggyu Choi
+ * @author Viktoriya Kutsarova
* @since 2.3
*/
class StreamConverters {
@@ -207,25 +221,66 @@ static org.springframework.data.redis.connection.stream.PendingMessages toPendin
}
@SuppressWarnings("NullAway")
- public static XAddParams toXAddParams(RecordId recordId, RedisStreamCommands.XAddOptions options) {
+ public static XAddParams toXAddParams(RecordId recordId, XAddOptions options) {
XAddParams params = new XAddParams();
params.id(toStreamEntryId(recordId.getValue()));
- if (options.hasMaxlen()) {
- params.maxLen(options.getMaxlen());
+ if (options.isNoMkStream()) {
+ params.noMkStream();
}
- if (options.hasMinId()) {
- params.minId(options.getMinId().getValue());
+ if (options.hasTrimOptions()) {
+ TrimOptions trim = options.getTrimOptions();
+ var strategy = trim.getTrimStrategy();
+ if (strategy instanceof MaxLenTrimStrategy max) {
+ params.maxLen(max.threshold());
+ } else if (strategy instanceof MinIdTrimStrategy min) {
+ params.minId(min.threshold().getValue());
+ }
+
+ if (trim.getTrimOperator() == TrimOperator.APPROXIMATE) {
+ params.approximateTrimming();
+ } else {
+ params.exactTrimming();
+ }
+
+ if (trim.hasLimit()) {
+ params.limit(trim.getLimit());
+ }
+
+ if (trim.hasDeletionPolicy()) {
+ params.trimmingMode(toStreamDeletionPolicy(trim.getPendingReferences()));
+ }
}
- if (options.isNoMkStream()) {
- params.noMkStream();
+ return params;
+ }
+
+ public static XTrimParams toXTrimParams(XTrimOptions options) {
+
+ XTrimParams params = new XTrimParams();
+
+ TrimOptions trim = options.getTrimOptions();
+ var strategy = trim.getTrimStrategy();
+ if (strategy instanceof MaxLenTrimStrategy max) {
+ params.maxLen(max.threshold());
+ } else if (strategy instanceof MinIdTrimStrategy min) {
+ params.minId(min.threshold().getValue());
}
- if (options.isApproximateTrimming()) {
+ if (trim.getTrimOperator() == TrimOperator.APPROXIMATE) {
params.approximateTrimming();
+ } else {
+ params.exactTrimming();
+ }
+
+ if (trim.hasLimit()) {
+ params.limit(trim.getLimit());
+ }
+
+ if (trim.hasDeletionPolicy()) {
+ params.trimmingMode(toStreamDeletionPolicy(trim.getPendingReferences()));
}
return params;
@@ -248,7 +303,16 @@ private static StreamEntryID toStreamEntryId(String value) {
return new StreamEntryID(value);
}
- public static XClaimParams toXClaimParams(RedisStreamCommands.XClaimOptions options) {
+ private static StreamDeletionPolicy toStreamDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy deletionPolicy) {
+
+ return switch (deletionPolicy) {
+ case KEEP_REFERENCES -> StreamDeletionPolicy.KEEP_REFERENCES;
+ case DELETE_REFERENCES -> StreamDeletionPolicy.DELETE_REFERENCES;
+ case ACKNOWLEDGED -> StreamDeletionPolicy.ACKNOWLEDGED;
+ };
+ }
+
+ public static XClaimParams toXClaimParams(XClaimOptions options) {
XClaimParams params = XClaimParams.xClaimParams();
@@ -305,7 +369,7 @@ public static XReadGroupParams toXReadGroupParams(StreamReadOptions readOptions)
}
@SuppressWarnings("NullAway")
- public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOptions options) {
+ public static XPendingParams toXPendingParams(XPendingOptions options) {
Range range = (Range) options.getRange();
XPendingParams xPendingParams = XPendingParams.xPendingParams(StreamConverters.getLowerValue(range),
@@ -321,4 +385,39 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption
return xPendingParams;
}
+ public static StreamDeletionPolicy toStreamDeletionPolicy(XDelOptions options) {
+ return toStreamDeletionPolicy(options.getPendingReferences());
+ }
+
+ /**
+ * Convert Jedis {@link redis.clients.jedis.resps.StreamEntryDeletionResult} to Spring Data Redis
+ * {@link RedisStreamCommands.StreamEntryDeletionResult}.
+ *
+ * @param result the Jedis deletion result enum
+ * @return the corresponding Spring Data Redis enum
+ * @since 4.0
+ */
+ public static RedisStreamCommands.StreamEntryDeletionResult toStreamEntryDeletionResult(
+ redis.clients.jedis.resps.StreamEntryDeletionResult result) {
+ return switch (result) {
+ case NOT_FOUND -> RedisStreamCommands.StreamEntryDeletionResult.NOT_FOUND;
+ case DELETED -> RedisStreamCommands.StreamEntryDeletionResult.DELETED;
+ case NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED ->
+ RedisStreamCommands.StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED;
+ };
+ }
+
+ /**
+ * Convert a list of Jedis {@link redis.clients.jedis.resps.StreamEntryDeletionResult} to a {@link List} of Spring Data Redis
+ * {@link RedisStreamCommands.StreamEntryDeletionResult}.
+ *
+ * @param results the list of Jedis deletion result enums
+ * @return the list of Spring Data Redis deletion result enums
+ * @since 4.0
+ */
+ public static List toStreamEntryDeletionResults(
+ List results) {
+ return results.stream().map(StreamConverters::toStreamEntryDeletionResult).collect(Collectors.toList());
+ }
+
}
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
index 1103b5400d..f1b714e0af 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
@@ -1147,6 +1147,8 @@ static class TypeHints {
COMMAND_OUTPUT_TYPE_MAPPING.put(SUNIONSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(STRLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(TTL, IntegerOutput.class);
+ COMMAND_OUTPUT_TYPE_MAPPING.put(XACK, IntegerOutput.class);
+ COMMAND_OUTPUT_TYPE_MAPPING.put(XDEL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(XLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(XTRIM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZADD, IntegerOutput.class);
@@ -1232,6 +1234,7 @@ static class TypeHints {
COMMAND_OUTPUT_TYPE_MAPPING.put(TYPE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(WATCH, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(UNWATCH, StatusOutput.class);
+ COMMAND_OUTPUT_TYPE_MAPPING.put(XGROUP, StatusOutput.class);
// VALUE LIST
COMMAND_OUTPUT_TYPE_MAPPING.put(HMGET, ValueListOutput.class);
@@ -1277,6 +1280,10 @@ static class TypeHints {
COMMAND_OUTPUT_TYPE_MAPPING.put(SINTER, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SMEMBERS, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SUNION, ValueSetOutput.class);
+
+ // ENUM SET
+ COMMAND_OUTPUT_TYPE_MAPPING.put(XACKDEL, EnumSetOutput.class);
+ COMMAND_OUTPUT_TYPE_MAPPING.put(XDELEX, EnumSetOutput.class);
}
/**
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
index 6291583d3f..264b235e9f 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
@@ -36,7 +36,10 @@
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
+import org.springframework.data.redis.connection.ReactiveStreamCommands.AcknowledgeDeleteCommand;
+import org.springframework.data.redis.connection.ReactiveStreamCommands.DeleteExCommand;
import org.springframework.data.redis.connection.ReactiveStreamCommands.GroupCommand.GroupCommandAction;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessages;
@@ -99,18 +102,7 @@ public Flux> xAdd(Publisher new CommandResponse<>(command, RecordId.of(value)));
@@ -160,6 +152,43 @@ public Flux> xDel(Publisher
}));
}
+ @Override
+ public Flux>> xDelEx(Publisher commands) {
+
+ return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
+
+ Assert.notNull(command.getKey(), "Key must not be null");
+ Assert.notNull(command.getRecordIds(), "recordIds must not be null");
+
+ return cmd.xdelex(command.getKey(),
+ StreamConverters.toXDelArgs(command.getOptions()),
+ entryIdsToString(command.getRecordIds()))
+ .map(StreamConverters::toStreamEntryDeletionResult)
+ .collectList()
+ .map(results -> new CommandResponse<>(command, results));
+ }));
+ }
+
+ @Override
+ public Flux>> xAckDel(
+ Publisher commands) {
+
+ return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
+
+ Assert.notNull(command.getKey(), "Key must not be null");
+ Assert.notNull(command.getGroup(), "Group must not be null");
+ Assert.notNull(command.getRecordIds(), "recordIds must not be null");
+
+ return cmd.xackdel(command.getKey(),
+ ByteUtils.getByteBuffer(command.getGroup()),
+ StreamConverters.toXDelArgs(command.getOptions()),
+ entryIdsToString(command.getRecordIds()))
+ .map(StreamConverters::toStreamEntryDeletionResult)
+ .collectList()
+ .map(results -> new CommandResponse<>(command, results));
+ }));
+ }
+
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux> xGroup(Publisher commands) {
@@ -370,9 +399,9 @@ public Flux> xTrim(Publisher comm
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
- Assert.notNull(command.getCount(), "Count must not be null");
+ Assert.notNull(command.getOptions(), "Options must not be null");
- return cmd.xtrim(command.getKey(), command.isApproximateTrimming(), command.getCount())
+ return cmd.xtrim(command.getKey(), StreamConverters.toXTrimArgs(command.getOptions()))
.map(value -> new NumericResponse<>(command, value));
}));
}
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java
index d663f17d86..e47c3fbe16 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java
@@ -20,6 +20,7 @@
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XPendingArgs;
import io.lettuce.core.XReadArgs;
+import io.lettuce.core.XTrimArgs;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
@@ -84,16 +85,7 @@ public RecordId xAdd(@NonNull MapRecord xDelEx(byte @NonNull [] key, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "Options must not be null");
+ Assert.notNull(recordIds, "recordIds must not be null");
+
+ return connection.invoke().from(RedisStreamAsyncCommands::xdelex, key, StreamConverters.toXDelArgs(options),
+ entryIdsToString(recordIds)).get(StreamConverters::toStreamEntryDeletionResults);
+ }
+
+ @Override
+ public List xAckDel(byte @NonNull [] key, @NonNull String group,
+ @NonNull XDelOptions options, @NonNull RecordId @NonNull... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(group, "Group must not be null");
+ Assert.notNull(options, "Options must not be null");
+ Assert.notNull(recordIds, "recordIds must not be null");
+
+ return connection.invoke().from(RedisStreamAsyncCommands::xackdel, key, LettuceConverters.toBytes(group),
+ StreamConverters.toXDelArgs(options), entryIdsToString(recordIds))
+ .get(StreamConverters::toStreamEntryDeletionResults);
+ }
+
@Override
public String xGroupCreate(byte @NonNull [] key, @NonNull String groupName, @NonNull ReadOffset readOffset) {
return xGroupCreate(key, groupName, readOffset, false);
@@ -324,6 +342,17 @@ public Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming)
return connection.invoke().just(RedisStreamAsyncCommands::xtrim, key, approximateTrimming, count);
}
+ @Override
+ public Long xTrim(byte @NonNull [] key, @NonNull XTrimOptions options) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "XTrimOptions must not be null");
+
+ XTrimArgs xTrimArgs = StreamConverters.toXTrimArgs(options);
+
+ return connection.invoke().just(RedisStreamAsyncCommands::xtrim, key, xTrimArgs);
+ }
+
RedisClusterAsyncCommands getAsyncDedicatedConnection() {
return connection.getAsyncDedicatedConnection();
}
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java
index ae17a9ca70..81a3d9525d 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java
@@ -15,17 +15,30 @@
*/
package org.springframework.data.redis.connection.lettuce;
+import io.lettuce.core.StreamDeletionPolicy;
import io.lettuce.core.StreamMessage;
+import io.lettuce.core.XAddArgs;
import io.lettuce.core.XClaimArgs;
import io.lettuce.core.XReadArgs;
+import io.lettuce.core.XTrimArgs;
import io.lettuce.core.models.stream.PendingMessage;
import io.lettuce.core.models.stream.PendingMessages;
+import io.lettuce.core.models.stream.StreamEntryDeletionResult;
import java.time.Duration;
import java.util.List;
import org.springframework.core.convert.converter.Converter;
+import org.springframework.data.redis.connection.RedisStreamCommands;
+import org.springframework.data.redis.connection.RedisStreamCommands.MaxLenTrimStrategy;
+import org.springframework.data.redis.connection.RedisStreamCommands.MinIdTrimStrategy;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOperator;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimStrategy;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
@@ -41,6 +54,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
+ * @author Viktoriya Kutsarova
* @since 2.2
*/
@SuppressWarnings({ "rawtypes" })
@@ -67,6 +81,22 @@ static XClaimArgs toXClaimArgs(XClaimOptions options) {
return XClaimOptionsToXClaimArgsConverter.INSTANCE.convert(options);
}
+ static XAddArgs toXAddArgs(RecordId recordId, XAddOptions options) {
+ XAddArgs args = XAddOptionsToXAddArgsConverter.INSTANCE.convert(options);
+ if (!recordId.shouldBeAutoGenerated()) {
+ args.id(recordId.getValue());
+ }
+ return args;
+ }
+
+ static XTrimArgs toXTrimArgs(XTrimOptions options) {
+ return XTrimOptionsToXTrimArgsConverter.INSTANCE.convert(options);
+ }
+
+ static StreamDeletionPolicy toXDelArgs(XDelOptions options) {
+ return toStreamDeletionPolicy(options.getPendingReferences());
+ }
+
static Converter, ByteRecord> byteRecordConverter() {
return (it) -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBytes(it.getBody());
}
@@ -169,4 +199,123 @@ public XClaimArgs convert(XClaimOptions source) {
}
}
+
+ /**
+ * {@link Converter} to convert {@link XAddOptions} to Lettuce's {@link XAddArgs}.
+ *
+ * @since 4.0
+ */
+ enum XAddOptionsToXAddArgsConverter implements Converter {
+
+ INSTANCE;
+
+ @Override
+ public XAddArgs convert(XAddOptions source) {
+
+ XAddArgs args = new XAddArgs();
+
+ if (source.isNoMkStream()) {
+ args.nomkstream();
+ }
+
+ if (!source.hasTrimOptions()) {
+ return args;
+ }
+
+ TrimOptions trimOptions = source.getTrimOptions();
+ TrimStrategy trimStrategy = trimOptions.getTrimStrategy();
+ if (trimStrategy instanceof MaxLenTrimStrategy maxLenTrimStrategy) {
+ args.maxlen(maxLenTrimStrategy.threshold());
+ }
+ else if (trimStrategy instanceof MinIdTrimStrategy minIdTrimStrategy) {
+ args.minId(minIdTrimStrategy.threshold().getValue());
+ }
+
+ if (trimOptions.hasLimit()) {
+ args.limit(trimOptions.getLimit());
+ }
+
+ args.exactTrimming(trimOptions.getTrimOperator() == TrimOperator.EXACT);
+ args.approximateTrimming(trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE);
+
+ if (trimOptions.hasDeletionPolicy()) {
+ args.trimmingMode(toStreamDeletionPolicy(trimOptions.getPendingReferences()));
+ }
+
+ return args;
+ }
+ }
+
+ enum XTrimOptionsToXTrimArgsConverter implements Converter {
+ INSTANCE;
+
+ @Override
+ public XTrimArgs convert(XTrimOptions source) {
+
+ XTrimArgs args = new XTrimArgs();
+
+ TrimOptions trimOptions = source.getTrimOptions();
+ TrimStrategy trimStrategy = trimOptions.getTrimStrategy();
+ if (trimStrategy instanceof MaxLenTrimStrategy maxLenTrimStrategy) {
+ args.maxlen(maxLenTrimStrategy.threshold());
+ }
+ else if (trimStrategy instanceof MinIdTrimStrategy minIdTrimStrategy) {
+ args.minId(minIdTrimStrategy.threshold().getValue());
+ }
+
+ if (trimOptions.hasLimit()) {
+ args.limit(trimOptions.getLimit());
+ }
+
+ args.exactTrimming(trimOptions.getTrimOperator() == TrimOperator.EXACT);
+ args.approximateTrimming(trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE);
+
+ if (trimOptions.hasDeletionPolicy()) {
+ args.trimmingMode(toStreamDeletionPolicy(trimOptions.getPendingReferences()));
+ }
+
+ return args;
+ }
+ }
+
+ public static StreamDeletionPolicy toStreamDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy deletionPolicy) {
+
+ return switch (deletionPolicy) {
+ case KEEP_REFERENCES -> StreamDeletionPolicy.KEEP_REFERENCES;
+ case DELETE_REFERENCES -> StreamDeletionPolicy.DELETE_REFERENCES;
+ case ACKNOWLEDGED -> StreamDeletionPolicy.ACKNOWLEDGED;
+ };
+ }
+
+ /**
+ * Convert Lettuce {@link io.lettuce.core.models.stream.StreamEntryDeletionResult} to Spring Data Redis
+ * {@link StreamEntryDeletionResult}.
+ *
+ * @param result the Lettuce deletion result enum
+ * @return the corresponding Spring Data Redis enum
+ * @since 4.0
+ */
+ static RedisStreamCommands.StreamEntryDeletionResult toStreamEntryDeletionResult(
+ StreamEntryDeletionResult result) {
+ return switch (result) {
+ case UNKNOWN -> RedisStreamCommands.StreamEntryDeletionResult.UNKNOWN;
+ case NOT_FOUND -> RedisStreamCommands.StreamEntryDeletionResult.NOT_FOUND;
+ case DELETED -> RedisStreamCommands.StreamEntryDeletionResult.DELETED;
+ case NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED ->
+ RedisStreamCommands.StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED;
+ };
+ }
+
+ /**
+ * Convert a list of Lettuce {@link io.lettuce.core.models.stream.StreamEntryDeletionResult} to a {@link List} of Spring Data Redis
+ * {@link RedisStreamCommands.StreamEntryDeletionResult}.
+ *
+ * @param results the list of Lettuce deletion result enums
+ * @return the list of Spring Data Redis deletion result enums
+ * @since 4.0
+ */
+ static List toStreamEntryDeletionResults(
+ List results) {
+ return results.stream().map(StreamConverters::toStreamEntryDeletionResult).toList();
+ }
}
diff --git a/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java b/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java
index 46f23598cf..4b7a56afef 100644
--- a/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java
@@ -22,7 +22,11 @@
import org.jspecify.annotations.NullUnmarked;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
+import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -35,6 +39,7 @@
* @author Mark Paluch
* @author Christoph Strobl
* @author Dengliming
+ * @author Viktoriya Kutsarova
* @since 2.2
*/
@NullUnmarked
@@ -80,6 +85,54 @@ public interface BoundStreamOperations {
*/
Long delete(@NonNull String @NonNull... recordIds);
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's as strings.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ List deleteWithOptions(@NonNull XDelOptions options, @NonNull String @NonNull ... recordIds);
+
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ List deleteWithOptions(@NonNull XDelOptions options, @NonNull RecordId @NonNull ... recordIds);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's as strings.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ List acknowledgeAndDelete(@NonNull String group, @NonNull XDelOptions options,
+ @NonNull String @NonNull ... recordIds);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ List acknowledgeAndDelete(@NonNull String group, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull ... recordIds);
+
/**
* Create a consumer group.
*
@@ -219,4 +272,18 @@ public interface BoundStreamOperations {
* @see Redis Documentation: XTRIM
*/
Long trim(long count, boolean approximateTrimming);
+
+ /**
+ * Trims the stream according to the specified {@link RedisStreamCommands.XTrimOptions}.
+ *
+ * Supports various trimming strategies including {@literal MAXLEN} (limit by count) and
+ * {@literal MINID} (evict entries older than a specific ID), with options for approximate
+ * or exact trimming.
+ *
+ * @param options the trimming options specifying the strategy and parameters. Must not be {@literal null}.
+ * @return number of removed entries. {@literal null} when used in pipeline / transaction.
+ * @since 2.7.4
+ * @see Redis Documentation: XTRIM
+ */
+ Long trim(@NonNull XTrimOptions options);
}
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java
index b87d0b1a0b..0979d97c92 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java
@@ -15,6 +15,7 @@
*/
package org.springframework.data.redis.core;
+import org.springframework.data.redis.connection.RedisStreamCommands;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -35,6 +36,9 @@
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -179,6 +183,30 @@ public Mono delete(@NonNull K key, RecordId @NonNull... recordIds) {
return createMono(streamCommands -> streamCommands.xDel(rawKey(key), recordIds));
}
+ @Override
+ public Flux deleteWithOptions(@NonNull K key, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "XDelOptions must not be null");
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return createFlux(streamCommands -> streamCommands.xDelEx(rawKey(key), options, recordIds));
+ }
+
+ @Override
+ public Flux acknowledgeAndDelete(@NonNull K key, @NonNull String group,
+ @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds) {
+
+ Assert.notNull(key, "Key must not be null");
+ Assert.hasText(group, "Group must not be null or empty");
+ Assert.notNull(options, "XDelOptions must not be null");
+ Assert.notNull(recordIds, "RecordIds must not be null");
+
+ return createFlux(streamCommands -> streamCommands.xAckDel(rawKey(key), group, options, recordIds));
+ }
+
@Override
public Mono createGroup(@NonNull K key, @NonNull ReadOffset readOffset, @NonNull String group) {
@@ -330,6 +358,14 @@ public Mono trim(@NonNull K key, long count, boolean approximateTrimming)
return createMono(streamCommands -> streamCommands.xTrim(rawKey(key), count, approximateTrimming));
}
+ @Override
+ public Mono trim(@NonNull K key, @NonNull XTrimOptions options) {
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(options, "XTrimOptions must not be null");
+
+ return createMono(streamCommands -> streamCommands.xTrim(rawKey(key), options));
+ }
+
@Override
public HashMapper getHashMapper(@NonNull Class targetType) {
return objectMapper.getHashMapper(targetType);
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
index 1e114678a5..3739136eef 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
@@ -30,8 +30,12 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -177,6 +181,25 @@ public Long delete(@NonNull K key, @NonNull RecordId @NonNull... recordIds) {
return execute(connection -> connection.xDel(rawKey, recordIds));
}
+ @Override
+ public List deleteWithOptions(@NonNull K key, @NonNull XDelOptions options,
+ @NonNull String @NonNull... recordIds) {
+
+ byte[] rawKey = rawKey(key);
+ RecordId[] recordIdArray = Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new);
+ return execute(connection -> connection.streamCommands().xDelEx(rawKey, options, recordIdArray));
+ }
+
+ @Override
+ public List acknowledgeAndDelete(@NonNull K key, @NonNull String group,
+ @NonNull XDelOptions options,
+ @NonNull String @NonNull... recordIds) {
+
+ byte[] rawKey = rawKey(key);
+ RecordId[] recordIdArray = Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new);
+ return execute(connection -> connection.streamCommands().xAckDel(rawKey, group, options, recordIdArray));
+ }
+
@Override
public String createGroup(@NonNull K key, @NonNull ReadOffset readOffset, @NonNull String group) {
@@ -328,6 +351,12 @@ public Long trim(@NonNull K key, long count, boolean approximateTrimming) {
return execute(connection -> connection.xTrim(rawKey, count, approximateTrimming));
}
+ @Override
+ public Long trim(@NonNull K key, @NonNull XTrimOptions options) {
+ byte[] rawKey = rawKey(key);
+ return execute(connection -> connection.streamCommands().xTrim(rawKey, options));
+ }
+
@Override
public HashMapper getHashMapper(@NonNull Class targetType) {
return objectMapper.getHashMapper(targetType);
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
index df74d05b7b..a892ca1088 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
@@ -15,11 +15,13 @@
*/
package org.springframework.data.redis.core;
+import org.springframework.data.redis.connection.RedisStreamCommands;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.jspecify.annotations.NonNull;
@@ -28,8 +30,11 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
@@ -256,6 +261,92 @@ default Mono delete(@NonNull Record record) {
*/
Mono delete(@NonNull K key, @NonNull RecordId @NonNull... recordIds);
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * @param key the stream key.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ default Flux deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, @NonNull String @NonNull... recordIds) {
+ return deleteWithOptions(key, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
+ }
+
+ /**
+ * Deletes a given {@link Record} from the stream with extended options.
+ *
+ * @param record must not be {@literal null}.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID.
+ */
+ default Flux deleteWithOptions(@NonNull Record record, @NonNull XDelOptions options) {
+ Assert.notNull(record.getStream(), "Record.getStream() must not be null");
+ return deleteWithOptions(record.getStream(), options, record.getId());
+ }
+
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * @param key the stream key.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ Flux deleteWithOptions(@NonNull K key, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * @param key the stream key.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ default Flux acknowledgeAndDelete(@NonNull K key, @NonNull String group,
+ @NonNull XDelOptions options, @NonNull String @NonNull... recordIds) {
+ return acknowledgeAndDelete(key, group, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
+ }
+
+ /**
+ * Acknowledges and conditionally deletes a given {@link Record} for a stream consumer group.
+ *
+ * @param group name of the consumer group.
+ * @param record must not be {@literal null}.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ default Flux acknowledgeAndDelete(@NonNull String group, @NonNull Record record,
+ @NonNull XDelOptions options) {
+ Assert.notNull(record.getStream(), "Record.getStream() must not be null");
+ return acknowledgeAndDelete(record.getStream(), group, options, record.getId());
+ }
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * @param key the stream key.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record Id's.
+ * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ Flux acknowledgeAndDelete(@NonNull K key, @NonNull String group,
+ @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds);
+
/**
* Create a consumer group at the {@link ReadOffset#latest() latest offset}. This command creates the stream if it
* does not already exist.
@@ -660,6 +751,21 @@ default Flux> reverseRange(@NonNull Class targetType,
*/
Mono trim(@NonNull K key, long count, boolean approximateTrimming);
+ /**
+ * Trims the stream according to the specified {@link XTrimOptions}.
+ *
+ * Supports various trimming strategies including {@literal MAXLEN} (limit by count) and
+ * {@literal MINID} (evict entries older than a specific ID), with options for approximate
+ * or exact trimming.
+ *
+ * @param key the stream key.
+ * @param options the trimming options specifying the strategy and parameters. Must not be {@literal null}.
+ * @return number of removed entries.
+ * @since 4.0
+ * @see Redis Documentation: XTRIM
+ */
+ Mono trim(@NonNull K key, @NonNull XTrimOptions options);
+
/**
* Get the {@link HashMapper} for a specific type.
*
diff --git a/src/main/java/org/springframework/data/redis/core/RedisCommand.java b/src/main/java/org/springframework/data/redis/core/RedisCommand.java
index 2571a694f5..b130a73b8b 100644
--- a/src/main/java/org/springframework/data/redis/core/RedisCommand.java
+++ b/src/main/java/org/springframework/data/redis/core/RedisCommand.java
@@ -284,6 +284,25 @@ public enum RedisCommand {
// -- W
WATCH("rw", 1), //
+
+ // -- X
+ XACK("rw", 3), //
+ XACKDEL("rw", 3), //
+ XADD("rw", 3), //
+ XAUTOCLAIM("rw", 4), //
+ XCLAIM("rw", 4), //
+ XDEL("rw", 2), //
+ XDELEX("rw", 2), //
+ XGROUP("rw", 2), //
+ XINFO("r", 1), //
+ XLEN("r", 1), //
+ XPENDING("r", 1), //
+ XRANGE("r", 2), //
+ XREVRANGE("r", 2), //
+ XREAD("r", 2), //
+ XREADGROUP("rw", 4), //
+ XTRIM("rw", 2), //
+
// -- Z
ZADD("rw", 3), //
ZCARD("r", 1), //
diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java
index 6580347220..c914ceff58 100644
--- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java
@@ -29,6 +29,9 @@
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
@@ -241,6 +244,99 @@ default Long delete(@NonNull Record record) {
*/
Long delete(@NonNull K key, @NonNull RecordId @NonNull... recordIds);
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * @param key the stream key.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record ids as strings.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ List deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, @NonNull String @NonNull... recordIds);
+
+ /**
+ * Deletes one or multiple entries from the stream at the specified key with extended options.
+ *
+ * @param key the stream key.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record ids.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ default List deleteWithOptions(@NonNull K key, @NonNull XDelOptions options,
+ @NonNull RecordId @NonNull... recordIds) {
+ return deleteWithOptions(key, options, Arrays.stream(recordIds).map(RecordId::getValue).toArray(String[]::new));
+ }
+
+ /**
+ * Deletes a given {@link Record} from the stream with extended options.
+ *
+ * @param record must not be {@literal null}.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XDELEX
+ * @since 4.0
+ */
+ default List deleteWithOptions(@NonNull Record record, @NonNull XDelOptions options) {
+ Assert.notNull(record.getStream(), "Record.getStream() must not be null");
+ return deleteWithOptions(record.getStream(), options, record.getId().getValue());
+ }
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the stream key.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record ids as strings.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ List acknowledgeAndDelete(@NonNull K key, @NonNull String group, @NonNull XDelOptions options,
+ @NonNull String @NonNull... recordIds);
+
+ /**
+ * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key.
+ *
+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the
+ * given consumer group and simultaneously attempts to delete the corresponding entries from the stream.
+ *
+ * @param key the stream key.
+ * @param group name of the consumer group.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @param recordIds stream record ids.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ default List acknowledgeAndDelete(@NonNull K key, @NonNull String group,
+ @NonNull XDelOptions options, @NonNull RecordId @NonNull... recordIds) {
+ return acknowledgeAndDelete(key, group, options, Arrays.stream(recordIds).map(RecordId::getValue).toArray(String[]::new));
+ }
+
+ /**
+ * Acknowledges and conditionally deletes a given {@link Record} for a stream consumer group.
+ *
+ * @param group name of the consumer group.
+ * @param record must not be {@literal null}.
+ * @param options the {@link XDelOptions} specifying deletion policy.
+ * @return list of {@link StreamEntryDeletionResult} for each ID.
+ * @see Redis Documentation: XACKDEL
+ * @since 4.0
+ */
+ default List acknowledgeAndDelete(@NonNull String group, @NonNull Record record,
+ @NonNull XDelOptions options) {
+ Assert.notNull(record.getStream(), "Record.getStream() must not be null");
+ return acknowledgeAndDelete(record.getStream(), group, options, record.getId().getValue());
+ }
+
/**
* Create a consumer group at the {@link ReadOffset#latest() latest offset}. This command creates the stream if it
* does not already exist.
@@ -649,6 +745,21 @@ default List> read(@NonNull Class targetType, @NonNull
*/
Long trim(@NonNull K key, long count, boolean approximateTrimming);
+ /**
+ * Trims the stream according to the specified {@link XTrimOptions}.
+ *
+ * Supports various trimming strategies including {@literal MAXLEN} (limit by count) and
+ * {@literal MINID} (evict entries older than a specific ID), with options for approximate
+ * or exact trimming.
+ *
+ * @param key the stream key.
+ * @param options the trimming options specifying the strategy and parameters. Must not be {@literal null}.
+ * @return number of removed entries. {@literal null} when used in pipeline / transaction.
+ * @since 2.4
+ * @see Redis Documentation: XTRIM
+ */
+ Long trim(@NonNull K key, @NonNull XTrimOptions options);
+
/**
* Get the {@link HashMapper} for a specific type.
*
diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java
index d09bb78ddf..bbec86fbc3 100644
--- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java
@@ -62,7 +62,11 @@
import org.springframework.data.redis.TestCondition;
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation;
import org.springframework.data.redis.connection.RedisListCommands.Position;
+import org.springframework.data.redis.connection.RedisStreamCommands.StreamDeletionPolicy;
+import org.springframework.data.redis.connection.RedisStreamCommands.TrimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
import org.springframework.data.redis.connection.RedisStringCommands.BitOperation;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs;
@@ -4026,7 +4030,7 @@ void xAddShouldCreateStream() {
@EnabledOnCommand("XADD")
void xAddShouldTrimStreamExactly() {
- RedisStreamCommands.XAddOptions xAddOptions = RedisStreamCommands.XAddOptions.maxlen(1);
+ XAddOptions xAddOptions = XAddOptions.trim(TrimOptions.maxLen(1));
actual.add(
connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions));
actual.add(
@@ -4045,7 +4049,7 @@ void xAddShouldTrimStreamExactly() {
@EnabledOnCommand("XADD")
void xAddShouldTrimStreamApprox() {
- RedisStreamCommands.XAddOptions xAddOptions = RedisStreamCommands.XAddOptions.maxlen(1).approximateTrimming(true);
+ XAddOptions xAddOptions = XAddOptions.trim(TrimOptions.maxLen(1).approximate());
actual.add(
connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions));
actual.add(
@@ -4060,6 +4064,240 @@ void xAddShouldTrimStreamApprox() {
assertThat((Long) results.get(3)).isBetween(1L, 3L);
}
+ @Test // GH-3232
+ @EnabledOnCommand("XADD")
+ void xAddShouldTrimStreamWithMinId() {
+
+ // Add initial records to get valid IDs
+ actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+ actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+ actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+
+ List