Skip to content

Commit 26c1c13

Browse files
rls: Control plane channel monitor state and back off handling (#12460)
At the end of back off time, instead of firing a Rls RPC, just update the RLS picker, and RLS connectivity state change from TRANSIENT_FAILURE to READY deactivate all active backoffs.
1 parent 48a4288 commit 26c1c13

File tree

5 files changed

+249
-87
lines changed

5 files changed

+249
-87
lines changed

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
5454
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
5555
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56-
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
5756
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
5857
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
5958
import io.grpc.rls.LruCache.EvictionListener;
@@ -217,6 +216,35 @@ private CachingRlsLbClient(Builder builder) {
217216
rlsChannelBuilder.disableServiceConfigLookUp();
218217
}
219218
rlsChannel = rlsChannelBuilder.build();
219+
Runnable rlsServerConnectivityStateChangeHandler = new Runnable() {
220+
private boolean wasInTransientFailure;
221+
@Override
222+
public void run() {
223+
ConnectivityState currentState = rlsChannel.getState(false);
224+
if (currentState == ConnectivityState.TRANSIENT_FAILURE) {
225+
wasInTransientFailure = true;
226+
} else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
227+
wasInTransientFailure = false;
228+
synchronized (lock) {
229+
boolean anyBackoffsCanceled = false;
230+
for (CacheEntry value : linkedHashLruCache.values()) {
231+
if (value instanceof BackoffCacheEntry) {
232+
if (((BackoffCacheEntry) value).scheduledFuture.cancel(false)) {
233+
anyBackoffsCanceled = true;
234+
}
235+
}
236+
}
237+
if (anyBackoffsCanceled) {
238+
// Cache updated. updateBalancingState() to reattempt picks
239+
helper.triggerPendingRpcProcessing();
240+
}
241+
}
242+
}
243+
rlsChannel.notifyWhenStateChanged(currentState, this);
244+
}
245+
};
246+
rlsChannel.notifyWhenStateChanged(
247+
ConnectivityState.IDLE, rlsServerConnectivityStateChangeHandler);
220248
rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
221249
childLbResolvedAddressFactory =
222250
checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
@@ -226,8 +254,7 @@ private CachingRlsLbClient(Builder builder) {
226254
refCountedChildPolicyWrapperFactory =
227255
new RefCountedChildPolicyWrapperFactory(
228256
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
229-
childLbHelperProvider,
230-
new BackoffRefreshListener());
257+
childLbHelperProvider);
231258
// TODO(creamsoup) wait until lb is ready
232259
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
233260
if (defaultTarget != null && !defaultTarget.isEmpty()) {
@@ -347,12 +374,15 @@ final CachedRouteLookupResponse get(final RouteLookupRequestKey routeLookupReque
347374
synchronized (lock) {
348375
final CacheEntry cacheEntry;
349376
cacheEntry = linkedHashLruCache.read(routeLookupRequestKey);
350-
if (cacheEntry == null) {
377+
if (cacheEntry == null
378+
|| (cacheEntry instanceof BackoffCacheEntry
379+
&& !((BackoffCacheEntry) cacheEntry).isInBackoffPeriod())) {
351380
PendingCacheEntry pendingEntry = pendingCallCache.get(routeLookupRequestKey);
352381
if (pendingEntry != null) {
353382
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
354383
}
355-
return asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null,
384+
return asyncRlsCall(routeLookupRequestKey, cacheEntry instanceof BackoffCacheEntry
385+
? ((BackoffCacheEntry) cacheEntry).backoffPolicy : null,
356386
RouteLookupRequest.Reason.REASON_MISS);
357387
}
358388

@@ -447,7 +477,8 @@ private BackoffCacheEntry createBackOffEntry(RouteLookupRequestKey routeLookupRe
447477
ChannelLogLevel.DEBUG,
448478
"[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
449479
routeLookupRequestKey, status, delayNanos);
450-
BackoffCacheEntry entry = new BackoffCacheEntry(routeLookupRequestKey, status, backoffPolicy);
480+
BackoffCacheEntry entry = new BackoffCacheEntry(routeLookupRequestKey, status, backoffPolicy,
481+
ticker.read() + delayNanos * 2);
451482
// Lock is held, so the task can't execute before the assignment
452483
entry.scheduledFuture = scheduledExecutorService.schedule(
453484
() -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
@@ -462,11 +493,8 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) {
462493
// Future was previously cancelled
463494
return;
464495
}
465-
logger.log(ChannelLogLevel.DEBUG,
466-
"[RLS Entry {0}] Calling RLS for transition to pending", entry.routeLookupRequestKey);
467-
linkedHashLruCache.invalidate(entry.routeLookupRequestKey);
468-
asyncRlsCall(entry.routeLookupRequestKey, entry.backoffPolicy,
469-
RouteLookupRequest.Reason.REASON_MISS);
496+
// Cache updated. updateBalancingState() to reattempt picks
497+
helper.triggerPendingRpcProcessing();
470498
}
471499
}
472500

@@ -773,13 +801,15 @@ private static final class BackoffCacheEntry extends CacheEntry {
773801

774802
private final Status status;
775803
private final BackoffPolicy backoffPolicy;
804+
private final long expiryTimeNanos;
776805
private Future<?> scheduledFuture;
777806

778807
BackoffCacheEntry(RouteLookupRequestKey routeLookupRequestKey, Status status,
779-
BackoffPolicy backoffPolicy) {
808+
BackoffPolicy backoffPolicy, long expiryTimeNanos) {
780809
super(routeLookupRequestKey);
781810
this.status = checkNotNull(status, "status");
782811
this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
812+
this.expiryTimeNanos = expiryTimeNanos;
783813
}
784814

785815
Status getStatus() {
@@ -791,9 +821,13 @@ int getSizeBytes() {
791821
return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
792822
}
793823

824+
boolean isInBackoffPeriod() {
825+
return !scheduledFuture.isDone();
826+
}
827+
794828
@Override
795-
boolean isExpired(long now) {
796-
return scheduledFuture.isDone();
829+
boolean isExpired(long nowNanos) {
830+
return nowNanos > expiryTimeNanos;
797831
}
798832

799833
@Override
@@ -956,32 +990,6 @@ public CacheEntry cacheAndClean(RouteLookupRequestKey key, CacheEntry value) {
956990
}
957991
}
958992

959-
/**
960-
* LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
961-
* ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
962-
*/
963-
private final class BackoffRefreshListener implements ChildLbStatusListener {
964-
965-
@Nullable
966-
private ConnectivityState prevState = null;
967-
968-
@Override
969-
public void onStatusChanged(ConnectivityState newState) {
970-
if (prevState == ConnectivityState.TRANSIENT_FAILURE
971-
&& newState == ConnectivityState.READY) {
972-
logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
973-
synchronized (lock) {
974-
for (CacheEntry value : linkedHashLruCache.values()) {
975-
if (value instanceof BackoffCacheEntry) {
976-
refreshBackoffEntry((BackoffCacheEntry) value);
977-
}
978-
}
979-
}
980-
}
981-
prevState = newState;
982-
}
983-
}
984-
985993
/** A header will be added when RLS server respond with additional header data. */
986994
@VisibleForTesting
987995
static final Metadata.Key<String> RLS_DATA_KEY =

rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -210,20 +210,17 @@ static final class RefCountedChildPolicyWrapperFactory {
210210
new HashMap<>();
211211

212212
private final ChildLoadBalancerHelperProvider childLbHelperProvider;
213-
private final ChildLbStatusListener childLbStatusListener;
214213
private final ChildLoadBalancingPolicy childPolicy;
215214
private ResolvedAddressFactory childLbResolvedAddressFactory;
216215

217216
public RefCountedChildPolicyWrapperFactory(
218217
ChildLoadBalancingPolicy childPolicy,
219218
ResolvedAddressFactory childLbResolvedAddressFactory,
220-
ChildLoadBalancerHelperProvider childLbHelperProvider,
221-
ChildLbStatusListener childLbStatusListener) {
219+
ChildLoadBalancerHelperProvider childLbHelperProvider) {
222220
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
223221
this.childLbResolvedAddressFactory =
224222
checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
225223
this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
226-
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
227224
}
228225

229226
void init() {
@@ -248,8 +245,7 @@ ChildPolicyWrapper createOrGet(String target) {
248245
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
249246
if (pooledChildPolicyWrapper == null) {
250247
ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
251-
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
252-
childLbStatusListener);
248+
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider);
253249
pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
254250
childPolicyMap.put(target, pooledChildPolicyWrapper);
255251
return pooledChildPolicyWrapper.getObject();
@@ -299,11 +295,9 @@ public ChildPolicyWrapper(
299295
String target,
300296
ChildLoadBalancingPolicy childPolicy,
301297
final ResolvedAddressFactory childLbResolvedAddressFactory,
302-
ChildLoadBalancerHelperProvider childLbHelperProvider,
303-
ChildLbStatusListener childLbStatusListener) {
298+
ChildLoadBalancerHelperProvider childLbHelperProvider) {
304299
this.target = target;
305-
this.helper =
306-
new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
300+
this.helper = new ChildPolicyReportingHelper(childLbHelperProvider);
307301
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
308302
final ConfigOrError lbConfig =
309303
lbProvider
@@ -386,14 +380,11 @@ public String toString() {
386380
final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {
387381

388382
private final ChildLoadBalancerHelper delegate;
389-
private final ChildLbStatusListener listener;
390383

391384
ChildPolicyReportingHelper(
392-
ChildLoadBalancerHelperProvider childHelperProvider,
393-
ChildLbStatusListener listener) {
385+
ChildLoadBalancerHelperProvider childHelperProvider) {
394386
checkNotNull(childHelperProvider, "childHelperProvider");
395387
this.delegate = childHelperProvider.forTarget(getTarget());
396-
this.listener = checkNotNull(listener, "listener");
397388
}
398389

399390
@Override
@@ -406,18 +397,10 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
406397
picker = newPicker;
407398
state = newState;
408399
super.updateBalancingState(newState, newPicker);
409-
listener.onStatusChanged(newState);
410400
}
411401
}
412402
}
413403

414-
/** Listener for child lb status change events. */
415-
interface ChildLbStatusListener {
416-
417-
/** Notifies when child lb status changes. */
418-
void onStatusChanged(ConnectivityState newState);
419-
}
420-
421404
private static final class RefCountedChildPolicyWrapper
422405
implements ObjectPool<ChildPolicyWrapper> {
423406

0 commit comments

Comments
 (0)