22// Executor of 'jobs' using the Redis task status notification mechanism
33
44const tracer = process . env . HF_VAR_ENABLE_TRACING === "1" ? require ( "./tracing.js" ) ( "hyperflow-job-executor" ) : undefined ;
5- const otelLogger = process . env . HF_VAR_ENABLE_TRACING === "1" ? require ( "./logs.js" ) ( "hyperflow-job-executor" ) : undefined ;
5+ const otelLogger = process . env . HF_VAR_ENABLE_OTEL === "1" ? require ( "./logs.js" ) ( "hyperflow-job-executor" ) : undefined ;
6+ const otelEnabled = process . env . HF_VAR_ENABLE_OTEL === "1" ;
67const { spawn} = require ( 'child_process' ) ;
78const redis = require ( 'redis' ) ;
89const fs = require ( 'fs' ) ;
@@ -41,7 +42,7 @@ async function handleJob(taskId, rcl, message) {
4142 let connector = new RemoteJobConnector ( rcl , wfId ) ;
4243
4344 // time interval (ms) at which to probe and log metrics
44- const probeInterval = process . env . HF_VAR_PROBE_INTERVAL || 1 ;
45+ const probeInterval = process . env . HF_VAR_PROBE_INTERVAL || 2000 ;
4546
4647 // **Experimental**: add job info to Redis "hf_all_jobs" set
4748 var allJobsMember = taskId + "#" + process . env . HF_LOG_NODE_NAME + "#" +
@@ -102,14 +103,13 @@ async function handleJob(taskId, rcl, message) {
102103 }
103104 }
104105
105-
106106 // periodically log process IO
107107 logProcIO = function ( pid ) {
108108 try {
109109 let ioInfo = procfs . processIo ( pid ) ;
110110 ioInfo . pid = pid ;
111111 ioInfo . name = jm [ "name" ] ;
112- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
112+ if ( otelLogger ) {
113113 otelLogger . emit (
114114 {
115115 observedTimestamp : Math . floor ( new Date ( ) . getTime ( ) / 1000 ) ,
@@ -137,7 +137,7 @@ async function handleJob(taskId, rcl, message) {
137137 let netDevInfo = procfs . processNetDev ( pid ) ;
138138 //netDevInfo.pid = pid;
139139 //netDevInfo.name = jm["name"];
140- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
140+ if ( otelLogger ) {
141141 otelLogger . emit (
142142 {
143143 observedTimestamp : Math . floor ( new Date ( ) . getTime ( ) / 1000 ) ,
@@ -176,7 +176,7 @@ async function handleJob(taskId, rcl, message) {
176176 // elapsed: 6650000, // ms since the start of the process
177177 // timestamp: 864000000 // ms since epoch
178178 // }
179- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
179+ if ( otelEnabled ) {
180180 cpuMetric . addCallback ( result => {
181181 result . observe ( stats . cpu , {
182182 ...metricBase ,
@@ -255,7 +255,7 @@ async function handleJob(taskId, rcl, message) {
255255 cmd . stderr . pipe ( stderrLog ) ;
256256
257257 logProcInfo ( targetPid ) ;
258- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
258+ if ( otelLogger ) {
259259 otelLogger . emit (
260260 {
261261 observedTimestamp : Math . floor ( Date . now ( ) ) ,
@@ -276,7 +276,7 @@ async function handleJob(taskId, rcl, message) {
276276 sysinfo . mem = data ;
277277 } ) . then ( data => {
278278 logger . info ( "Sysinfo:" , JSON . stringify ( sysinfo ) ) ;
279- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
279+ if ( otelLogger ) {
280280 otelLogger . emit (
281281 {
282282 observedTimestamp : Math . floor ( Date . now ( ) ) ,
@@ -342,7 +342,7 @@ async function handleJob(taskId, rcl, message) {
342342 logger . info ( 'job successful (try ' + attempt + '):' , jm [ "name" ] ) ;
343343 }
344344
345- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
345+ if ( otelLogger ) {
346346 otelLogger . emit (
347347 {
348348 observedTimestamp : Math . floor ( Date . now ( ) ) ,
@@ -546,7 +546,7 @@ async function handleJob(taskId, rcl, message) {
546546 name : jm [ 'name' ] ,
547547 }
548548
549- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
549+ if ( otelLogger ) {
550550 otelLogger . emit (
551551 {
552552 observedTimestamp : Math . floor ( Date . now ( ) ) ,
@@ -557,7 +557,7 @@ async function handleJob(taskId, rcl, message) {
557557 )
558558 }
559559
560- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
560+ if ( otelLogger ) {
561561 otelLogger . emit (
562562 {
563563 observedTimestamp : Math . floor ( Date . now ( ) ) ,
@@ -634,7 +634,7 @@ async function handleJob(taskId, rcl, message) {
634634 if ( err ) console . log ( err ) ;
635635 } ) ;
636636
637- if ( process . env . HF_VAR_ENABLE_TRACING === "1" ) {
637+ if ( otelLogger ) {
638638 otelLogger . emit (
639639 {
640640 observedTimestamp : Math . floor ( Date . now ( ) ) ,
0 commit comments