3030import io .opensergo .util .AssertUtils ;
3131import io .opensergo .util .IdentifierUtils ;
3232
33+ import java .util .Optional ;
34+ import java .util .concurrent .TimeUnit ;
3335import java .util .concurrent .atomic .AtomicInteger ;
3436
3537/**
@@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable {
4648 private final SubscribeRegistry subscribeRegistry ;
4749
4850 private AtomicInteger reqId ;
51+ protected volatile OpenSergoClientStatus status ;
4952
5053 public OpenSergoClient (String host , int port ) {
5154 this .channel = ManagedChannelBuilder .forAddress (host , port )
@@ -56,17 +59,72 @@ public OpenSergoClient(String host, int port) {
5659 this .configCache = new SubscribedConfigCache ();
5760 this .subscribeRegistry = new SubscribeRegistry ();
5861 this .reqId = new AtomicInteger (0 );
62+ status = OpenSergoClientStatus .INITIAL ;
63+ }
64+
65+ public void registerSubscribeInfo (OpenSergoClientSubscribeInfo subscribeInfo ) {
66+ // Register subscriber to local.
67+ if (Optional .of (subscribeInfo .getSubscriberList ()).isPresent () && subscribeInfo .getSubscriberList ().size () > 0 ) {
68+ subscribeInfo .getSubscriberList ().forEach (subscriber -> {
69+ this .subscribeRegistry .registerSubscriber (subscribeInfo .getSubscribeKey (), subscriber );
70+ OpenSergoLogger .info ("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}" , subscribeInfo .getSubscribeKey (), subscriber );
71+
72+ if (requestAndResponseWriter != null && this .status == OpenSergoClientStatus .STARTED ) {
73+ this .subscribeConfig (subscribeInfo .getSubscribeKey ());
74+ }
75+ });
76+ }
5977 }
6078
6179 public void start () throws Exception {
80+ OpenSergoLogger .info ("OpensergoClient is starting..." );
81+
82+ if (status == OpenSergoClientStatus .INITIAL ) {
83+ OpenSergoLogger .info ("open keepavlive thread" );
84+ Thread keepAliveThread = new Thread (this ::keepAlive );
85+ keepAliveThread .setName ("thread-opensergo-keepalive-" + keepAliveThread .getId ());
86+ keepAliveThread .setDaemon (true );
87+ keepAliveThread .start ();
88+ }
89+
90+ status = OpenSergoClientStatus .STARTING ;
91+
6292 this .requestAndResponseWriter = transportGrpcStub .withWaitForReady ()
63- .subscribeConfig (new OpenSergoSubscribeClientObserver (configCache , subscribeRegistry ));
93+ .subscribeConfig (new OpenSergoSubscribeClientObserver (this ));
94+
95+ OpenSergoLogger .info ("begin to subscribe config-data..." );
96+ this .subscribeRegistry .getSubscriberKeysAll ().forEach (subscribeKey -> {
97+ this .subscribeConfig (subscribeKey );
98+ });
99+
100+ OpenSergoLogger .info ("openSergoClient is started" );
101+ status = OpenSergoClientStatus .STARTED ;
102+ }
103+
104+ private void keepAlive () {
105+ try {
106+ if (status != OpenSergoClientStatus .STARTING
107+ && status != OpenSergoClientStatus .STARTED
108+ && status != OpenSergoClientStatus .SHUTDOWN ) {
109+ OpenSergoLogger .info ("try to restart openSergoClient..." );
110+ this .start ();
111+ }
112+ Thread .sleep (TimeUnit .SECONDS .toMillis (10 ));
113+ if ( status != OpenSergoClientStatus .SHUTDOWN ) {
114+ keepAlive ();
115+ }
116+ } catch (Exception e ) {
117+ e .printStackTrace ();
118+ }
64119 }
65120
66121 @ Override
67122 public void close () throws Exception {
68123 requestAndResponseWriter .onCompleted ();
69124
125+ // stop the keepAliveThread
126+ status = OpenSergoClientStatus .SHUTDOWN ;
127+
70128 // gracefully drain the requests, then close the connection
71129 channel .shutdown ();
72130 }
@@ -77,8 +135,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
77135 AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
78136
79137 if (requestAndResponseWriter == null ) {
80- // TODO: return status that indicates not ready
81- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
138+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
139+ status = OpenSergoClientStatus . INTERRUPTED ;
82140 }
83141 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
84142 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -106,8 +164,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
106164 AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
107165
108166 if (requestAndResponseWriter == null ) {
109- // TODO: return status that indicates not ready
110- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
167+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
168+ status = OpenSergoClientStatus . INTERRUPTED ;
111169 }
112170 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
113171 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -121,18 +179,15 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
121179 // Send SubscribeRequest
122180 requestAndResponseWriter .onNext (request );
123181
124- // Register subscriber to local.
125- if (subscriber != null ) {
126- subscribeRegistry .registerSubscriber (subscribeKey , subscriber );
127- OpenSergoLogger .info ("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}" ,
128- subscribeKey , subscriber );
129- }
130-
131182 return true ;
132183 }
133184
134185 public SubscribedConfigCache getConfigCache () {
135186 return configCache ;
136187 }
137188
189+ public SubscribeRegistry getSubscribeRegistry () {
190+ return subscribeRegistry ;
191+ }
192+
138193}
0 commit comments