6060 * @author Dengliming
6161 * @author Mark John Moreno
6262 * @author jinkshower
63+ * @author Jeonggyu Choi
6364 * @since 2.2
6465 */
6566public interface ReactiveStreamCommands {
@@ -743,6 +744,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
743744 .map (CommandResponse ::getOutput );
744745 }
745746
747+ /**
748+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
749+ * {@literal consumer group} and over a given {@link Duration} of idle time.
750+ *
751+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
752+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
753+ * @param range the range of messages ids to search within. Must not be {@literal null}.
754+ * @param count limit the number of results. Must not be {@literal null}.
755+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
756+ * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
757+ * transaction.
758+ * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
759+ * @since 3.5
760+ */
761+ default Mono <PendingMessages > xPending (ByteBuffer key , String groupName , Range <?> range , Long count ,
762+ Duration minIdleTime ) {
763+ return xPending (
764+ Mono .just (PendingRecordsCommand .pending (key , groupName ).range (range , count ).minIdleTime (minIdleTime ))).next ()
765+ .map (CommandResponse ::getOutput );
766+ }
767+
746768 /**
747769 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
748770 * {@link Consumer} within a {@literal consumer group}.
@@ -759,6 +781,24 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
759781 return xPending (key , consumer .getGroup (), consumer .getName (), range , count );
760782 }
761783
784+ /**
785+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
786+ * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
787+ *
788+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
789+ * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
790+ * @param range the range of messages ids to search within. Must not be {@literal null}.
791+ * @param count limit the number of results. Must not be {@literal null}.
792+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
793+ * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
794+ * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
795+ * @since 3.5
796+ */
797+ default Mono <PendingMessages > xPending (ByteBuffer key , Consumer consumer , Range <?> range , Long count ,
798+ Duration minIdleTime ) {
799+ return xPending (key , consumer .getGroup (), consumer .getName (), range , count , minIdleTime );
800+ }
801+
762802 /**
763803 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
764804 * {@literal consumer} within a {@literal consumer group}.
@@ -779,6 +819,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
779819 .next ().map (CommandResponse ::getOutput );
780820 }
781821
822+ /**
823+ * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
824+ * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
825+ *
826+ * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
827+ * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
828+ * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
829+ * @param range the range of messages ids to search within. Must not be {@literal null}.
830+ * @param count limit the number of results. Must not be {@literal null}.
831+ * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
832+ * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
833+ * when used in pipeline / transaction.
834+ * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
835+ * @since 3.5
836+ */
837+ default Mono <PendingMessages > xPending (ByteBuffer key , String groupName , String consumerName , Range <?> range ,
838+ Long count , Duration minIdleTime ) {
839+ return xPending (Mono .just (PendingRecordsCommand .pending (key , groupName ).consumer (consumerName ).range (range , count )
840+ .minIdleTime (minIdleTime ))).next ().map (CommandResponse ::getOutput );
841+ }
842+
782843 /**
783844 * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
784845 * options}.
@@ -794,24 +855,26 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
794855 * Value Object holding parameters for obtaining pending messages.
795856 *
796857 * @author Christoph Strobl
858+ * @author Jeonggyu Choi
797859 * @since 2.3
798860 */
799861 class PendingRecordsCommand extends KeyCommand {
800862
801863 private final String groupName ;
802- private final @ Nullable String consumerName ;
803- private final Range <?> range ;
804- private final @ Nullable Long count ;
864+ private final XPendingOptions options ;
805865
806866 private PendingRecordsCommand (@ Nullable ByteBuffer key , String groupName , @ Nullable String consumerName , Range <?> range ,
807- @ Nullable Long count ) {
867+ @ Nullable Long count , @ Nullable Duration minIdleTime ) {
868+
869+ this (key , groupName , XPendingOptions .range (range , count ).consumer (consumerName ).minIdleTime (minIdleTime ));
870+ }
871+
872+ private PendingRecordsCommand (ByteBuffer key , String groupName , XPendingOptions options ) {
808873
809874 super (key );
810875
811876 this .groupName = groupName ;
812- this .consumerName = consumerName ;
813- this .range = range ;
814- this .count = count ;
877+ this .options = options ;
815878 }
816879
817880 /**
@@ -822,7 +885,7 @@ private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nulla
822885 * @return new instance of {@link PendingRecordsCommand}.
823886 */
824887 static PendingRecordsCommand pending (ByteBuffer key , String groupName ) {
825- return new PendingRecordsCommand (key , groupName , null , Range .unbounded (), null );
888+ return new PendingRecordsCommand (key , groupName , null , Range .unbounded (), null , null );
826889 }
827890
828891 /**
@@ -837,7 +900,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
837900 Assert .notNull (range , "Range must not be null" );
838901 Assert .isTrue (count > -1 , "Count must not be negative" );
839902
840- return new PendingRecordsCommand (getKey (), groupName , consumerName , range , count );
903+ return new PendingRecordsCommand (getKey (), groupName , XPendingOptions . range ( range , count ) );
841904 }
842905
843906 /**
@@ -847,7 +910,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
847910 * @return new instance of {@link PendingRecordsCommand}.
848911 */
849912 public PendingRecordsCommand consumer (String consumerName ) {
850- return new PendingRecordsCommand (getKey (), groupName , consumerName , range , count );
913+ return new PendingRecordsCommand (getKey (), groupName , XPendingOptions .unbounded ().consumer (consumerName ));
914+ }
915+
916+ /**
917+ * Append given minimum idle time.
918+ *
919+ * @param minIdleTime must not be {@literal null}.
920+ * @return new instance of {@link PendingRecordsCommand}.
921+ */
922+ public PendingRecordsCommand minIdleTime (Duration minIdleTime ) {
923+
924+ Assert .notNull (minIdleTime , "Idle must not be null" );
925+
926+ return new PendingRecordsCommand (getKey (), groupName , XPendingOptions .unbounded ().minIdleTime (minIdleTime ));
851927 }
852928
853929 public String getGroupName () {
@@ -858,35 +934,50 @@ public String getGroupName() {
858934 * @return can be {@literal null}.
859935 */
860936 public @ Nullable String getConsumerName () {
861- return consumerName ;
937+ return options . getConsumerName () ;
862938 }
863939
864940 /**
865941 * @return never {@literal null}.
866942 */
867943 public Range <?> getRange () {
868- return range ;
944+ return options . getRange () ;
869945 }
870946
871947 /**
872948 * @return can be {@literal null}.
873949 */
874950 public @ Nullable Long getCount () {
875- return count ;
951+ return options .getCount ();
952+ }
953+
954+ /**
955+ * @return can be {@literal null}.
956+ */
957+ @ Nullable
958+ public Duration getMinIdleTime () {
959+ return options .getMinIdleTime ();
876960 }
877961
878962 /**
879963 * @return {@literal true} if a consumer name is present.
880964 */
881965 public boolean hasConsumer () {
882- return StringUtils .hasText (consumerName );
966+ return StringUtils .hasText (options . getConsumerName () );
883967 }
884968
885969 /**
886970 * @return {@literal true} count is set.
887971 */
888972 public boolean isLimited () {
889- return count != null ;
973+ return options .getCount () != null ;
974+ }
975+
976+ /**
977+ * @return {@literal true} if idle is set.
978+ */
979+ public boolean hasIdle () {
980+ return options .getMinIdleTime () != null ;
890981 }
891982 }
892983
0 commit comments