2323import io .opensergo .proto .transport .v1 .SubscribeOpType ;
2424import io .opensergo .proto .transport .v1 .SubscribeRequest ;
2525import io .opensergo .proto .transport .v1 .SubscribeRequestTarget ;
26- import io .opensergo .subscribe .OpenSergoConfigSubscriber ;
2726import io .opensergo .subscribe .SubscribeKey ;
2827import io .opensergo .subscribe .SubscribeRegistry ;
2928import io .opensergo .subscribe .SubscribedConfigCache ;
3029import io .opensergo .util .AssertUtils ;
3130import io .opensergo .util .IdentifierUtils ;
3231
32+ import java .util .Optional ;
33+ import java .util .concurrent .TimeUnit ;
3334import java .util .concurrent .atomic .AtomicInteger ;
3435
3536/**
@@ -46,6 +47,7 @@ public class OpenSergoClient implements AutoCloseable {
4647 private final SubscribeRegistry subscribeRegistry ;
4748
4849 private AtomicInteger reqId ;
50+ protected static volatile OpensergoClientStatus status ;
4951
5052 public OpenSergoClient (String host , int port ) {
5153 this .channel = ManagedChannelBuilder .forAddress (host , port )
@@ -56,17 +58,64 @@ public OpenSergoClient(String host, int port) {
5658 this .configCache = new SubscribedConfigCache ();
5759 this .subscribeRegistry = new SubscribeRegistry ();
5860 this .reqId = new AtomicInteger (0 );
61+ status = OpensergoClientStatus .INITIAL ;
62+ }
63+
64+ public void registerSubscribeInfo (OpensergoClientSubscribeInfo subscribeInfo ) {
65+ // Register subscriber to local.
66+ if (Optional .of (subscribeInfo .getSubscriberList ()).isPresent () && subscribeInfo .getSubscriberList ().size () > 0 ) {
67+ subscribeInfo .getSubscriberList ().forEach (subscriber -> {
68+ this .subscribeRegistry .registerSubscriber (subscribeInfo .getSubscribeKey (), subscriber );
69+ OpenSergoLogger .info ("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}" , subscribeInfo .getSubscribeKey (), subscriber );
70+ });
71+ }
5972 }
6073
6174 public void start () throws Exception {
75+ OpenSergoLogger .info ("OpensergoClient is starting..." );
76+
77+ if (status == OpensergoClientStatus .INITIAL ) {
78+ OpenSergoLogger .info ("open keepavlive thread" );
79+ new Thread (this ::keepAlive ).start ();
80+ }
81+
82+ status = OpensergoClientStatus .STARTING ;
83+
6284 this .requestAndResponseWriter = transportGrpcStub .withWaitForReady ()
6385 .subscribeConfig (new OpenSergoSubscribeClientObserver (configCache , subscribeRegistry ));
86+
87+ OpenSergoLogger .info ("begin to subscribe config-data..." );
88+ this .subscribeRegistry .getSubscriberKeysAll ().forEach (subscribeKey -> {
89+ this .subscribeConfig (subscribeKey );
90+ });
91+
92+ OpenSergoLogger .info ("openSergoClient is started" );
93+ status = OpensergoClientStatus .STARTED ;
94+ }
95+
96+ private void keepAlive () {
97+ try {
98+ if (status != OpensergoClientStatus .STARTING
99+ && status != OpensergoClientStatus .STARTED
100+ && status != OpensergoClientStatus .SHUTDOWN ) {
101+ OpenSergoLogger .info ("try to restart openSergoClient..." );
102+ this .start ();
103+ }
104+ Thread .sleep (TimeUnit .SECONDS .toMillis (10 ));
105+ if ( status != OpensergoClientStatus .SHUTDOWN ) {
106+ keepAlive ();
107+ }
108+ } catch (Exception e ) {
109+ e .printStackTrace ();
110+ }
64111 }
65112
66113 @ Override
67114 public void close () throws Exception {
68115 requestAndResponseWriter .onCompleted ();
69116
117+ status = OpensergoClientStatus .SHUTDOWN ;
118+
70119 // gracefully drain the requests, then close the connection
71120 channel .shutdown ();
72121 }
@@ -77,8 +126,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
77126 AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
78127
79128 if (requestAndResponseWriter == null ) {
80- // TODO: return status that indicates not ready
81- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
129+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
130+ status = OpensergoClientStatus . INTERRUPTED ;
82131 }
83132 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
84133 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -97,17 +146,13 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
97146 }
98147
99148 public boolean subscribeConfig (SubscribeKey subscribeKey ) {
100- return subscribeConfig (subscribeKey , null );
101- }
102-
103- public boolean subscribeConfig (SubscribeKey subscribeKey , OpenSergoConfigSubscriber subscriber ) {
104149 AssertUtils .assertNotNull (subscribeKey , "subscribeKey cannot be null" );
105150 AssertUtils .assertNotNull (subscribeKey .getApp (), "app cannot be null" );
106151 AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
107152
108153 if (requestAndResponseWriter == null ) {
109- // TODO: return status that indicates not ready
110- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
154+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
155+ status = OpensergoClientStatus . INTERRUPTED ;
111156 }
112157 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
113158 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -121,13 +166,6 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
121166 // Send SubscribeRequest
122167 requestAndResponseWriter .onNext (request );
123168
124- // Register subscriber to local.
125- if (subscriber != null ) {
126- subscribeRegistry .registerSubscriber (subscribeKey , subscriber );
127- OpenSergoLogger .info ("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}" ,
128- subscribeKey , subscriber );
129- }
130-
131169 return true ;
132170 }
133171
0 commit comments