Skip to content

Commit 323b98a

Browse files
committed
Plumb more with await to ensure correct ending of callbacks before span.end
1 parent 97ad2ef commit 323b98a

File tree

3 files changed

+39
-49
lines changed

3 files changed

+39
-49
lines changed

src/database.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3229,8 +3229,8 @@ class Database extends common.GrpcServiceObject {
32293229
}
32303230

32313231
if (err) {
3232+
await runFn!(err as grpc.ServiceError);
32323233
span.end();
3233-
runFn!(err as grpc.ServiceError);
32343234
return;
32353235
}
32363236

@@ -3244,11 +3244,6 @@ class Database extends common.GrpcServiceObject {
32443244

32453245
const release = () => {
32463246
this.pool_.release(session!);
3247-
if (span.isRecording()) {
3248-
// span.end() might have already been invoked inside
3249-
// Transactionrunner.
3250-
span.end();
3251-
}
32523247
};
32533248

32543249
const runner = new TransactionRunner(
@@ -3259,26 +3254,27 @@ class Database extends common.GrpcServiceObject {
32593254
setSpanError(span, err!);
32603255
}
32613256
await runFn!(err, resp);
3262-
span.end();
32633257
},
32643258
options
32653259
);
32663260

3267-
runner.run().then(release, err => {
3261+
await runner.run().then(release, async err => {
32683262
setSpanError(span, err);
32693263

32703264
if (isSessionNotFoundError(err)) {
32713265
span.addEvent('No session available', {
32723266
'session.id': session?.id,
32733267
});
32743268
release();
3275-
this.runTransaction(options, runFn!);
3269+
await this.runTransaction(options, runFn!);
32763270
} else {
32773271
span.addEvent('Using Session', {'session.id': session!.id});
32783272
setImmediate(runFn!, err);
32793273
release();
32803274
}
32813275
});
3276+
3277+
span.end();
32823278
});
32833279
});
32843280
}

src/table.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
CommitCallback,
3030
} from './transaction';
3131
import {google as databaseAdmin} from '../protos/protos';
32+
import {google} from '../protos/protos';
3233
import {Schema, LongRunningCallback} from './common';
3334
import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions;
3435
import {
@@ -1105,22 +1106,23 @@ class Table {
11051106
requestOptions: requestOptions,
11061107
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
11071108
},
1108-
(err, transaction) => {
1109+
async (err, transaction) => {
11091110
if (err) {
11101111
setSpanError(span, err);
1112+
await callback(err);
11111113
span.end();
1112-
callback(err);
11131114
return;
11141115
}
11151116

1116-
transaction![method](this.name, rows as Key[]);
1117-
transaction!.commit(options, (err, resp) => {
1118-
if (err) {
1119-
setSpanError(span, err);
1120-
}
1117+
try {
1118+
transaction![method](this.name, rows as Key[]);
1119+
const resp = await transaction!.commit(options);
1120+
await callback(err, resp as google.spanner.v1.ICommitResponse);
1121+
} catch (e) {
1122+
await callback(err, null);
1123+
} finally {
11211124
span.end();
1122-
callback(err, resp);
1123-
});
1125+
}
11241126
}
11251127
);
11261128
});

src/transaction.ts

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -456,19 +456,18 @@ export class Snapshot extends EventEmitter {
456456
gaxOpts,
457457
headers: headers,
458458
},
459-
(
459+
async (
460460
err: null | grpc.ServiceError,
461461
resp: spannerClient.spanner.v1.ITransaction
462462
) => {
463463
if (err) {
464464
setSpanError(span, err);
465-
span.end();
466-
callback!(err, resp);
467-
return;
465+
} else {
466+
this._update(resp);
468467
}
469-
this._update(resp);
468+
469+
await callback!(err, resp);
470470
span.end();
471-
callback!(null, resp);
472471
}
473472
);
474473
});
@@ -2149,35 +2148,25 @@ export class Transaction extends Dml {
21492148
opts: this._observabilityOptions,
21502149
dbName: this._dbName!,
21512150
};
2152-
return startTrace('Transaction.commit', traceConfig, span => {
2151+
startTrace('Transaction.commit', traceConfig, async span => {
21532152
if (this.id) {
21542153
reqOpts.transactionId = this.id as Uint8Array;
21552154
} else if (!this._useInRunner) {
21562155
reqOpts.singleUseTransaction = this._options;
21572156
} else {
2158-
this.begin()
2159-
.then(
2160-
() => {
2161-
this.commit(options, (err, resp) => {
2162-
if (err) {
2163-
setSpanError(span, err);
2164-
}
2165-
span.end();
2166-
callback(err, resp);
2167-
});
2168-
},
2169-
err => {
2170-
setSpanError(span, err);
2171-
callback(err);
2172-
span.end();
2173-
}
2174-
)
2175-
.catch(err => {
2176-
setSpanErrorAndException(span, err as Error);
2177-
span.end();
2178-
// Re-throw the exception after recording it.
2179-
throw err;
2180-
});
2157+
try {
2158+
await this.begin();
2159+
const resp = await this.commit(options);
2160+
span.end();
2161+
await callback(
2162+
null,
2163+
resp as spannerClient.spanner.v1.ICommitResponse
2164+
);
2165+
} catch (err) {
2166+
setSpanErrorAndException(span, err as Error);
2167+
span.end();
2168+
await callback(err as ServiceError, null);
2169+
}
21812170
return;
21822171
}
21832172

@@ -2215,7 +2204,10 @@ export class Transaction extends Dml {
22152204
gaxOpts: gaxOpts,
22162205
headers: headers,
22172206
},
2218-
(err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => {
2207+
async (
2208+
err: null | Error,
2209+
resp: spannerClient.spanner.v1.ICommitResponse
2210+
) => {
22192211
this.end();
22202212

22212213
if (err) {

0 commit comments

Comments
 (0)