@@ -36,7 +36,10 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
3636import selectLbConfigFromList = experimental . selectLbConfigFromList ;
3737import SubchannelInterface = experimental . SubchannelInterface ;
3838import BaseSubchannelWrapper = experimental . BaseSubchannelWrapper ;
39+ import UnavailablePicker = experimental . UnavailablePicker ;
3940import { Locality__Output } from "./generated/envoy/config/core/v3/Locality" ;
41+ import { ClusterConfig , XdsConfig } from "./xds-dependency-manager" ;
42+ import { CdsUpdate } from "./xds-resource-type/cluster-resource-type" ;
4043
4144const TRACER_NAME = 'xds_cluster_impl' ;
4245
@@ -53,59 +56,26 @@ export interface DropCategory {
5356 requests_per_million : number ;
5457}
5558
56- function validateDropCategory ( obj : any ) : DropCategory {
57- if ( ! ( 'category' in obj && typeof obj . category === 'string' ) ) {
58- throw new Error ( 'xds_cluster_impl config drop_categories entry must have a string field category' ) ;
59- }
60- if ( ! ( 'requests_per_million' in obj && typeof obj . requests_per_million === 'number' ) ) {
61- throw new Error ( 'xds_cluster_impl config drop_categories entry must have a number field requests_per_million' ) ;
62- }
63- return obj ;
64- }
65-
6659class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
67- private maxConcurrentRequests : number ;
6860 getLoadBalancerName ( ) : string {
6961 return TYPE_NAME ;
7062 }
7163 toJsonObject ( ) : object {
7264 const jsonObj : { [ key : string ] : any } = {
7365 cluster : this . cluster ,
74- drop_categories : this . dropCategories ,
7566 child_policy : [ this . childPolicy . toJsonObject ( ) ] ,
76- max_concurrent_requests : this . maxConcurrentRequests ,
77- eds_service_name : this . edsServiceName ,
78- lrs_load_reporting_server : this . lrsLoadReportingServer ,
7967 } ;
8068 return {
8169 [ TYPE_NAME ] : jsonObj
8270 } ;
8371 }
8472
85- constructor ( private cluster : string , private dropCategories : DropCategory [ ] , private childPolicy : TypedLoadBalancingConfig , private edsServiceName : string , private lrsLoadReportingServer ?: XdsServerConfig , maxConcurrentRequests ?: number ) {
86- this . maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS ;
87- }
73+ constructor ( private cluster : string , private childPolicy : TypedLoadBalancingConfig ) { }
8874
8975 getCluster ( ) {
9076 return this . cluster ;
9177 }
9278
93- getEdsServiceName ( ) {
94- return this . edsServiceName ;
95- }
96-
97- getLrsLoadReportingServer ( ) {
98- return this . lrsLoadReportingServer ;
99- }
100-
101- getMaxConcurrentRequests ( ) {
102- return this . maxConcurrentRequests ;
103- }
104-
105- getDropCategories ( ) {
106- return this . dropCategories ;
107- }
108-
10979 getChildPolicy ( ) {
11080 return this . childPolicy ;
11181 }
@@ -114,27 +84,14 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
11484 if ( ! ( 'cluster' in obj && typeof obj . cluster === 'string' ) ) {
11585 throw new Error ( 'xds_cluster_impl config must have a string field cluster' ) ;
11686 }
117- if ( ! ( 'eds_service_name' in obj && typeof obj . eds_service_name === 'string' ) ) {
118- throw new Error ( 'xds_cluster_impl config must have a string field eds_service_name' ) ;
119- }
120- if ( 'max_concurrent_requests' in obj && ! ( obj . max_concurrent_requests === undefined || typeof obj . max_concurrent_requests === 'number' ) ) {
121- throw new Error ( 'xds_cluster_impl config max_concurrent_requests must be a number if provided' ) ;
122- }
123- if ( ! ( 'drop_categories' in obj && Array . isArray ( obj . drop_categories ) ) ) {
124- throw new Error ( 'xds_cluster_impl config must have an array field drop_categories' ) ;
125- }
12687 if ( ! ( 'child_policy' in obj && Array . isArray ( obj . child_policy ) ) ) {
12788 throw new Error ( 'xds_cluster_impl config must have an array field child_policy' ) ;
12889 }
12990 const childConfig = selectLbConfigFromList ( obj . child_policy ) ;
13091 if ( ! childConfig ) {
13192 throw new Error ( 'xds_cluster_impl config child_policy parsing failed' ) ;
13293 }
133- let lrsServer : XdsServerConfig | undefined = undefined ;
134- if ( obj . lrs_load_reporting_server ) {
135- lrsServer = validateXdsServerConfig ( obj . lrs_load_reporting_server )
136- }
137- return new XdsClusterImplLoadBalancingConfig ( obj . cluster , obj . drop_categories . map ( validateDropCategory ) , childConfig , obj . eds_service_name , lrsServer , obj . max_concurrent_requests ) ;
94+ return new XdsClusterImplLoadBalancingConfig ( obj . cluster , childConfig ) ;
13895 }
13996}
14097
@@ -252,11 +209,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
252209 private latestConfig : XdsClusterImplLoadBalancingConfig | null = null ;
253210 private clusterDropStats : XdsClusterDropStats | null = null ;
254211 private xdsClient : XdsClient | null = null ;
212+ private latestClusterConfig : ClusterConfig | null = null ;
255213
256214 constructor ( private readonly channelControlHelper : ChannelControlHelper , credentials : ChannelCredentials , options : ChannelOptions ) {
257215 this . childBalancer = new ChildLoadBalancerHandler ( createChildChannelControlHelper ( channelControlHelper , {
258216 createSubchannel : ( subchannelAddress , subchannelArgs , credentialsOverride ) => {
259- if ( ! this . xdsClient || ! this . latestConfig || ! this . lastestEndpointList ) {
217+ if ( ! this . xdsClient || ! this . latestConfig || ! this . lastestEndpointList || ! this . latestClusterConfig ) {
260218 throw new Error ( 'xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated' ) ;
261219 }
262220 const wrapperChild = channelControlHelper . createSubchannel ( subchannelAddress , subchannelArgs , credentialsOverride ) ;
@@ -270,23 +228,23 @@ class XdsClusterImplBalancer implements LoadBalancer {
270228 trace ( 'Not reporting load for address ' + subchannelAddressToString ( subchannelAddress ) + ' because it has unknown locality.' ) ;
271229 return wrapperChild ;
272230 }
273- const lrsServer = this . latestConfig . getLrsLoadReportingServer ( ) ;
231+ const lrsServer = this . latestClusterConfig . cluster . lrsLoadReportingServer ;
274232 let statsObj : XdsClusterLocalityStats | null = null ;
275233 if ( lrsServer ) {
276234 statsObj = this . xdsClient . addClusterLocalityStats (
277235 lrsServer ,
278236 this . latestConfig . getCluster ( ) ,
279- this . latestConfig . getEdsServiceName ( ) ,
237+ this . latestClusterConfig . cluster . edsServiceName ?? '' ,
280238 locality
281239 ) ;
282240 }
283241 return new LocalitySubchannelWrapper ( wrapperChild , statsObj ) ;
284242 } ,
285243 updateState : ( connectivityState , originalPicker ) => {
286- if ( this . latestConfig === null ) {
244+ if ( this . latestConfig === null || this . latestClusterConfig === null || this . latestClusterConfig . children . type === 'aggregate' || ! this . latestClusterConfig . children . endpoints ) {
287245 channelControlHelper . updateState ( connectivityState , originalPicker ) ;
288246 } else {
289- const picker = new XdsClusterImplPicker ( originalPicker , getCallCounterMapKey ( this . latestConfig . getCluster ( ) , this . latestConfig . getEdsServiceName ( ) ) , this . latestConfig . getMaxConcurrentRequests ( ) , this . latestConfig . getDropCategories ( ) , this . clusterDropStats ) ;
247+ const picker = new XdsClusterImplPicker ( originalPicker , getCallCounterMapKey ( this . latestConfig . getCluster ( ) , this . latestClusterConfig . cluster . edsServiceName ) , this . latestClusterConfig . cluster . maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS , this . latestClusterConfig . children . endpoints . dropCategories , this . clusterDropStats ) ;
290248 channelControlHelper . updateState ( connectivityState , picker ) ;
291249 }
292250 }
@@ -297,15 +255,38 @@ class XdsClusterImplBalancer implements LoadBalancer {
297255 trace ( 'Discarding address list update with unrecognized config ' + JSON . stringify ( lbConfig . toJsonObject ( ) , undefined , 2 ) ) ;
298256 return ;
299257 }
300- trace ( 'Received update with config: ' + JSON . stringify ( lbConfig , undefined , 2 ) ) ;
258+ trace ( 'Received update with config: ' + JSON . stringify ( lbConfig . toJsonObject ( ) , undefined , 2 ) ) ;
259+ const xdsConfig = attributes . xdsConfig as XdsConfig ;
260+ const maybeClusterConfig = xdsConfig . clusters . get ( lbConfig . getCluster ( ) ) ;
261+ if ( ! maybeClusterConfig ) {
262+ trace ( 'Received update with no config for cluster ' + lbConfig . getCluster ( ) ) ;
263+ return ;
264+ }
265+ if ( ! maybeClusterConfig . success ) {
266+ this . latestClusterConfig = null ;
267+ this . childBalancer . destroy ( ) ;
268+ this . channelControlHelper . updateState ( connectivityState . TRANSIENT_FAILURE , new UnavailablePicker ( maybeClusterConfig . error ) ) ;
269+ return ;
270+ }
271+ const clusterConfig = maybeClusterConfig . value ;
272+ if ( clusterConfig . children . type === 'aggregate' ) {
273+ trace ( 'Received update for aggregate cluster ' + lbConfig . getCluster ( ) ) ;
274+ return ;
275+ }
276+ if ( ! clusterConfig . children . endpoints ) {
277+ this . childBalancer . destroy ( ) ;
278+ this . channelControlHelper . updateState ( connectivityState . TRANSIENT_FAILURE , new UnavailablePicker ( { details : clusterConfig . children . resolutionNote } ) ) ;
279+
280+ }
301281 this . lastestEndpointList = endpointList ;
302282 this . latestConfig = lbConfig ;
283+ this . latestClusterConfig = clusterConfig ;
303284 this . xdsClient = attributes . xdsClient as XdsClient ;
304- if ( lbConfig . getLrsLoadReportingServer ( ) ) {
285+ if ( clusterConfig . cluster . lrsLoadReportingServer ) {
305286 this . clusterDropStats = this . xdsClient . addClusterDropStats (
306- lbConfig . getLrsLoadReportingServer ( ) ! ,
287+ clusterConfig . cluster . lrsLoadReportingServer ,
307288 lbConfig . getCluster ( ) ,
308- lbConfig . getEdsServiceName ( ) ?? ''
289+ clusterConfig . cluster . edsServiceName ?? ''
309290 ) ;
310291 }
311292
0 commit comments