Skip to content

Commit 6fc0c60

Browse files
committed
Spans for abort transactions
1 parent 02c4c1f commit 6fc0c60

File tree

3 files changed

+115
-2
lines changed

3 files changed

+115
-2
lines changed

observability-test/spanner.ts

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ describe('EndToEnd', async () => {
142142
tracerProvider: tracerProvider,
143143
enableExtendedTracing: false,
144144
});
145+
let dbCounter = 1;
146+
147+
function newTestDatabase(): Database {
148+
return instance.database(`database-${dbCounter++}`,);
149+
}
145150

146151
const server = setupResult.server;
147152
const spannerMock = setupResult.spannerMock;
@@ -305,6 +310,7 @@ describe('EndToEnd', async () => {
305310
});
306311

307312
it('runTransactionAsync', async () => {
313+
308314
await database.runTransactionAsync(async transaction => {
309315
await transaction!.run('SELECT 1');
310316
});
@@ -327,6 +333,108 @@ describe('EndToEnd', async () => {
327333
);
328334
});
329335

336+
it('runTransaction with abort', done => {
337+
let attempts = 0;
338+
let rowCount = 0;
339+
const database = newTestDatabase();
340+
database.runTransaction(async (err, transaction) => {
341+
assert.ifError(err);
342+
if (!attempts) {
343+
spannerMock.abortTransaction(transaction!);
344+
}
345+
attempts++;
346+
transaction!.run(selectSql, (err, rows) => {
347+
assert.ifError(err);
348+
rows.forEach(() => rowCount++);
349+
transaction!
350+
.commit()
351+
.catch(done)
352+
.then(async () => {
353+
const expectedSpanNames = [
354+
'CloudSpanner.Database.batchCreateSessions',
355+
'CloudSpanner.SessionPool.createSessions',
356+
'CloudSpanner.Snapshot.runStream',
357+
'CloudSpanner.Snapshot.run',
358+
'CloudSpanner.Snapshot.runStream',
359+
'CloudSpanner.Snapshot.run',
360+
'CloudSpanner.Transaction.commit',
361+
'CloudSpanner.Snapshot.begin',
362+
'CloudSpanner.Database.runTransaction',
363+
];
364+
const expectedEventNames = [
365+
...cacheSessionEvents,
366+
'Using Session',
367+
'Retrying Transaction',
368+
'Starting stream',
369+
'exception',
370+
'Stream broken. Not safe to retry',
371+
'Begin Transaction',
372+
'Transaction Creation Done',
373+
'Starting stream',
374+
'Starting Commit',
375+
'Commit Done',
376+
];
377+
await verifySpansAndEvents(traceExporter, expectedSpanNames, expectedEventNames)
378+
database
379+
.close()
380+
.catch(done)
381+
.then(() => done());
382+
});
383+
});
384+
});
385+
});
386+
387+
it.only('runTransactionAsync with abort', async () => {
388+
let attempts = 0;
389+
const database = newTestDatabase();
390+
await database.runTransactionAsync((transaction): Promise<number> => {
391+
if (!attempts) {
392+
spannerMock.abortTransaction(transaction);
393+
}
394+
attempts++;
395+
return transaction.run(selectSql).then(([rows]) => {
396+
let count = 0;
397+
rows.forEach(() => count++);
398+
return transaction.commit().then(() => count);
399+
});
400+
});
401+
assert.strictEqual(attempts, 2);
402+
const expectedSpanNames = [
403+
'CloudSpanner.Database.batchCreateSessions',
404+
'CloudSpanner.SessionPool.createSessions',
405+
'CloudSpanner.Snapshot.runStream',
406+
'CloudSpanner.Snapshot.run',
407+
'CloudSpanner.Snapshot.begin',
408+
'CloudSpanner.Snapshot.runStream',
409+
'CloudSpanner.Snapshot.run',
410+
'CloudSpanner.Transaction.commit',
411+
'CloudSpanner.Database.runTransactionAsync',
412+
];
413+
const expectedEventNames = [
414+
'Requesting 25 sessions',
415+
'Creating 25 sessions',
416+
'Requested for 25 sessions returned 25',
417+
'Starting stream',
418+
'exception',
419+
'Stream broken. Not safe to retry',
420+
'Begin Transaction',
421+
'Transaction Creation Done',
422+
'Starting stream',
423+
'Starting Commit',
424+
'Commit Done',
425+
...cacheSessionEvents,
426+
'Using Session',
427+
'Retrying transaction',
428+
];
429+
await verifySpansAndEvents(
430+
traceExporter,
431+
expectedSpanNames,
432+
expectedEventNames
433+
);
434+
await database.close();
435+
});
436+
437+
330438
it('writeAtLeastOnce', done => {
331439
const blankMutations = new MutationSet();
332440
database.writeAtLeastOnce(blankMutations, async (err, response) => {

src/partial-result-stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream';
2424
import * as streamEvents from 'stream-events';
2525
import {grpc, CallOptions} from 'google-gax';
2626
import {DeadlineError, isRetryableInternalError} from './transaction-runner';
27-
27+
import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument';
2828
import {codec, JSONOptions, Json, Field, Value} from './codec';
2929
import {google} from '../protos/protos';
3030
import * as stream from 'stream';
@@ -494,6 +494,7 @@ export function partialResultStream(
494494
let lastRequestStream: Readable;
495495
const startTime = Date.now();
496496
const timeout = options?.gaxOptions?.timeout ?? Infinity;
497+
const span = getActiveOrNoopSpan();
497498

498499
// mergeStream allows multiple streams to be connected into one. This is good;
499500
// if we need to retry a request and pipe more data to the user's stream.
@@ -568,6 +569,7 @@ export function partialResultStream(
568569
// checkpoint stream has queued. After that, we will destroy the
569570
// user's stream with the same error.
570571
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
572+
setSpanErrorAndException(span, err as Error);
571573
return;
572574
}
573575

src/transaction-runner.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {Session} from './session';
2323
import {Transaction} from './transaction';
2424
import {NormalCallback} from './common';
2525
import {isSessionNotFoundError} from './session-pool';
26+
import {getActiveOrNoopSpan} from './instrument';
2627
import {Database} from './database';
2728
import {google} from '../protos/protos';
2829
import IRequestOptions = google.spanner.v1.IRequestOptions;
@@ -238,6 +239,7 @@ export abstract class Runner<T> {
238239
this.session.lastError = e as grpc.ServiceError;
239240
lastError = e as grpc.ServiceError;
240241
}
242+
const span = getActiveOrNoopSpan();
241243

242244
// Note that if the error is a 'Session not found' error, it will be
243245
// thrown here. We do this to bubble this error up to the caller who is
@@ -250,7 +252,7 @@ export abstract class Runner<T> {
250252
}
251253

252254
this.attempts += 1;
253-
255+
span.addEvent('Retrying transaction');
254256
const delay = this.getNextDelay(lastError);
255257
await new Promise(resolve => setTimeout(resolve, delay));
256258
}
@@ -321,6 +323,7 @@ export class TransactionRunner extends Runner<void> {
321323
}
322324

323325
stream.unpipe(proxyStream);
326+
// proxyStream.emit('error', err);
324327
reject(err);
325328
})
326329
.pipe(proxyStream);

0 commit comments

Comments
 (0)