2121import com .google .protobuf .Any ;
2222import io .grpc .stub .ClientCallStreamObserver ;
2323import io .grpc .stub .ClientResponseObserver ;
24+ import io .opensergo .log .OpenSergoLogger ;
25+ import io .opensergo .subscribe .LocalDataNotifyResult ;
26+ import io .opensergo .subscribe .SubscribedData ;
27+ import io .opensergo .proto .transport .v1 .DataWithVersion ;
2428import io .opensergo .proto .transport .v1 .Status ;
2529import io .opensergo .proto .transport .v1 .SubscribeRequest ;
2630import io .opensergo .proto .transport .v1 .SubscribeResponse ;
2731import io .opensergo .subscribe .OpenSergoConfigSubscriber ;
2832import io .opensergo .subscribe .SubscribeKey ;
2933import io .opensergo .subscribe .SubscribeRegistry ;
3034import io .opensergo .subscribe .SubscribedConfigCache ;
35+ import io .opensergo .util .StringUtils ;
3136
3237/**
3338 * @author Eric Zhao
@@ -50,19 +55,24 @@ public void beforeStart(ClientCallStreamObserver<SubscribeRequest> requestStream
5055 this .requestStream = requestStream ;
5156 }
5257
53- public List <Object > notifyDataChange (String namespace , String appName , ConfigKindMetadata metadata ,
54- List <Any > rawDataList ) throws Exception {
55- SubscribeKey subscribeKey = new SubscribeKey (namespace , appName , metadata .getKind ());
58+ private LocalDataNotifyResult notifyDataChange (SubscribeKey subscribeKey , DataWithVersion dataWithVersion )
59+ throws Exception {
60+ long receivedVersion = dataWithVersion .getVersion ();
61+ SubscribedData cachedData = configCache .getDataFor (subscribeKey );
62+ if (cachedData != null && cachedData .getVersion () > receivedVersion ) {
63+ // The upcoming data is out-dated, so we'll not resolve the push request.
64+ return new LocalDataNotifyResult ().setCode (OpenSergoTransportConstants .CODE_ERROR_VERSION_OUTDATED );
65+ }
5666
5767 // Decode actual data from the raw "Any" data.
58- List <Object > dataList = decodeActualData (metadata . getKindName (), rawDataList );
68+ List <Object > dataList = decodeActualData (subscribeKey . getKind (). getKindName (), dataWithVersion . getDataList () );
5969 // Update to local config cache.
60- configCache .updateData (subscribeKey , dataList );
70+ configCache .updateData (subscribeKey , dataList , receivedVersion );
6171
6272 List <OpenSergoConfigSubscriber > subscribers = subscribeRegistry .getSubscribersOf (subscribeKey );
6373 if (subscribers == null || subscribers .isEmpty ()) {
6474 // no-subscriber is acceptable (just for cache-and-pull mode)
65- return dataList ;
75+ return LocalDataNotifyResult . withSuccess ( dataList ) ;
6676 }
6777
6878 List <Throwable > notifyErrors = new ArrayList <>();
@@ -71,17 +81,34 @@ public List<Object> notifyDataChange(String namespace, String appName, ConfigKin
7181 try {
7282 subscriber .onConfigUpdate (subscribeKey , dataList );
7383 } catch (Throwable t ) {
84+ OpenSergoLogger .error ("Failed to notify OpenSergo config change event, subscribeKey={}, subscriber={}" ,
85+ subscribeKey , subscriber );
7486 notifyErrors .add (t );
7587 }
7688 }
7789
78- // TODO: handle all errors and propagate to caller
79-
80- return dataList ;
90+ if (notifyErrors .isEmpty ()) {
91+ return LocalDataNotifyResult .withSuccess (dataList );
92+ } else {
93+ return new LocalDataNotifyResult ().setCode (OpenSergoTransportConstants .CODE_ERROR_SUBSCRIBE_HANDLER_ERROR )
94+ .setDecodedData (dataList ).setNotifyErrors (notifyErrors );
95+ }
8196 }
8297
8398 @ Override
8499 public void onNext (SubscribeResponse pushCommand ) {
100+ if (!StringUtils .isEmpty (pushCommand .getAck ())) {
101+ // This indicates a response
102+ int code = pushCommand .getStatus ().getCode ();
103+ if (code == OpenSergoTransportConstants .CODE_SUCCESS ) {
104+ return ;
105+ }
106+ if (code >= 4000 && code < 4100 ) {
107+ OpenSergoLogger .warn ("Warn: req failed, command={}" , pushCommand );
108+ // TODO: handle me
109+ return ;
110+ }
111+ }
85112 // server-push command received.
86113 String kindName = pushCommand .getKind ();
87114
@@ -91,25 +118,48 @@ public void onNext(SubscribeResponse pushCommand) {
91118 throw new IllegalArgumentException ("unrecognized config kind: " + kindName );
92119 }
93120
121+ SubscribeKey subscribeKey = new SubscribeKey (pushCommand .getNamespace (), pushCommand .getApp (),
122+ kindMetadata .getKind ());
94123 // Decode the actual data and notify to upstream subscribers.
95- List <Object > dataList = notifyDataChange (pushCommand .getNamespace (), pushCommand .getApp (), kindMetadata ,
96- pushCommand .getDataList ());
97- // TODO: handle partial-success (i.e. the data has been updated to cache, but error occurred in subscribers)
124+ LocalDataNotifyResult localResult = notifyDataChange (subscribeKey , pushCommand .getDataWithVersion ());
98125
99- // TODO: track versionInfo and ackInfo
126+ // TODO: handle partial-success (i.e. the data has been updated to cache, but error occurred in subscribers)
127+ Status status ;
128+ switch (localResult .getCode ()) {
129+ case OpenSergoTransportConstants .CODE_SUCCESS :
130+ status = Status .newBuilder ().setCode (OpenSergoTransportConstants .CODE_SUCCESS ).build ();
131+ break ;
132+ case OpenSergoTransportConstants .CODE_ERROR_SUBSCRIBE_HANDLER_ERROR :
133+ StringBuilder message = new StringBuilder ();
134+ for (Throwable t : localResult .getNotifyErrors ()) {
135+ message .append (t .toString ()).append ('|' );
136+ }
137+ status = Status .newBuilder ().setMessage (message .toString ()).setCode (
138+ OpenSergoTransportConstants .CODE_SUCCESS ).build ();
139+ break ;
140+ case OpenSergoTransportConstants .CODE_ERROR_VERSION_OUTDATED :
141+ status = Status .newBuilder ().setCode (OpenSergoTransportConstants .CODE_ERROR_VERSION_OUTDATED )
142+ .setMessage ("outdated version" ).build ();
143+ break ;
144+ default :
145+ status = Status .newBuilder ().setCode (localResult .getCode ()).build ();
146+ break ;
147+ }
100148
101149 // ACK response
102- SubscribeRequest pushAckResponse = SubscribeRequest .newBuilder ()
103- .setStatus ( Status . newBuilder (). setCode ( 0 ). build () )
104- .build ();
150+ SubscribeRequest pushAckResponse = SubscribeRequest .newBuilder (). setStatus ( status )
151+ .setResponseAck ( OpenSergoTransportConstants . ACK_FLAG )
152+ .setRequestId ( pushCommand . getResponseId ()). build ();
105153 requestStream .onNext (pushAckResponse );
106154 } catch (Exception ex ) {
107- // TODO: handle error (but not for ack error?)
155+ // TODO: improve the error handling logic
156+ OpenSergoLogger .error ("Handle push command failed" , ex );
108157
109158 // NACK response
110159 SubscribeRequest pushNackResponse = SubscribeRequest .newBuilder ()
111- .setStatus (Status .newBuilder ().setCode (-1 ).setMessage (ex .toString ()).build ())
112- .build ();
160+ .setStatus (Status .newBuilder ().setCode (OpenSergoTransportConstants .CODE_ERROR_UNKNOWN )
161+ .setMessage (ex .toString ()).build ())
162+ .setResponseAck (OpenSergoTransportConstants .NACK_FLAG ).build ();
113163 requestStream .onNext (pushNackResponse );
114164 }
115165 }
@@ -128,11 +178,11 @@ private List<Object> decodeActualData(String kind, List<Any> rawList) throws Exc
128178
129179 @ Override
130180 public void onError (Throwable t ) {
131- // TODO: handle error
181+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , t );
132182 }
133183
134184 @ Override
135185 public void onCompleted () {
136-
186+ OpenSergoLogger . info ( "OpenSergoSubscribeClientObserver onCompleted" );
137187 }
138188}
0 commit comments