@@ -199,19 +199,20 @@ default Mono<Long> xAck(ByteBuffer key, String group, RecordId... recordIds) {
199199 class AddStreamRecord extends KeyCommand {
200200
201201 private final ByteBufferRecord record ;
202- private final @ Nullable Long maxlen ;
203202 private final boolean nomkstream ;
203+ private final @ Nullable Long maxlen ;
204204 private final boolean approximateTrimming ;
205-
205+ private final @ Nullable RecordId minId ;
206206
207207 private AddStreamRecord (ByteBufferRecord record , @ Nullable Long maxlen , boolean nomkstream ,
208- boolean approximateTrimming ) {
208+ boolean approximateTrimming , @ Nullable RecordId minId ) {
209209
210210 super (record .getStream ());
211211 this .record = record ;
212212 this .maxlen = maxlen ;
213213 this .nomkstream = nomkstream ;
214214 this .approximateTrimming = approximateTrimming ;
215+ this .minId = minId ;
215216 }
216217
217218 /**
@@ -224,7 +225,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
224225
225226 Assert .notNull (record , "Record must not be null!" );
226227
227- return new AddStreamRecord (record , null , false , false );
228+ return new AddStreamRecord (record , null , false , false , null );
228229 }
229230
230231 /**
@@ -237,7 +238,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
237238
238239 Assert .notNull (body , "Body must not be null!" );
239240
240- return new AddStreamRecord (StreamRecords .rawBuffer (body ), null , false , false );
241+ return new AddStreamRecord (StreamRecords .rawBuffer (body ), null , false , false , null );
241242 }
242243
243244 /**
@@ -247,53 +248,57 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
247248 * @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
248249 */
249250 public AddStreamRecord to (ByteBuffer key ) {
250- return new AddStreamRecord (record .withStreamKey (key ), maxlen , false , false );
251+ return new AddStreamRecord (record .withStreamKey (key ), maxlen , nomkstream , approximateTrimming , minId );
251252 }
252253
253254 /**
254- * Limit the size of the stream to the given maximum number of elements .
255+ * Disable creation of stream if it does not already exist .
255256 *
256257 * @return new instance of {@link AddStreamRecord}.
258+ * @since 2.6
257259 */
258- public AddStreamRecord maxlen ( long maxlen ) {
259- return new AddStreamRecord (record , maxlen , false , false );
260+ public AddStreamRecord makeNoStream ( ) {
261+ return new AddStreamRecord (record , maxlen , true , approximateTrimming , minId );
260262 }
261263
262264 /**
263265 * Disable creation of stream if it does not already exist.
264266 *
267+ * @param makeNoStream {@code true} to not create a stream if it does not already exist.
265268 * @return new instance of {@link AddStreamRecord}.
266269 * @since 2.6
267270 */
268- public AddStreamRecord makeNoStream () {
269- return new AddStreamRecord (record , maxlen , true , false );
271+ public AddStreamRecord makeNoStream (boolean makeNoStream ) {
272+ return new AddStreamRecord (record , maxlen , makeNoStream , approximateTrimming , minId );
270273 }
271274
272275 /**
273- * Disable creation of stream if it does not already exist .
276+ * Limit the size of the stream to the given maximum number of elements .
274277 *
275- * @param makeNoStream {@code true} to not create a stream if it does not already exist.
276278 * @return new instance of {@link AddStreamRecord}.
277- * @since 2.6
278279 */
279- public AddStreamRecord makeNoStream ( boolean makeNoStream ) {
280- return new AddStreamRecord (record , maxlen , makeNoStream , false );
280+ public AddStreamRecord maxlen ( long maxlen ) {
281+ return new AddStreamRecord (record , maxlen , nomkstream , approximateTrimming , minId );
281282 }
282283
283284 /**
284- * Apply efficient trimming for capped streams using the {@code ~} flag .
285+ * Apply {@code MINID} trimming strategy, that evicts entries with IDs lower than the one specified .
285286 *
287+ * @param minId the minimum record Id to retain.
286288 * @return new instance of {@link AddStreamRecord}.
289+ * @since 2.7
287290 */
288- public AddStreamRecord approximateTrimming ( boolean approximateTrimming ) {
289- return new AddStreamRecord (record , maxlen , nomkstream , approximateTrimming );
291+ public AddStreamRecord minId ( RecordId minId ) {
292+ return new AddStreamRecord (record , maxlen , nomkstream , approximateTrimming , minId );
290293 }
291294
292295 /**
293- * @return {@literal true} if {@literal approximateTrimming} is set.
296+ * Apply efficient trimming for capped streams using the {@code ~} flag.
297+ *
298+ * @return new instance of {@link AddStreamRecord}.
294299 */
295- public boolean isApproximateTrimming ( ) {
296- return approximateTrimming ;
300+ public AddStreamRecord approximateTrimming ( boolean approximateTrimming ) {
301+ return new AddStreamRecord ( record , maxlen , nomkstream , approximateTrimming , minId ) ;
297302 }
298303
299304 /**
@@ -307,6 +312,14 @@ public ByteBufferRecord getRecord() {
307312 return record ;
308313 }
309314
315+ /**
316+ * @return {@literal true} if {@literal NOMKSTREAM} is set.
317+ * @since 2.6
318+ */
319+ public boolean isNoMkStream () {
320+ return nomkstream ;
321+ }
322+
310323 /**
311324 * Limit the size of the stream to the given maximum number of elements.
312325 *
@@ -327,11 +340,28 @@ public boolean hasMaxlen() {
327340 }
328341
329342 /**
330- * @return {@literal true} if {@literal NOMKSTREAM } is set.
331- * @since 2.6
343+ * @return {@literal true} if {@literal approximateTrimming } is set.
344+ * @since 2.7
332345 */
333- public boolean isNoMkStream () {
334- return nomkstream ;
346+ public boolean isApproximateTrimming () {
347+ return approximateTrimming ;
348+ }
349+
350+ /**
351+ * @return the minimum record Id to retain during trimming.
352+ * @since 2.7
353+ */
354+ @ Nullable
355+ public RecordId getMinId () {
356+ return minId ;
357+ }
358+
359+ /**
360+ * @return {@literal true} if {@literal MINID} is set.
361+ * @since 2.7
362+ */
363+ public boolean hasMinId () {
364+ return minId != null ;
335365 }
336366 }
337367
@@ -1223,7 +1253,7 @@ public static GroupCommand deleteConsumer(Consumer consumer) {
12231253 }
12241254
12251255 public GroupCommand makeStream (boolean mkStream ) {
1226- return new GroupCommand (getKey (), action , groupName , consumerName , offset ,mkStream );
1256+ return new GroupCommand (getKey (), action , groupName , consumerName , offset , mkStream );
12271257 }
12281258
12291259 public GroupCommand at (ReadOffset offset ) {
@@ -1291,8 +1321,8 @@ default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset r
12911321 * @since 2.3
12921322 */
12931323 default Mono <String > xGroupCreate (ByteBuffer key , String groupName , ReadOffset readOffset , boolean mkStream ) {
1294- return xGroup (Mono .just (GroupCommand .createGroup (groupName ).forStream (key ).at (readOffset ).makeStream (mkStream ))). next ()
1295- .map (CommandResponse ::getOutput );
1324+ return xGroup (Mono .just (GroupCommand .createGroup (groupName ).forStream (key ).at (readOffset ).makeStream (mkStream )))
1325+ .next (). map (CommandResponse ::getOutput );
12961326 }
12971327
12981328 /**
0 commit comments