Skip to content

Commit 7a168e8

Browse files
committed
fix(instrumentation): fix mongodb promise api instrumentation
1 parent bd4cd1a commit 7a168e8

File tree

5 files changed

+1080
-145
lines changed

5 files changed

+1080
-145
lines changed

lib/instrumentations/mongodb.js

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
var Shimmer = require('../utils/shimmer')
22
var consts = require('../consts')
33
var utils = require('./utils')
4+
var semver = require('semver')
45

56
var COLLECTION_OPERATIONS = [
67
'aggregate',
78
'count',
89
'deleteMany',
910
'deleteOne',
1011
'distinct',
12+
'find',
1113
'findAndModify',
1214
'findAndRemove',
1315
'findOne',
@@ -28,12 +30,15 @@ var COLLECTION_OPERATIONS = [
2830
'updateOne'
2931
]
3032

31-
module.exports = function (mongodb, agent) {
33+
function wrapper (mongodb, agent, pkg) {
3234
Shimmer.wrap(mongodb.Collection.prototype, COLLECTION_OPERATIONS, function (original, name) {
3335
return function () {
34-
var _this = this
36+
var self = this
3537
var args = Array.prototype.slice.apply(arguments)
3638
var host
39+
var continuationMethod
40+
var queryOptions = typeof args[args.length - 1] === 'function' ? args[args.length - 2] : args[args.length - 1]
41+
var version = pkg ? pkg.version : undefined
3742

3843
if (this.db && this.db.serverConfig) {
3944
host = this.db.serverConfig.host + ':' + this.db.serverConfig.port
@@ -43,17 +48,42 @@ module.exports = function (mongodb, agent) {
4348
host = this.s.topology.isMasterDoc.primary
4449
}
4550

51+
if (name === 'aggregate') {
52+
if (version && semver.satisfies(version, '>= 2.0.0')) {
53+
// above 2.0.0 if no callback is provided a cursor is returned
54+
continuationMethod = typeof args[args.length - 1] === 'function' ? 'callback' : 'readStream'
55+
} else {
56+
// below 2.0.0 if a cursor description is provided, a cursor is returned, otherwise callback is called
57+
continuationMethod = queryOptions.cursor ? 'readStream' : 'callback'
58+
}
59+
} else if (typeof args[args.length - 1] === 'function') {
60+
continuationMethod = 'callback'
61+
} else if (name === 'find') {
62+
continuationMethod = 'readStream'
63+
} else if (version && semver.satisfies(version, '>= 2.0.0')) {
64+
continuationMethod = 'promise'
65+
} else {
66+
continuationMethod = 'callback'
67+
}
68+
4669
return utils.wrapQuery.call(this, original, args, agent, {
4770
protocol: consts.PROTOCOLS.MONGODB,
48-
url: _this.collectionName || 'unknown',
71+
url: self.collectionName || 'unknown',
4972
host: host || 'unknown',
5073
method: name,
51-
disableCallback: name === 'aggregate'
74+
continuationMethod: continuationMethod
5275
})
5376
}
5477
})
5578

5679
return mongodb
5780
}
5881

59-
module.exports._COLLECTION_OPERATIONS = COLLECTION_OPERATIONS
82+
module.exports = {
83+
package: true,
84+
instrumentations: [{
85+
path: 'mongodb',
86+
post: wrapper
87+
}],
88+
_COLLECTION_OPERATIONS: COLLECTION_OPERATIONS
89+
}

lib/instrumentations/mongodb.spec.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
'use strict'
22

33
var expect = require('chai').expect
4-
var wrapper = require('./mongodb')
4+
var wrapper = require('./mongodb').instrumentations[0].post
5+
var COLLECTION_OPERATIONS = require('./mongodb')._COLLECTION_OPERATIONS
56
var Shimmer = require('../utils/shimmer')
67
var utils = require('./utils')
78

@@ -18,7 +19,7 @@ describe('The mongodb wrapper module', function () {
1819

1920
expect(shimmerWrapStub).to.have.been.calledWith(
2021
fakeMongo.Collection.prototype,
21-
wrapper._COLLECTION_OPERATIONS
22+
COLLECTION_OPERATIONS
2223
)
2324
})
2425

@@ -51,7 +52,7 @@ describe('The mongodb wrapper module', function () {
5152
method: 'find',
5253
protocol: 'mongodb',
5354
url: 'unknown',
54-
disableCallback: false
55+
continuationMethod: 'readStream'
5556
})
5657
})
5758

@@ -90,7 +91,7 @@ describe('The mongodb wrapper module', function () {
9091
method: 'aggregate',
9192
protocol: 'mongodb',
9293
url: 'unknown',
93-
disableCallback: true
94+
continuationMethod: 'callback'
9495
})
9596
})
9697

Lines changed: 88 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
var consts = require('../../consts')
2+
var EventEmitter = require('events')
23

34
function wrapQuery (original, args, agent, params) {
45
var childCommId = agent.generateCommId()
@@ -9,68 +10,103 @@ function wrapQuery (original, args, agent, params) {
910
var host = _params.host
1011
var url = _params.url
1112
var method = _params.method
12-
var returnsPromise = _params.returnsPromise || false
13-
var disableCallback = _params.disableCallback
13+
var continuationMethod = _params.continuationMethod || false // promise || readStream || callback
1414

1515
// custom error parsing depending on the instrumentation
1616
var parseError = _params.parseError || function () { }
1717

18-
var reportSend = function reportSend () {
19-
agent.clientSend({
20-
protocol: protocol,
21-
requestId: requestId,
22-
childCommId: childCommId,
23-
host: host,
24-
time: clientSendTime,
25-
method: method,
26-
type: agent.CLIENT_SEND,
27-
url: url
28-
})
18+
var reporter = {
19+
reportSend: function reportSend () {
20+
agent.clientSend({
21+
protocol: protocol,
22+
requestId: requestId,
23+
childCommId: childCommId,
24+
host: host,
25+
time: clientSendTime,
26+
method: method,
27+
type: agent.CLIENT_SEND,
28+
url: url
29+
})
30+
},
31+
reportReceive: function reportReceive (err) {
32+
var clientReceiveTime = agent.getMicrotime()
33+
agent.clientReceive({
34+
protocol: protocol,
35+
requestId: requestId,
36+
childCommId: childCommId,
37+
host: host,
38+
time: clientReceiveTime,
39+
url: url,
40+
method: method,
41+
mustCollect: err ? consts.MUST_COLLECT.ERROR : undefined,
42+
responseTime: clientReceiveTime - clientSendTime,
43+
status: err ? consts.EDGE_STATUS.NOT_OK : consts.EDGE_STATUS.OK,
44+
statusDescription: parseError(err)
45+
})
46+
}
2947
}
3048

31-
var reportReceive = function reportReceive (err) {
32-
var clientReceiveTime = agent.getMicrotime()
33-
agent.clientReceive({
34-
protocol: protocol,
35-
requestId: requestId,
36-
childCommId: childCommId,
37-
host: host,
38-
time: clientReceiveTime,
39-
url: url,
40-
method: method,
41-
mustCollect: err ? consts.MUST_COLLECT.ERROR : undefined,
42-
responseTime: clientReceiveTime - clientSendTime,
43-
status: err ? consts.EDGE_STATUS.NOT_OK : consts.EDGE_STATUS.OK,
44-
statusDescription: parseError(err)
45-
})
49+
if (continuationMethod === 'promise') {
50+
return wrapPromise.call(this, original, args, reporter)
51+
} else if (continuationMethod === 'readStream') {
52+
return wrapReadStream.call(this, original, args, reporter)
53+
} else if (continuationMethod === 'callback') { // uses callback
54+
return wrapCallback.call(this, original, _params, args, reporter)
55+
} else {
56+
return original.apply(this, args) // we might not want to instrument the method
4657
}
58+
}
4759

48-
if (returnsPromise) {
49-
reportSend()
50-
var originalPromise = original.apply(this, args)
51-
return originalPromise.then(
52-
function (v) { reportReceive(); return v },
53-
function (err) { reportReceive(err); return err }
54-
)
55-
} else { // uses callback
56-
var wrappedCallback = function (original) {
57-
return function (err) {
58-
reportReceive(err)
59-
return original.apply(this, arguments)
60-
}
61-
}
62-
var last = args[args.length - 1]
63-
if (last && typeof last === 'function') {
64-
args[args.length - 1] = wrappedCallback(last)
65-
} else if (Array.isArray(last) && typeof last[last.length - 1] === 'function') {
66-
var lastOfLast = last.length - 1
67-
args[args.length - 1][lastOfLast] = wrappedCallback(last[lastOfLast])
68-
} else if (!disableCallback) {
69-
args.push(wrappedCallback(function () { }))
60+
function wrapCallback (original, params, args, reporter) {
61+
var wrappedCallback = function (original) {
62+
return function (err) {
63+
reporter.reportReceive(err)
64+
return original.apply(this, arguments)
7065
}
71-
reportSend()
72-
return original.apply(this, args)
7366
}
67+
var last = args[args.length - 1]
68+
if (last && typeof last === 'function') {
69+
args[args.length - 1] = wrappedCallback(last)
70+
} else if (Array.isArray(last) && typeof last[last.length - 1] === 'function') {
71+
var lastOfLast = last.length - 1
72+
args[args.length - 1][lastOfLast] = wrappedCallback(last[lastOfLast])
73+
} else {
74+
args.push(wrappedCallback(function () { }))
75+
}
76+
reporter.reportSend()
77+
return original.apply(this, args)
78+
}
79+
80+
function wrapPromise (original, args, reporter) {
81+
reporter.reportSend()
82+
var originalPromise = original.apply(this, args)
83+
return originalPromise.then(
84+
function (v) { reporter.reportReceive(); return v },
85+
function (err) { reporter.reportReceive(err); throw err }
86+
)
87+
}
88+
89+
function wrapReadStream (original, args, reporter) {
90+
reporter.reportSend()
91+
var originalStream = original.apply(this, args)
92+
93+
originalStream.on('end', function () {
94+
reporter.reportReceive()
95+
})
96+
97+
originalStream.on('error', function (err) {
98+
reporter.reportReceive(err)
99+
100+
if (typeof originalStream.listenerCount === 'function') {
101+
if (originalStream.listenerCount('error') < 2) {
102+
throw err
103+
}
104+
} else if (EventEmitter.listenerCount(originalStream, 'error') < 2) {
105+
throw err
106+
}
107+
})
108+
109+
return originalStream
74110
}
75111

76112
module.exports = wrapQuery

0 commit comments

Comments
 (0)