Skip to content

Commit 5fef320

Browse files
committed
feat(observability): fix bugs found from product review + negative cases
This change adds recording of retry span annotations, catching cases in which exceptions where thrown but spans were not ended while testing out and visually confirming the results.
1 parent d0fe178 commit 5fef320

File tree

8 files changed

+1228
-132
lines changed

8 files changed

+1228
-132
lines changed

observability-test/spanner.ts

Lines changed: 1036 additions & 38 deletions
Large diffs are not rendered by default.

observability-test/transaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ describe('Transaction', () => {
479479
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
480480
);
481481

482-
const expectedEventNames = ['exception'];
482+
const expectedEventNames = ['Starting stream', 'exception'];
483483
assert.deepStrictEqual(
484484
actualEventNames,
485485
expectedEventNames,

src/database.ts

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,8 @@ class Database extends common.GrpcServiceObject {
11301130
} catch (e) {
11311131
setSpanErrorAndException(span, e as Error);
11321132
this.emit('error', e);
1133+
} finally {
1134+
span.end();
11331135
}
11341136
});
11351137
}
@@ -2101,6 +2103,9 @@ class Database extends common.GrpcServiceObject {
21012103
session!.lastError = err;
21022104
this.pool_.release(session!);
21032105
this.getSnapshot(options, (err, snapshot) => {
2106+
if (err) {
2107+
setSpanError(span, err);
2108+
}
21042109
span.end();
21052110
callback!(err, snapshot);
21062111
});
@@ -2815,6 +2820,7 @@ class Database extends common.GrpcServiceObject {
28152820
this.runStream(query, options)
28162821
.on('error', err => {
28172822
setSpanError(span, err);
2823+
span.end();
28182824
callback!(err as grpc.ServiceError, rows, stats, metadata);
28192825
})
28202826
.on('response', response => {
@@ -3054,7 +3060,6 @@ class Database extends common.GrpcServiceObject {
30543060
let dataStream = snapshot.runStream(query);
30553061

30563062
const endListener = () => {
3057-
span.end();
30583063
snapshot.end();
30593064
};
30603065
dataStream
@@ -3083,6 +3088,11 @@ class Database extends common.GrpcServiceObject {
30833088
// Create a new data stream and add it to the end user stream.
30843089
dataStream = this.runStream(query, options);
30853090
dataStream.pipe(proxyStream);
3091+
dataStream.on('end', () => span.end());
3092+
dataStream.on('error', err => {
3093+
setSpanError(span, err);
3094+
span.end();
3095+
});
30863096
} else {
30873097
proxyStream.destroy(err);
30883098
snapshot.end();
@@ -3098,7 +3108,9 @@ class Database extends common.GrpcServiceObject {
30983108
if (err) {
30993109
setSpanError(span, err);
31003110
}
3101-
span.end();
3111+
if (span.isRecording()) {
3112+
span.end();
3113+
}
31023114
});
31033115

31043116
return proxyStream as PartialResultStream;
@@ -3212,6 +3224,26 @@ class Database extends common.GrpcServiceObject {
32123224
? (optionsOrRunFn as RunTransactionOptions)
32133225
: {};
32143226

3227+
const retry = (span: Span) => {
3228+
this.runTransaction(options, (err, txn) => {
3229+
if (err) {
3230+
setSpanError(span, err);
3231+
span.end();
3232+
runFn!(err, null);
3233+
return;
3234+
}
3235+
3236+
txn!.once('end', () => {
3237+
span.end();
3238+
});
3239+
txn!.once('error', err => {
3240+
setSpanError(span, err!);
3241+
span.end();
3242+
});
3243+
runFn!(null, txn!);
3244+
});
3245+
};
3246+
32153247
startTrace('Database.runTransaction', this._traceConfig, span => {
32163248
this.pool_.getSession((err, session?, transaction?) => {
32173249
if (err) {
@@ -3222,8 +3254,7 @@ class Database extends common.GrpcServiceObject {
32223254
span.addEvent('No session available', {
32233255
'session.id': session?.id,
32243256
});
3225-
this.runTransaction(options, runFn!);
3226-
span.end();
3257+
retry(span);
32273258
return;
32283259
}
32293260

@@ -3241,41 +3272,42 @@ class Database extends common.GrpcServiceObject {
32413272
transaction!.excludeTxnFromChangeStreams();
32423273
}
32433274

3244-
const release = () => {
3275+
// Our span should only be ended if the
3276+
// transaction either errored or was ended.
3277+
transaction!.once('error', err => {
3278+
setSpanError(span, err!);
3279+
span.end();
3280+
});
3281+
3282+
transaction!.once('end', err => {
3283+
setSpanError(span, err!);
32453284
span.end();
3285+
});
3286+
3287+
const release = () => {
32463288
this.pool_.release(session!);
32473289
};
32483290

32493291
const runner = new TransactionRunner(
32503292
session!,
32513293
transaction!,
3252-
(err, resp) => {
3253-
if (err) {
3254-
setSpanError(span, err!);
3255-
}
3256-
span.end();
3257-
runFn!(err, resp);
3258-
},
3294+
runFn!,
32593295
options
32603296
);
32613297

32623298
runner.run().then(release, err => {
3263-
if (err) {
3264-
setSpanError(span, err!);
3265-
}
3299+
setSpanError(span, err);
32663300

32673301
if (isSessionNotFoundError(err)) {
32683302
span.addEvent('No session available', {
32693303
'session.id': session?.id,
32703304
});
32713305
release();
3272-
this.runTransaction(options, runFn!);
3306+
retry(span);
32733307
} else {
3274-
if (!err) {
3275-
span.addEvent('Using Session', {'session.id': session!.id});
3276-
}
32773308
setImmediate(runFn!, err);
32783309
release();
3310+
span.end();
32793311
}
32803312
});
32813313
});
@@ -3536,6 +3568,11 @@ class Database extends common.GrpcServiceObject {
35363568
mutationGroups,
35373569
options
35383570
);
3571+
dataStream.once('end', () => span.end());
3572+
dataStream.once('error', err => {
3573+
setSpanError(span, err!);
3574+
span.end();
3575+
});
35393576
dataStream.pipe(proxyStream);
35403577
} else {
35413578
span.end();
@@ -3635,8 +3672,14 @@ class Database extends common.GrpcServiceObject {
36353672
span.addEvent('No session available', {
36363673
'session.id': session?.id,
36373674
});
3638-
this.writeAtLeastOnce(mutations, options, cb!);
3639-
span.end();
3675+
// Retry this method.
3676+
this.writeAtLeastOnce(mutations, options, (err, resp) => {
3677+
if (err) {
3678+
setSpanError(span, err);
3679+
}
3680+
span.end();
3681+
cb!(err, resp);
3682+
});
36403683
return;
36413684
}
36423685
if (err) {

src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ import {
8080
import grpcGcpModule = require('grpc-gcp');
8181
const grpcGcp = grpcGcpModule(grpc);
8282
import * as v1 from './v1';
83-
import {ObservabilityOptions} from './instrument';
83+
import {
84+
ObservabilityOptions,
85+
ensureInitialContextManagerSet,
86+
} from './instrument';
8487

8588
// eslint-disable-next-line @typescript-eslint/no-var-requires
8689
const gcpApiConfig = require('./spanner_grpc_config.json');
@@ -370,6 +373,7 @@ class Spanner extends GrpcService {
370373
};
371374
this.directedReadOptions = directedReadOptions;
372375
this._observabilityOptions = options.observabilityOptions;
376+
ensureInitialContextManagerSet();
373377
}
374378

375379
/**

src/instrument.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ function ensureInitialContextManagerSet() {
117117
context.setGlobalContextManager(contextManager);
118118
}
119119
}
120+
export {ensureInitialContextManagerSet};
120121

121122
/**
122123
* startTrace begins an active span in the current active context
@@ -136,8 +137,6 @@ export function startTrace<T>(
136137
config = {} as traceConfig;
137138
}
138139

139-
ensureInitialContextManagerSet();
140-
141140
return getTracer(config.opts?.tracerProvider).startActiveSpan(
142141
SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix,
143142
{kind: SpanKind.CLIENT},
@@ -165,11 +164,20 @@ export function startTrace<T>(
165164
}
166165
}
167166

168-
if (config.that) {
169-
const fn = cb.bind(config.that);
170-
return fn(span);
171-
} else {
172-
return cb(span);
167+
// If at all the invoked function throws an exception,
168+
// record the exception and then end this span.
169+
try {
170+
if (config.that) {
171+
const fn = cb.bind(config.that);
172+
return fn(span);
173+
} else {
174+
return cb(span);
175+
}
176+
} catch (e) {
177+
setSpanErrorAndException(span, e as Error);
178+
span.end();
179+
// Finally re-throw the exception.
180+
throw e;
173181
}
174182
}
175183
);

src/table.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {
3535
ObservabilityOptions,
3636
startTrace,
3737
setSpanError,
38+
setSpanErrorAndException,
3839
traceConfig,
3940
} from './instrument';
4041

0 commit comments

Comments
 (0)