From 32b0cf9e733543e9962466e448f5e95646e632f5 Mon Sep 17 00:00:00 2001 From: Jiangnan Jia Date: Thu, 27 Oct 2022 20:22:26 +0800 Subject: [PATCH 1/3] force-pushed after main-branch was force-pushed. Co-authored-by: Eric Zhao Co-authored-by: Jiangnan Jia Signed-off-by: Jiangnan Jia --- src/main/java/io/opensergo/ConfigKind.java | 3 +- .../java/io/opensergo/OpenSergoClient.java | 79 ++++++++++++++++--- .../io/opensergo/OpenSergoClientStatus.java | 30 +++++++ .../OpenSergoClientSubscribeInfo.java | 62 +++++++++++++++ .../OpenSergoSubscribeClientObserver.java | 22 +++--- .../subscribe/SubscribeRegistry.java | 6 ++ 6 files changed, 177 insertions(+), 25 deletions(-) create mode 100644 src/main/java/io/opensergo/OpenSergoClientStatus.java create mode 100644 src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java diff --git a/src/main/java/io/opensergo/ConfigKind.java b/src/main/java/io/opensergo/ConfigKind.java index 72a8d01..31536cc 100644 --- a/src/main/java/io/opensergo/ConfigKind.java +++ b/src/main/java/io/opensergo/ConfigKind.java @@ -26,8 +26,7 @@ public enum ConfigKind { FAULT_TOLERANCE_RULE("fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule", "FaultToleranceRule"), RATE_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy", "RateLimitStrategy"), THROTTLING_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ThrottlingStrategy", "ThrottlingStrategy"), - CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy", - "ConcurrencyLimitStrategy"), + CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy", "ConcurrencyLimitStrategy"), CIRCUIT_BREAKER_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/CircuitBreakerStrategy", "CircuitBreakerStrategy"); private final String kindName; diff --git a/src/main/java/io/opensergo/OpenSergoClient.java b/src/main/java/io/opensergo/OpenSergoClient.java index 0f3de80..594bb7d 100644 --- a/src/main/java/io/opensergo/OpenSergoClient.java +++ b/src/main/java/io/opensergo/OpenSergoClient.java @@ -30,6 +30,8 @@ import io.opensergo.util.AssertUtils; import io.opensergo.util.IdentifierUtils; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable { private final SubscribeRegistry subscribeRegistry; private AtomicInteger reqId; + protected volatile OpenSergoClientStatus status; public OpenSergoClient(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) @@ -56,17 +59,72 @@ public OpenSergoClient(String host, int port) { this.configCache = new SubscribedConfigCache(); this.subscribeRegistry = new SubscribeRegistry(); this.reqId = new AtomicInteger(0); + status = OpenSergoClientStatus.INITIAL; + } + + public void registerSubscribeInfo(OpenSergoClientSubscribeInfo subscribeInfo) { + // Register subscriber to local. + if (Optional.of(subscribeInfo.getSubscriberList()).isPresent() && subscribeInfo.getSubscriberList().size() > 0) { + subscribeInfo.getSubscriberList().forEach(subscriber -> { + this.subscribeRegistry.registerSubscriber(subscribeInfo.getSubscribeKey(), subscriber); + OpenSergoLogger.info("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}", subscribeInfo.getSubscribeKey(), subscriber); + + if (requestAndResponseWriter != null && this.status == OpenSergoClientStatus.STARTED) { + this.subscribeConfig(subscribeInfo.getSubscribeKey()); + } + }); + } } public void start() throws Exception { + OpenSergoLogger.info("OpensergoClient is starting..."); + + if (status == OpenSergoClientStatus.INITIAL) { + OpenSergoLogger.info("open keepavlive thread"); + Thread keepAliveThread = new Thread(this::keepAlive); + keepAliveThread.setName("thread-opensergo-keepalive-" + keepAliveThread.getId()); + keepAliveThread.setDaemon(true); + keepAliveThread.start(); + } + + status = OpenSergoClientStatus.STARTING; + this.requestAndResponseWriter = transportGrpcStub.withWaitForReady() - .subscribeConfig(new OpenSergoSubscribeClientObserver(configCache, subscribeRegistry)); + .subscribeConfig(new OpenSergoSubscribeClientObserver(this)); + + OpenSergoLogger.info("begin to subscribe config-data..."); + this.subscribeRegistry.getSubscriberKeysAll().forEach(subscribeKey -> { + this.subscribeConfig(subscribeKey); + }); + + OpenSergoLogger.info("openSergoClient is started"); + status = OpenSergoClientStatus.STARTED; + } + + private void keepAlive() { + try { + if (status != OpenSergoClientStatus.STARTING + && status != OpenSergoClientStatus.STARTED + && status != OpenSergoClientStatus.SHUTDOWN) { + OpenSergoLogger.info("try to restart openSergoClient..."); + this.start(); + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + if( status != OpenSergoClientStatus.SHUTDOWN) { + keepAlive(); + } + } catch (Exception e) { + e.printStackTrace(); + } } @Override public void close() throws Exception { requestAndResponseWriter.onCompleted(); + // stop the keepAliveThread + status = OpenSergoClientStatus.SHUTDOWN; + // gracefully drain the requests, then close the connection channel.shutdown(); } @@ -77,8 +135,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) { AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null"); if (requestAndResponseWriter == null) { - // TODO: return status that indicates not ready - throw new IllegalStateException("gRPC stream is not ready"); + OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready")); + status = OpenSergoClientStatus.INTERRUPTED; } SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder() .setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp()) @@ -106,8 +164,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null"); if (requestAndResponseWriter == null) { - // TODO: return status that indicates not ready - throw new IllegalStateException("gRPC stream is not ready"); + OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready")); + status = OpenSergoClientStatus.INTERRUPTED; } SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder() .setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp()) @@ -121,13 +179,6 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri // Send SubscribeRequest requestAndResponseWriter.onNext(request); - // Register subscriber to local. - if (subscriber != null) { - subscribeRegistry.registerSubscriber(subscribeKey, subscriber); - OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}", - subscribeKey, subscriber); - } - return true; } @@ -135,4 +186,8 @@ public SubscribedConfigCache getConfigCache() { return configCache; } + public SubscribeRegistry getSubscribeRegistry() { + return subscribeRegistry; + } + } diff --git a/src/main/java/io/opensergo/OpenSergoClientStatus.java b/src/main/java/io/opensergo/OpenSergoClientStatus.java new file mode 100644 index 0000000..b0c4fa0 --- /dev/null +++ b/src/main/java/io/opensergo/OpenSergoClientStatus.java @@ -0,0 +1,30 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo; + +/** + * @author Jiangnan Jia + **/ +public enum OpenSergoClientStatus { + + /* initial*/ + INITIAL, + STARTING, + STARTED, + INTERRUPTED, + SHUTDOWN + +} diff --git a/src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java b/src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java new file mode 100644 index 0000000..5447fd1 --- /dev/null +++ b/src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java @@ -0,0 +1,62 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo; + +import com.google.common.collect.Lists; +import io.opensergo.subscribe.OpenSergoConfigSubscriber; +import io.opensergo.subscribe.SubscribeKey; + +import java.util.List; + +/** + * @author Jiangnan Jia + **/ +public class OpenSergoClientSubscribeInfo { + + private SubscribeKey subscribeKey; + private List subscriberList; + + public OpenSergoClientSubscribeInfo(SubscribeKey subscribeKey) { + this.subscribeKey = subscribeKey; + this.subscriberList = Lists.newArrayList(); + } + public OpenSergoClientSubscribeInfo(SubscribeKey subscribeKey, List subscriberList) { + this.subscribeKey = subscribeKey; + this.subscriberList = subscriberList; + } + + public OpenSergoClientSubscribeInfo addSubscriber(OpenSergoConfigSubscriber subscriber) { + // TODO distinct the same OpenSergoConfigSubscriber + this.subscriberList.add(subscriber); + return this; + } + + public SubscribeKey getSubscribeKey() { + return subscribeKey; + } + + public List getSubscriberList() { + return subscriberList; + } + + @Override + public String toString() { + return "OpensergoClientSubscribeInfo{" + + "subscribeKey=" + subscribeKey + + ", subscriberList=" + subscriberList + + '}'; + } +} diff --git a/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java b/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java index 1ea1b8d..ce908aa 100644 --- a/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java +++ b/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java @@ -30,8 +30,6 @@ import io.opensergo.proto.transport.v1.SubscribeResponse; import io.opensergo.subscribe.OpenSergoConfigSubscriber; import io.opensergo.subscribe.SubscribeKey; -import io.opensergo.subscribe.SubscribeRegistry; -import io.opensergo.subscribe.SubscribedConfigCache; import io.opensergo.util.StringUtils; /** @@ -41,13 +39,10 @@ public class OpenSergoSubscribeClientObserver implements ClientResponseObserver< private ClientCallStreamObserver requestStream; - private final SubscribedConfigCache configCache; - private final SubscribeRegistry subscribeRegistry; + private OpenSergoClient openSergoClient; - public OpenSergoSubscribeClientObserver(SubscribedConfigCache configCache, - SubscribeRegistry subscribeRegistry) { - this.configCache = configCache; - this.subscribeRegistry = subscribeRegistry; + public OpenSergoSubscribeClientObserver(OpenSergoClient openSergoClient) { + this.openSergoClient = openSergoClient; } @Override @@ -58,7 +53,7 @@ public void beforeStart(ClientCallStreamObserver requestStream private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWithVersion dataWithVersion) throws Exception { long receivedVersion = dataWithVersion.getVersion(); - SubscribedData cachedData = configCache.getDataFor(subscribeKey); + SubscribedData cachedData = this.openSergoClient.getConfigCache().getDataFor(subscribeKey); if (cachedData != null && cachedData.getVersion() > receivedVersion) { // The upcoming data is out-dated, so we'll not resolve the push request. return new LocalDataNotifyResult().setCode(OpenSergoTransportConstants.CODE_ERROR_VERSION_OUTDATED); @@ -67,9 +62,9 @@ private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWi // Decode actual data from the raw "Any" data. List dataList = decodeActualData(subscribeKey.getKind().getKindName(), dataWithVersion.getDataList()); // Update to local config cache. - configCache.updateData(subscribeKey, dataList, receivedVersion); + this.openSergoClient.getConfigCache().updateData(subscribeKey, dataList, receivedVersion); - List subscribers = subscribeRegistry.getSubscribersOf(subscribeKey); + List subscribers = this.openSergoClient.getSubscribeRegistry().getSubscribersOf(subscribeKey); if (subscribers == null || subscribers.isEmpty()) { // no-subscriber is acceptable (just for cache-and-pull mode) return LocalDataNotifyResult.withSuccess(dataList); @@ -178,6 +173,11 @@ private List decodeActualData(String kind, List rawList) throws Exc @Override public void onError(Throwable t) { + // TODO add handles for different io.grpc.Status of Throwable from ClientCallStreamObserver + io.grpc.Status.Code errorCode = io.grpc.Status.fromThrowable(t).getCode(); + if(errorCode.equals(io.grpc.Status.UNAVAILABLE.getCode())) { + this.openSergoClient.status = OpenSergoClientStatus.INTERRUPTED; + } OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t); } diff --git a/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java b/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java index ed66be0..ae124a8 100644 --- a/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java +++ b/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java @@ -16,6 +16,7 @@ package io.opensergo.subscribe; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -33,9 +34,14 @@ public void registerSubscriber(SubscribeKey key, OpenSergoConfigSubscriber subsc AssertUtils.assertNotNull(key, "subscribeKey cannot be null"); AssertUtils.assertNotNull(subscriber, "subscriber cannot be null"); List list = subscriberMap.computeIfAbsent(key, v -> new CopyOnWriteArrayList<>()); + // TODO distinct the same OpenSergoConfigSubscriber list.add(subscriber); } + public Set getSubscriberKeysAll() { + return subscriberMap.keySet(); + } + public List getSubscribersOf(SubscribeKey key) { if (key == null) { return null; From 10efa14da24df8e165e223dd765f761c13cfb362 Mon Sep 17 00:00:00 2001 From: Jiangnan Jia Date: Tue, 22 Nov 2022 15:24:48 +0800 Subject: [PATCH 2/3] remove recursive and add error handle Signed-off-by: Jiangnan Jia --- .../java/io/opensergo/OpenSergoClient.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/opensergo/OpenSergoClient.java b/src/main/java/io/opensergo/OpenSergoClient.java index 594bb7d..df04edb 100644 --- a/src/main/java/io/opensergo/OpenSergoClient.java +++ b/src/main/java/io/opensergo/OpenSergoClient.java @@ -102,19 +102,28 @@ public void start() throws Exception { } private void keepAlive() { - try { - if (status != OpenSergoClientStatus.STARTING - && status != OpenSergoClientStatus.STARTED - && status != OpenSergoClientStatus.SHUTDOWN) { - OpenSergoLogger.info("try to restart openSergoClient..."); - this.start(); + // TODO change to event-based design, instead of for-loop. + for (;;) { + if (status == OpenSergoClientStatus.SHUTDOWN) { + return; } - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - if( status != OpenSergoClientStatus.SHUTDOWN) { - keepAlive(); + + try { + if (status == OpenSergoClientStatus.INTERRUPTED) { + OpenSergoLogger.info("try to restart openSergoClient..."); + this.start(); + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e) { + OpenSergoLogger.error(e.toString(), e); + } catch (Exception e) { + try { + this.close(); + } catch (Exception ex) { + status = OpenSergoClientStatus.SHUTDOWN; + } + OpenSergoLogger.error("close OpenSergoClient because of " + e, e); } - } catch (Exception e) { - e.printStackTrace(); } } From f7ff064e49cf77bd0803f17c1d86ec6600192ee5 Mon Sep 17 00:00:00 2001 From: Jiangnan Jia Date: Sat, 3 Dec 2022 22:04:20 +0800 Subject: [PATCH 3/3] Remove OpenSergoClientSubscribeInfo.java Signed-off-by: Jiangnan Jia --- .../java/io/opensergo/OpenSergoClient.java | 20 ++---- .../OpenSergoClientSubscribeInfo.java | 62 ------------------- 2 files changed, 6 insertions(+), 76 deletions(-) delete mode 100644 src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java diff --git a/src/main/java/io/opensergo/OpenSergoClient.java b/src/main/java/io/opensergo/OpenSergoClient.java index df04edb..8849835 100644 --- a/src/main/java/io/opensergo/OpenSergoClient.java +++ b/src/main/java/io/opensergo/OpenSergoClient.java @@ -62,20 +62,6 @@ public OpenSergoClient(String host, int port) { status = OpenSergoClientStatus.INITIAL; } - public void registerSubscribeInfo(OpenSergoClientSubscribeInfo subscribeInfo) { - // Register subscriber to local. - if (Optional.of(subscribeInfo.getSubscriberList()).isPresent() && subscribeInfo.getSubscriberList().size() > 0) { - subscribeInfo.getSubscriberList().forEach(subscriber -> { - this.subscribeRegistry.registerSubscriber(subscribeInfo.getSubscribeKey(), subscriber); - OpenSergoLogger.info("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}", subscribeInfo.getSubscribeKey(), subscriber); - - if (requestAndResponseWriter != null && this.status == OpenSergoClientStatus.STARTED) { - this.subscribeConfig(subscribeInfo.getSubscribeKey()); - } - }); - } - } - public void start() throws Exception { OpenSergoLogger.info("OpensergoClient is starting..."); @@ -188,6 +174,12 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri // Send SubscribeRequest requestAndResponseWriter.onNext(request); + // Register subscriber to local. + if (subscriber != null) { + subscribeRegistry.registerSubscriber(subscribeKey, subscriber); + OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}", subscribeKey, subscriber); + } + return true; } diff --git a/src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java b/src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java deleted file mode 100644 index 5447fd1..0000000 --- a/src/main/java/io/opensergo/OpenSergoClientSubscribeInfo.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2022, OpenSergo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.opensergo; - -import com.google.common.collect.Lists; -import io.opensergo.subscribe.OpenSergoConfigSubscriber; -import io.opensergo.subscribe.SubscribeKey; - -import java.util.List; - -/** - * @author Jiangnan Jia - **/ -public class OpenSergoClientSubscribeInfo { - - private SubscribeKey subscribeKey; - private List subscriberList; - - public OpenSergoClientSubscribeInfo(SubscribeKey subscribeKey) { - this.subscribeKey = subscribeKey; - this.subscriberList = Lists.newArrayList(); - } - public OpenSergoClientSubscribeInfo(SubscribeKey subscribeKey, List subscriberList) { - this.subscribeKey = subscribeKey; - this.subscriberList = subscriberList; - } - - public OpenSergoClientSubscribeInfo addSubscriber(OpenSergoConfigSubscriber subscriber) { - // TODO distinct the same OpenSergoConfigSubscriber - this.subscriberList.add(subscriber); - return this; - } - - public SubscribeKey getSubscribeKey() { - return subscribeKey; - } - - public List getSubscriberList() { - return subscriberList; - } - - @Override - public String toString() { - return "OpensergoClientSubscribeInfo{" + - "subscribeKey=" + subscribeKey + - ", subscriberList=" + subscriberList + - '}'; - } -}