@@ -232,12 +232,36 @@ function getIPv6Addresses(): string[] {
232232 return ipv6Addresses ;
233233}
234234
235+ interface ConfiguredMetrics {
236+ qps ?: number ;
237+ applicationUtilization ?: number ;
238+ eps ?: number ;
239+ }
240+
241+ function createInBandMetricsInterceptor ( metrics : ConfiguredMetrics ) {
242+ return function inBandMetricsInterceptor ( methodDescriptor : grpc . ServerMethodDefinition < any , any > , call : grpc . ServerInterceptingCallInterface ) : grpc . ServerInterceptingCall {
243+ const metricsRecorder = call . getMetricsRecorder ( )
244+ if ( metrics . qps ) {
245+ metricsRecorder . recordQpsMetric ( metrics . qps ) ;
246+ }
247+ if ( metrics . applicationUtilization ) {
248+ metricsRecorder . recordApplicationUtilizationMetric ( metrics . applicationUtilization ) ;
249+ }
250+ if ( metrics . eps ) {
251+ metricsRecorder . recordEpsMetric ( metrics . eps ) ;
252+ }
253+ return new grpc . ServerInterceptingCall ( call ) ;
254+ }
255+ }
256+
235257async function main ( ) {
236258 const argv = yargs
237- . string ( [ 'port' , 'maintenance_port' , 'address_type' , 'secure_mode' ] )
259+ . string ( [ 'port' , 'maintenance_port' , 'address_type' , 'secure_mode' , 'metrics_mode' ] )
260+ . number ( [ 'qps' , 'application_utilization' , 'eps' ] )
238261 . demandOption ( [ 'port' ] )
239262 . default ( 'address_type' , 'IPV4_IPV6' )
240263 . default ( 'secure_mode' , 'false' )
264+ . default ( 'metrics_mode' , 'NONE' )
241265 . parse ( )
242266 console . log ( 'Starting xDS interop server. Args: ' , argv ) ;
243267 const healthImpl = new HealthImplementation ( { '' : 'NOT_SERVING' } ) ;
@@ -253,7 +277,28 @@ async function main() {
253277 }
254278 const reflection = new ReflectionService ( packageDefinition , {
255279 services : [ 'grpc.testing.TestService' ]
256- } )
280+ } ) ;
281+ let metricInterceptor : grpc . ServerInterceptor | null = null ;
282+ const metricsMode = argv . metrics_mode . toUpperCase ( ) ;
283+ let metricRecorder : grpc . ServerMetricRecorder | null = null ;
284+ if ( metricsMode === 'IN_BAND' ) {
285+ metricInterceptor = createInBandMetricsInterceptor ( {
286+ qps : argv . qps ,
287+ applicationUtilization : argv . application_utilization ,
288+ eps : argv . eps
289+ } ) ;
290+ } else if ( metricsMode === 'OUT_OF_BAND' ) {
291+ metricRecorder = new grpc . ServerMetricRecorder ( ) ;
292+ if ( argv . qps ) {
293+ metricRecorder . setQpsMetric ( argv . qps ) ;
294+ }
295+ if ( argv . application_utilization ) {
296+ metricRecorder . setApplicationUtilizationMetric ( argv . application_utilization ) ;
297+ }
298+ if ( argv . eps ) {
299+ metricRecorder . setEpsMetric ( argv . eps ) ;
300+ }
301+ }
257302 const addressType = argv . address_type . toUpperCase ( ) ;
258303 const secureMode = argv . secure_mode . toLowerCase ( ) == 'true' ;
259304 if ( secureMode ) {
@@ -266,19 +311,33 @@ async function main() {
266311 reflection . addToServer ( maintenanceServer ) ;
267312 grpc . addAdminServicesToServer ( maintenanceServer ) ;
268313
269- const server = new grpc_xds . XdsServer ( { interceptors : [ testInfoInterceptor ] } ) ;
314+ const interceptorList = [ testInfoInterceptor ] ;
315+ if ( metricInterceptor ) {
316+ interceptorList . push ( metricInterceptor ) ;
317+ }
318+ const server = new grpc_xds . XdsServer ( { interceptors : interceptorList } ) ;
270319 server . addService ( loadedProto . grpc . testing . TestService . service , testServiceHandler ) ;
320+ if ( metricRecorder ) {
321+ metricRecorder . addToServer ( server ) ;
322+ }
271323 const xdsCreds = new grpc_xds . XdsServerCredentials ( grpc . ServerCredentials . createInsecure ( ) ) ;
272324 await Promise . all ( [
273325 serverBindPromise ( maintenanceServer , `[::]:${ argv . maintenance_port } ` , grpc . ServerCredentials . createInsecure ( ) ) ,
274326 serverBindPromise ( server , `0.0.0.0:${ argv . port } ` , xdsCreds )
275327 ] ) ;
276328 } else {
329+ const interceptorList = [ unifiedInterceptor ] ;
330+ if ( metricInterceptor ) {
331+ interceptorList . push ( metricInterceptor ) ;
332+ }
277333 const server = new grpc . Server ( { interceptors : [ unifiedInterceptor ] } ) ;
278334 server . addService ( loadedProto . grpc . testing . XdsUpdateHealthService . service , xdsUpdateHealthServiceImpl ) ;
279335 healthImpl . addToServer ( server ) ;
280336 reflection . addToServer ( server ) ;
281337 grpc . addAdminServicesToServer ( server ) ;
338+ if ( metricRecorder ) {
339+ metricRecorder . addToServer ( server ) ;
340+ }
282341 server . addService ( loadedProto . grpc . testing . TestService . service , testServiceHandler ) ;
283342 const creds = grpc . ServerCredentials . createInsecure ( ) ;
284343 switch ( addressType ) {
0 commit comments