Skip to content

Commit 36205e9

Browse files
authored
refactor: simplified bull child process initialization (#2008)
refs #2002
1 parent 3d5caaa commit 36205e9

File tree

6 files changed

+22
-20
lines changed

6 files changed

+22
-20
lines changed

packages/collector/src/immediate.js

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ if (isNodeJsTooOld()) {
1818
}
1919

2020
const { util: coreUtil } = require('@instana/core');
21-
const agentOpts = require('./agent/opts');
2221

2322
// This file can be used with NODE_OPTIONS or `node --require` to instrument a Node.js app with Instana without
2423
// modifying the source code. See
@@ -27,22 +26,14 @@ const agentOpts = require('./agent/opts');
2726

2827
const isExcludedFromInstrumentation = coreUtil.excludedFromInstrumentation && coreUtil.excludedFromInstrumentation();
2928

30-
// In case this is a child process of an instrumented parent process we might receive the agent uuid from the parent
31-
// process to be able to produce and collect spans immediately without waiting for a connection to the agent in this
32-
// process.
33-
// TODO: This does not work because you would report spans with parent agent uuid and the child process pid -
34-
// this is not compatible. Our codebase does not support this.
35-
const parentProcessAgentUuid = process.env.INSTANA_AGENT_UUID;
29+
// CASE: This process is a forked child process of a bull worker.
30+
const currentProcessIsBullChildProcess = process.env.INSTANA_IS_BULL_CHILD_PROCESS === 'true';
3631

3732
if (!isExcludedFromInstrumentation) {
38-
if (parentProcessAgentUuid) {
39-
// @ts-ignore - Type 'string' is not assignable to type 'undefined'
40-
// Probably because exports.agentUuid is set to undefined and export values were not supposed to be changed
41-
// TODO: This has no effect. Remove! See comment above.
42-
agentOpts.agentUuid = parentProcessAgentUuid;
33+
if (currentProcessIsBullChildProcess) {
4334
require('./index')({
4435
tracing: {
45-
activateImmediately: true,
36+
// If we don't ACTIVATE the process instrumentation for bull forked processes immediately, we miss this event.
4637
activateBullProcessInstrumentation: true,
4738
forceTransmissionStartingAt: 1
4839
}

packages/collector/src/tracing/instrumentation/process/childProcess.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
'use strict';
88

9-
const processIdentityProvider = require('../../../pidStore');
10-
119
const coreChildProcess = require('child_process');
1210
const { tracing } = require('@instana/core');
1311
const shimmer = tracing.shimmer;
@@ -90,7 +88,9 @@ function shimFork(original) {
9088
`Detected a child_process.fork of Bull, instrumenting it by adding --require ${selfPath.immediate}.`
9189
);
9290

93-
process.env.INSTANA_AGENT_UUID = processIdentityProvider.getFrom().h;
91+
// NOTE: master.js is forked here!
92+
// https://github.com/OptimalBits/bull/blob/v4.16.5/lib/process/master.js
93+
process.env.INSTANA_IS_BULL_CHILD_PROCESS = 'true';
9494
args.execArgv.unshift(selfPath.immediate);
9595
args.execArgv.unshift('--require');
9696
}

packages/collector/test/tracing/messaging/bull/util.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ function processJob(job, done, log, info) {
4646
return Promise.reject(new Error('Invalid data. Expected data structure is {name: string}'));
4747
}
4848
} else {
49-
log(`Consuming: ${info || 'no extra info provided'}: ${JSON.stringify(job.data)}`);
49+
log(`Consuming: ${info || 'no extra info provided'}: ${JSON.stringify(job)}`);
5050

5151
if (done) {
5252
setTimeout(() => {
53-
writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job)).then(() => {
53+
writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job), log).then(() => {
5454
fetch(`http://127.0.0.1:${agentPort}`)
5555
.then(() => {
5656
log('The follow up request after receiving a message has happened.');
@@ -73,7 +73,7 @@ function processJob(job, done, log, info) {
7373
}, 100);
7474
} else {
7575
return delay(100)
76-
.then(() => writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job)))
76+
.then(() => writeToAFileToProveThatThisParticularJobHasBeenProcessed(getJobData(job), log))
7777
.then(() => fetch(`http://127.0.0.1:${agentPort}`))
7878
.then(() => {
7979
log('The follow up request after receiving a message has happened.');
@@ -155,7 +155,7 @@ function getJobData(job) {
155155
};
156156
}
157157

158-
function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData) {
158+
function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData, log) {
159159
let fileCreatedByJob;
160160

161161
if (jobData.data.bulkIndex) {
@@ -186,6 +186,8 @@ function writeToAFileToProveThatThisParticularJobHasBeenProcessed(jobData) {
186186

187187
jobData.pid = process.pid;
188188

189+
log(`Writing file ${fileCreatedByJob} to prove that this job has been processed.`);
190+
189191
return new Promise((resolve, reject) => {
190192
fs.writeFile(fileCreatedByJob, JSON.stringify(jobData, null, 2), (err, success) => {
191193
if (err) {

packages/core/src/tracing/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ exports.init = function init(_config, downstreamConnection, _processIdentityProv
248248
}
249249
}
250250

251+
// TODO: This is not used anymore. Investigate any usages. Potentially remove/deprecate in the next major release.
251252
if (config.tracing.activateImmediately) {
252253
exports.activate();
253254
}

packages/core/src/tracing/instrumentation/messaging/bull.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ function instrumentedJobCreate(ctx, originalJobCreate, originalArgs, options) {
7979
// Job.create args: Queue data, job name or ctx.DEFAULT_JOB_NAME, job data, options
8080

8181
// queue name should always be found, as it's required in order to start the whole process
82+
8283
const queueName = (originalArgs[0] && originalArgs[0].name) || 'name not found';
8384

8485
return cls.ns.runAndReturn(() => {
@@ -238,6 +239,8 @@ function instrumentedProcessJob(ctx, originalProcessJob, originalArgs) {
238239
}
239240

240241
// TODO: The entry is CREATED BEFORE the child process is forked. Its created ON THE receiver process.
242+
// Sender process -> bull exit (create jobs in redis)
243+
// Receiver process -> apply for jobs -> bull entry -> process jobs via forked processes from bull
241244
// This is not correct.
242245
const span = cls.startSpan({
243246
spanName: exports.spanName,

packages/core/src/tracing/instrumentation/process/process.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ exports.init = function init(config) {
2424
if (config.tracing.activateBullProcessInstrumentation) {
2525
shimmer.wrap(process, 'emit', shimProcessEmitForBullChildWorker);
2626
shimmer.wrap(process, 'send', shimProcessSendForBullChildWorker);
27+
28+
// Activate immediately, otherwise we miss the first message event and the instana
29+
// headers would not get removed from the message.
30+
exports.activate();
2731
}
2832
};
2933

@@ -32,6 +36,7 @@ function shimProcessEmitForBullChildWorker(originalProcessEmit) {
3236
if (!isActive || event !== 'message') {
3337
return originalProcessEmit.apply(this, arguments);
3438
}
39+
3540
const ipcMessage = arguments[1];
3641

3742
if (

0 commit comments

Comments
 (0)