1818import io .grpc .ManagedChannel ;
1919import io .grpc .ManagedChannelBuilder ;
2020import io .grpc .stub .StreamObserver ;
21+ import io .opensergo .log .OpenSergoLogger ;
2122import io .opensergo .proto .transport .v1 .OpenSergoUniversalTransportServiceGrpc ;
2223import io .opensergo .proto .transport .v1 .SubscribeOpType ;
2324import io .opensergo .proto .transport .v1 .SubscribeRequest ;
2728import io .opensergo .subscribe .SubscribeRegistry ;
2829import io .opensergo .subscribe .SubscribedConfigCache ;
2930import io .opensergo .util .AssertUtils ;
31+ import io .opensergo .util .IdentifierUtils ;
32+
33+ import java .util .concurrent .atomic .AtomicInteger ;
3034
3135/**
3236 * @author Eric Zhao
@@ -41,6 +45,8 @@ public class OpenSergoClient implements AutoCloseable {
4145 private final SubscribedConfigCache configCache ;
4246 private final SubscribeRegistry subscribeRegistry ;
4347
48+ private AtomicInteger reqId ;
49+
4450 public OpenSergoClient (String host , int port ) {
4551 this .channel = ManagedChannelBuilder .forAddress (host , port )
4652 // TODO: support TLS
@@ -49,6 +55,7 @@ public OpenSergoClient(String host, int port) {
4955 this .transportGrpcStub = OpenSergoUniversalTransportServiceGrpc .newStub (channel );
5056 this .configCache = new SubscribedConfigCache ();
5157 this .subscribeRegistry = new SubscribeRegistry ();
58+ this .reqId = new AtomicInteger (0 );
5259 }
5360
5461 public void start () throws Exception {
@@ -71,7 +78,7 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
7178
7279 if (requestAndResponseWriter == null ) {
7380 // TODO: return status that indicates not ready
74- return false ;
81+ throw new IllegalStateException ( "gRPC stream is not ready" ) ;
7582 }
7683 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
7784 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -100,21 +107,25 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
100107
101108 if (requestAndResponseWriter == null ) {
102109 // TODO: return status that indicates not ready
103- return false ;
110+ throw new IllegalStateException ( "gRPC stream is not ready" ) ;
104111 }
105112 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
106113 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
107114 .addKinds (subscribeKey .getKind ().getKindName ())
108115 .build ();
109116 SubscribeRequest request = SubscribeRequest .newBuilder ()
117+ .setRequestId (String .valueOf (reqId .incrementAndGet ()))
110118 .setTarget (subTarget ).setOpType (SubscribeOpType .SUBSCRIBE )
119+ .setIdentifier (IdentifierUtils .generateIdentifier (System .identityHashCode (this )))
111120 .build ();
112121 // Send SubscribeRequest
113122 requestAndResponseWriter .onNext (request );
114123
115124 // Register subscriber to local.
116125 if (subscriber != null ) {
117126 subscribeRegistry .registerSubscriber (subscribeKey , subscriber );
127+ OpenSergoLogger .info ("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}" ,
128+ subscribeKey , subscriber );
118129 }
119130
120131 return true ;
0 commit comments