1717import io .dapr .utils .TypeRef ;
1818import reactor .core .publisher .Mono ;
1919
20+ import java .time .Duration ;
21+ import java .time .Instant ;
2022import java .util .ArrayList ;
23+ import java .util .Date ;
2124import java .util .List ;
2225import java .util .Map ;
2326import java .util .NoSuchElementException ;
@@ -66,12 +69,13 @@ public class ActorStateManager {
6669 /**
6770 * Adds a given key/value to the Actor's state store's cache.
6871 *
69- * @param stateName Name of the state being added.
70- * @param value Value to be added.
71- * @param <T> Type of the object being added.
72+ * @param stateName Name of the state being added.
73+ * @param value Value to be added.
74+ * @param expiration State's expiration.
75+ * @param <T> Type of the object being added.
7276 * @return Asynchronous void operation.
7377 */
74- public <T > Mono <Void > add (String stateName , T value ) {
78+ public <T > Mono <Void > add (String stateName , T value , Instant expiration ) {
7579 return Mono .fromSupplier (() -> {
7680 if (stateName == null ) {
7781 throw new IllegalArgumentException ("State's name cannot be null." );
@@ -84,7 +88,8 @@ public <T> Mono<Void> add(String stateName, T value) {
8488 StateChangeMetadata metadata = this .stateChangeTracker .get (stateName );
8589
8690 if (metadata .kind == ActorStateChangeKind .REMOVE ) {
87- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .UPDATE , value ));
91+ this .stateChangeTracker .put (
92+ stateName , new StateChangeMetadata (ActorStateChangeKind .UPDATE , value , expiration ));
8893 return true ;
8994 }
9095
@@ -95,7 +100,8 @@ public <T> Mono<Void> add(String stateName, T value) {
95100 throw new IllegalStateException ("Duplicate state: " + stateName );
96101 }
97102
98- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .ADD , value ));
103+ this .stateChangeTracker .put (
104+ stateName , new StateChangeMetadata (ActorStateChangeKind .ADD , value , expiration ));
99105 return true ;
100106 }))
101107 .then ();
@@ -130,6 +136,10 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
130136 if (this .stateChangeTracker .containsKey (stateName )) {
131137 StateChangeMetadata metadata = this .stateChangeTracker .get (stateName );
132138
139+ if (metadata .isExpired ()) {
140+ throw new NoSuchElementException ("State is expired: " + stateName );
141+ }
142+
133143 if (metadata .kind == ActorStateChangeKind .REMOVE ) {
134144 throw new NoSuchElementException ("State is marked for removal: " + stateName );
135145 }
@@ -142,20 +152,37 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
142152 this .stateProvider .load (this .actorTypeName , this .actorId , stateName , type )
143153 .switchIfEmpty (Mono .error (new NoSuchElementException ("State not found: " + stateName )))
144154 .map (v -> {
145- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .NONE , v ));
146- return (T ) v ;
155+ this .stateChangeTracker .put (
156+ stateName , new StateChangeMetadata (ActorStateChangeKind .NONE , v .getValue (), v .getExpiration ()));
157+ return (T ) v .getValue ();
147158 }));
148159 }
149160
150161 /**
151162 * Updates a given key/value pair in the state store's cache.
163+ * Use the variation that takes in an TTL instead.
152164 *
153165 * @param stateName Name of the state being updated.
154166 * @param value Value to be set for given state.
155167 * @param <T> Type of the value being set.
156168 * @return Asynchronous void result.
157169 */
170+ @ Deprecated
158171 public <T > Mono <Void > set (String stateName , T value ) {
172+ return this .set (stateName , value , Duration .ZERO );
173+ }
174+
175+ /**
176+ * Updates a given key/value pair in the state store's cache.
177+ * Using TTL is highly recommended to avoid state to be left in the state store forever.
178+ *
179+ * @param stateName Name of the state being updated.
180+ * @param value Value to be set for given state.
181+ * @param ttl Time to live.
182+ * @param <T> Type of the value being set.
183+ * @return Asynchronous void result.
184+ */
185+ public <T > Mono <Void > set (String stateName , T value , Duration ttl ) {
159186 return Mono .fromSupplier (() -> {
160187 if (stateName == null ) {
161188 throw new IllegalArgumentException ("State's name cannot be null." );
@@ -165,20 +192,23 @@ public <T> Mono<Void> set(String stateName, T value) {
165192 StateChangeMetadata metadata = this .stateChangeTracker .get (stateName );
166193
167194 ActorStateChangeKind kind = metadata .kind ;
168- if ((kind == ActorStateChangeKind .NONE ) || (kind == ActorStateChangeKind .REMOVE )) {
195+ if (metadata . isExpired () || (kind == ActorStateChangeKind .NONE ) || (kind == ActorStateChangeKind .REMOVE )) {
169196 kind = ActorStateChangeKind .UPDATE ;
170197 }
171198
172- this .stateChangeTracker .put (stateName , new StateChangeMetadata (kind , value ));
199+ var expiration = buildExpiration (ttl );
200+ this .stateChangeTracker .put (stateName , new StateChangeMetadata (kind , value , expiration ));
173201 return true ;
174202 }
175203
176204 return false ;
177205 }).filter (x -> x )
178206 .switchIfEmpty (this .stateProvider .contains (this .actorTypeName , this .actorId , stateName )
179207 .map (exists -> {
208+ var expiration = buildExpiration (ttl );
180209 this .stateChangeTracker .put (stateName ,
181- new StateChangeMetadata (exists ? ActorStateChangeKind .UPDATE : ActorStateChangeKind .ADD , value ));
210+ new StateChangeMetadata (
211+ exists ? ActorStateChangeKind .UPDATE : ActorStateChangeKind .ADD , value , expiration ));
182212 return exists ;
183213 }))
184214 .then ();
@@ -208,7 +238,7 @@ public Mono<Void> remove(String stateName) {
208238 return true ;
209239 }
210240
211- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null ));
241+ this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null , null ));
212242 return true ;
213243 }
214244
@@ -218,7 +248,7 @@ public Mono<Void> remove(String stateName) {
218248 .switchIfEmpty (this .stateProvider .contains (this .actorTypeName , this .actorId , stateName ))
219249 .filter (exists -> exists )
220250 .map (exists -> {
221- this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null ));
251+ this .stateChangeTracker .put (stateName , new StateChangeMetadata (ActorStateChangeKind .REMOVE , null , null ));
222252 return exists ;
223253 })
224254 .then ();
@@ -239,7 +269,7 @@ public Mono<Boolean> contains(String stateName) {
239269 return this .stateChangeTracker .get (stateName );
240270 }
241271 ).map (metadata -> {
242- if (metadata .kind == ActorStateChangeKind .REMOVE ) {
272+ if (metadata .isExpired () || ( metadata . kind == ActorStateChangeKind .REMOVE ) ) {
243273 return Boolean .FALSE ;
244274 }
245275
@@ -264,7 +294,8 @@ public Mono<Void> save() {
264294 continue ;
265295 }
266296
267- changes .add (new ActorStateChange (tuple .getKey (), tuple .getValue ().value , tuple .getValue ().kind ));
297+ var actorState = new ActorState <>(tuple .getKey (), tuple .getValue ().value , tuple .getValue ().expiration );
298+ changes .add (new ActorStateChange (actorState , tuple .getValue ().kind ));
268299 }
269300
270301 return changes .toArray (new ActorStateChange [0 ]);
@@ -288,12 +319,17 @@ private void flush() {
288319 if (tuple .getValue ().kind == ActorStateChangeKind .REMOVE ) {
289320 this .stateChangeTracker .remove (stateName );
290321 } else {
291- StateChangeMetadata metadata = new StateChangeMetadata (ActorStateChangeKind .NONE , tuple .getValue ().value );
322+ StateChangeMetadata metadata =
323+ new StateChangeMetadata (ActorStateChangeKind .NONE , tuple .getValue ().value , tuple .getValue ().expiration );
292324 this .stateChangeTracker .put (stateName , metadata );
293325 }
294326 }
295327 }
296328
329+ private static Instant buildExpiration (Duration ttl ) {
330+ return (ttl != null ) && !ttl .isNegative () && !ttl .isZero () ? Instant .now ().plus (ttl ) : null ;
331+ }
332+
297333 /**
298334 * Internal class to represent value and change kind.
299335 */
@@ -309,15 +345,26 @@ private static final class StateChangeMetadata {
309345 */
310346 private final Object value ;
311347
348+ /**
349+ * Expiration.
350+ */
351+ private final Instant expiration ;
352+
312353 /**
313354 * Creates a new instance of the metadata on state change.
314355 *
315356 * @param kind Kind of change.
316357 * @param value Value to be set.
358+ * @param expiration When the value is set to expire (recommended but accepts null).
317359 */
318- private StateChangeMetadata (ActorStateChangeKind kind , Object value ) {
360+ private StateChangeMetadata (ActorStateChangeKind kind , Object value , Instant expiration ) {
319361 this .kind = kind ;
320362 this .value = value ;
363+ this .expiration = expiration ;
364+ }
365+
366+ private boolean isExpired () {
367+ return (this .expiration != null ) && Instant .now ().isAfter (this .expiration );
321368 }
322369 }
323370}
0 commit comments