diff --git a/observability-test/database.ts b/observability-test/database.ts index 39ebe9afc..7df1ac6fc 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -682,7 +682,7 @@ describe('Database', () => { 'Expected that secondRetrySpan is the child to parentSpan' ); - const expectedEventNames = ['No session available']; + const expectedEventNames = ['No session available', 'Using Session']; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -1558,7 +1558,7 @@ describe('Database', () => { ); // We don't expect events. - const expectedEventNames = []; + const expectedEventNames = ['Using Session']; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/observability-test/helper.ts b/observability-test/helper.ts index 591171666..58a1d13d4 100644 --- a/observability-test/helper.ts +++ b/observability-test/helper.ts @@ -36,6 +36,7 @@ export const cacheSessionEvents = [ 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', + 'Using Session', ]; /** @@ -82,14 +83,25 @@ export async function verifySpansAndEvents( actualEventNames.push(event.name); }); }); + + assert.strictEqual( + actualSpanNames.length, + expectedSpans.length, + `Span count mismatch: Expected ${expectedSpans.length} spans, but received ${actualSpanNames.length} spans` + ); assert.deepStrictEqual( actualSpanNames, expectedSpans, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpans}` ); + assert.strictEqual( + actualEventNames.length, + expectedEvents.length, + `Event count mismatch: Expected ${expectedEvents.length} events, but received ${actualEventNames.length} events` + ); assert.deepStrictEqual( - actualEventNames, - expectedEvents, + actualEventNames.sort(), + expectedEvents.sort(), `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEvents}` ); } diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index c60549776..aea657e84 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -142,6 +142,11 @@ describe('EndToEnd', async () => { tracerProvider: tracerProvider, enableExtendedTracing: false, }); + let dbCounter = 1; + + function newTestDatabase(): Database { + return instance.database(`database-${dbCounter++}`); + } const server = setupResult.server; const spannerMock = setupResult.spannerMock; @@ -195,7 +200,6 @@ describe('EndToEnd', async () => { 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Database.getSnapshot', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', ]; const expectedEventNames = [ 'Begin Transaction', @@ -221,7 +225,7 @@ describe('EndToEnd', async () => { transaction!.commit(); const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; - const expectedEventNames = [...cacheSessionEvents, 'Using Session']; + const expectedEventNames = [...cacheSessionEvents]; await verifySpansAndEvents( traceExporter, expectedSpanNames, @@ -241,11 +245,7 @@ describe('EndToEnd', async () => { 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', ]; - const expectedEventNames = [ - 'Starting stream', - ...cacheSessionEvents, - 'Using Session', - ]; + const expectedEventNames = ['Starting stream', ...cacheSessionEvents]; await verifySpansAndEvents( traceExporter, expectedSpanNames, @@ -263,11 +263,7 @@ describe('EndToEnd', async () => { 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; - const expectedEventNames = [ - 'Starting stream', - ...cacheSessionEvents, - 'Using Session', - ]; + const expectedEventNames = ['Starting stream', ...cacheSessionEvents]; await verifySpansAndEvents( traceExporter, expectedSpanNames, @@ -283,16 +279,15 @@ describe('EndToEnd', async () => { await transaction!.end(); const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Database.runTransaction', ]; const expectedEventNames = [ 'Starting stream', - 'Transaction Creation Done', 'Starting Commit', 'Commit Done', ...cacheSessionEvents, + 'Transaction Creation Done', ]; await verifySpansAndEvents( @@ -311,14 +306,12 @@ describe('EndToEnd', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Database.runTransactionAsync', ]; const expectedEventNames = [ 'Starting stream', - 'Transaction Creation Done', ...cacheSessionEvents, - 'Using Session', + 'Transaction Creation Done', ]; await verifySpansAndEvents( traceExporter, @@ -327,6 +320,101 @@ describe('EndToEnd', async () => { ); }); + it.skip('runTransaction with abort', done => { + let attempts = 0; + let rowCount = 0; + const database = newTestDatabase(); + database.runTransaction(async (err, transaction) => { + assert.ifError(err); + if (!attempts) { + spannerMock.abortTransaction(transaction!); + } + attempts++; + transaction!.run(selectSql, (err, rows) => { + assert.ifError(err); + rows.forEach(() => rowCount++); + transaction! + .commit() + .catch(done) + .then(async () => { + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransaction', + ]; + const expectedEventNames = [ + ...batchCreateSessionsEvents, + 'Starting stream', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Starting Commit', + 'Commit Done', + ...waitingSessionsEvents, + 'Retrying transaction', + ]; + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames + ); + database + .close() + .catch(done) + .then(() => done()); + }); + }); + }); + }); + + it('runTransactionAsync with abort', async () => { + let attempts = 0; + const database = newTestDatabase(); + await database.runTransactionAsync((transaction): Promise => { + if (!attempts) { + spannerMock.abortTransaction(transaction); + } + attempts++; + return transaction.run(selectSql).then(([rows]) => { + let count = 0; + rows.forEach(() => count++); + return transaction.commit().then(() => count); + }); + }); + assert.strictEqual(attempts, 2); + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', + ]; + const expectedEventNames = [ + ...batchCreateSessionsEvents, + 'Starting stream', + 'Stream broken. Not safe to retry', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Starting Commit', + 'Commit Done', + ...waitingSessionsEvents, + 'Retrying transaction', + ]; + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames + ); + await database.close(); + }); + it('writeAtLeastOnce', done => { const blankMutations = new MutationSet(); database.writeAtLeastOnce(blankMutations, async (err, response) => { @@ -340,7 +428,6 @@ describe('EndToEnd', async () => { 'Starting Commit', 'Commit Done', ...cacheSessionEvents, - 'Using Session', ]; await verifySpansAndEvents( traceExporter, @@ -371,7 +458,6 @@ describe('EndToEnd', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', 'CloudSpanner.PartitionedDml.runUpdate', 'CloudSpanner.Database.runPartitionedUpdate', @@ -530,7 +616,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedSpanNames = [ 'CloudSpanner.Database.getTransaction', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', ]; assert.deepStrictEqual( actualSpanNames, @@ -540,7 +625,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ ...cacheSessionEvents, - 'Using Session', 'Starting stream', 'Transaction Creation Done', ]; @@ -585,7 +669,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', ]; assert.deepStrictEqual( @@ -649,7 +732,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ ...cacheSessionEvents, - 'Using Session', 'Starting stream', ]; assert.deepStrictEqual( @@ -694,7 +776,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', 'CloudSpanner.Transaction.rollback', ]; @@ -1247,7 +1328,6 @@ SELECT 1p 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', @@ -1259,19 +1339,6 @@ SELECT 1p expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const spanSnapshotRun = spans[3]; - assert.strictEqual(spanSnapshotRun.name, 'CloudSpanner.Snapshot.run'); - const wantSpanErr = '6 ALREADY_EXISTS: ' + messageBadInsertAlreadyExistent; - assert.deepStrictEqual( - spanSnapshotRun.status.code, - SpanStatusCode.ERROR, - 'Unexpected status code' - ); - assert.deepStrictEqual( - spanSnapshotRun.status.message, - wantSpanErr, - 'Unexpexcted error message' - ); const databaseBatchCreateSessionsSpan = spans[0]; assert.strictEqual( @@ -1300,8 +1367,7 @@ SELECT 1p // We need to ensure a strict relationship between the spans. // |-Database.runTransactionAsync |-------------------------------------| - // |-Snapshot.run |------------------------| - // |-Snapshot.runStream |---------------------| + // |-Snapshot.runStream |---------------------| // |-Transaction.commit |--------| // |-Snapshot.begin |------| // |-Snapshot.commit |-----| @@ -1322,12 +1388,6 @@ SELECT 1p 'Expected that Database.runTransaction is the parent to Transaction.commmit' ); - assert.deepStrictEqual( - spanSnapshotRun.parentSpanId, - spanDatabaseRunTransactionAsync.spanContext().spanId, - 'Expected that Database.runTransaction is the parent to Snapshot.run' - ); - // Assert that despite all being exported, SessionPool.createSessions // is not in the same trace as runStream, createSessions is invoked at // Spanner Client instantiation, thus before database.run is invoked. @@ -1848,7 +1908,6 @@ describe('Traces for ExecuteStream broken stream retries', () => { 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', 'CloudSpanner.Dml.runUpdate', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 20e604d94..2cfe50d31 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -332,106 +332,6 @@ describe('Transaction', () => { }); }); - describe('run', () => { - const QUERY = 'SELET * FROM `MyTable`'; - - let fakeStream; - - beforeEach(() => { - fakeStream = new EventEmitter(); - sandbox.stub(snapshot, 'runStream').returns(fakeStream); - }); - - it('without error', done => { - const fakeRows = [{a: 'b'}, {c: 'd'}, {e: 'f'}]; - - snapshot.run(QUERY, (err, rows) => { - assert.ifError(err); - assert.deepStrictEqual(rows, fakeRows); - - const exportResults = extractExportedSpans(); - const actualSpanNames = exportResults.spanNames; - const actualEventNames = exportResults.spanEventNames; - - const expectedSpanNames = ['CloudSpanner.Snapshot.run']; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = []; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); - - // Ensure that the final span that got retries did not error. - const spans = exportResults.spans; - const firstSpan = spans[0]; - assert.strictEqual( - SpanStatusCode.UNSET, - firstSpan.status.code, - 'Unexpected an span status code' - ); - assert.strictEqual( - undefined, - firstSpan.status.message, - 'Unexpected span status message' - ); - done(); - }); - - fakeRows.forEach(row => fakeStream.emit('data', row)); - fakeStream.emit('end'); - }); - - it('with errors', done => { - const fakeError = new Error('run.error'); - - snapshot.run(QUERY, err => { - assert.strictEqual(err, fakeError); - - const exportResults = extractExportedSpans(); - const actualSpanNames = exportResults.spanNames; - const actualEventNames = exportResults.spanEventNames; - - const expectedSpanNames = ['CloudSpanner.Snapshot.run']; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = []; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); - - // Ensure that the final span that got retries did not error. - const spans = exportResults.spans; - const firstSpan = spans[0]; - assert.strictEqual( - SpanStatusCode.ERROR, - firstSpan.status.code, - 'Unexpected an span status code' - ); - assert.strictEqual( - 'run.error', - firstSpan.status.message, - 'Unexpected span status message' - ); - - done(); - }); - - fakeStream.emit('error', fakeError); - }); - }); - describe('runStream', () => { const QUERY = { sql: 'SELECT * FROM `MyTable`', diff --git a/src/database.ts b/src/database.ts index a81c5cfd5..6f0468b93 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2110,7 +2110,6 @@ class Database extends common.GrpcServiceObject { span.end(); this.getSnapshot(options, callback!); } else { - span.addEvent('Using Session', {'session.id': session?.id}); this.pool_.release(session!); span.end(); callback!(err); @@ -2118,6 +2117,7 @@ class Database extends common.GrpcServiceObject { return; } + span.addEvent('Using Session', {'session.id': session?.id}); this._releaseOnEnd(session!, snapshot, span); span.end(); callback!(err, snapshot); @@ -3244,6 +3244,7 @@ class Database extends common.GrpcServiceObject { return; } + span.addEvent('Using Session', {'session.id': session?.id}); transaction!._observabilityOptions = this._observabilityOptions; if (options.optimisticLock) { transaction!.useOptimisticLock(); diff --git a/src/instrument.ts b/src/instrument.ts index 8ad123bf6..994de20d7 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -200,6 +200,18 @@ export function setSpanError(span: Span, err: Error | String): boolean { return true; } +/** + * Sets the span status with err and end, if non-null onto the span with + * status.code=ERROR and the message of err.toString() + * + * @returns {boolean} to signify if the status was set. + */ +export function setSpanErrorAndEnd(span: Span, err: Error | String): boolean { + const status = setSpanError(span, err); + span.end(); + return status; +} + /** * Sets err, if non-null onto the span with * status.code=ERROR and the message of err.toString() diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 69439f534..340d171e6 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream'; import * as streamEvents from 'stream-events'; import {grpc, CallOptions} from 'google-gax'; import {DeadlineError, isRetryableInternalError} from './transaction-runner'; - +import {Span} from './instrument'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {google} from '../protos/protos'; import * as stream from 'stream'; @@ -97,6 +97,7 @@ export interface RowOptions { */ columnsMetadata?: object; gaxOptions?: CallOptions; + span?: Span; } /** @@ -183,16 +184,16 @@ interface ResultEvents { export class PartialResultStream extends Transform implements ResultEvents { private _destroyed: boolean; private _fields!: google.spanner.v1.StructType.Field[]; - private _options: RowOptions; private _pendingValue?: p.IValue; private _pendingValueForResume?: p.IValue; private _values: p.IValue[]; private _numPushFailed = 0; + options: RowOptions; constructor(options = {}) { super({objectMode: true}); this._destroyed = false; - this._options = Object.assign({maxResumeRetries: 20}, options); + this.options = Object.assign({maxResumeRetries: 20}, options); this._values = []; } /** @@ -271,7 +272,7 @@ export class PartialResultStream extends Transform implements ResultEvents { // Downstream returned false indicating that it is still not ready for // more data. this._numPushFailed++; - if (this._numPushFailed === this._options.maxResumeRetries) { + if (this._numPushFailed === this.options.maxResumeRetries) { this.destroy( new Error( `Stream is still not ready to receive data after ${this._numPushFailed} attempts to resume.` @@ -359,8 +360,8 @@ export class PartialResultStream extends Transform implements ResultEvents { const row: Row = this._createRow(values); - if (this._options.json) { - return this.push(row.toJSON(this._options.jsonOptions)); + if (this.options.json) { + return this.push(row.toJSON(this.options.jsonOptions)); } return this.push(row); @@ -376,7 +377,7 @@ export class PartialResultStream extends Transform implements ResultEvents { private _createRow(values: Value[]): Row { const fields = values.map((value, index) => { const {name, type} = this._fields[index]; - const columnMetadata = this._options.columnsMetadata?.[name]; + const columnMetadata = this.options.columnsMetadata?.[name]; return { name, value: codec.decode( diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 61d979e8c..9f1d01328 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -23,6 +23,7 @@ import {Session} from './session'; import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {isSessionNotFoundError} from './session-pool'; +import {getActiveOrNoopSpan, setSpanErrorAndEnd} from './instrument'; import {Database} from './database'; import {google} from '../protos/protos'; import IRequestOptions = google.spanner.v1.IRequestOptions; @@ -238,6 +239,7 @@ export abstract class Runner { this.session.lastError = e as grpc.ServiceError; lastError = e as grpc.ServiceError; } + const span = getActiveOrNoopSpan(); // Note that if the error is a 'Session not found' error, it will be // thrown here. We do this to bubble this error up to the caller who is @@ -250,7 +252,7 @@ export abstract class Runner { } this.attempts += 1; - + span.addEvent('Retrying transaction'); const delay = this.getNextDelay(lastError); await new Promise(resolve => setTimeout(resolve, delay)); } @@ -312,9 +314,12 @@ export class TransactionRunner extends Runner { transaction.requestStream = (config: object) => { const proxyStream = through.obj(); const stream = requestStream(config); + const resultStream = transaction.resultStream; stream .on('error', (err: grpc.ServiceError) => { + resultStream?.options.span && + setSpanErrorAndEnd(resultStream?.options.span, err); if (!this.shouldRetry(err)) { proxyStream.destroy(err); return; diff --git a/src/transaction.ts b/src/transaction.ts index fa1f10814..3f5e115c2 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -287,6 +287,7 @@ export class Snapshot extends EventEmitter { readTimestampProto?: spannerClient.protobuf.ITimestamp; request: (config: {}, callback: Function) => void; requestStream: (config: {}) => Readable; + resultStream?: PartialResultStream; session: Session; queryOptions?: IQueryOptions; resourceHeader_: {[k: string]: string}; @@ -751,6 +752,7 @@ export class Snapshot extends EventEmitter { maxResumeRetries, columnsMetadata, gaxOptions, + span, } ) ?.on('response', response => { @@ -789,6 +791,7 @@ export class Snapshot extends EventEmitter { }); } + this.resultStream = resultStream; return resultStream; }); } @@ -1093,33 +1096,23 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - const traceConfig = { - sql: query, - opts: this._observabilityOptions, - dbName: this._dbName!, - }; - startTrace('Snapshot.run', traceConfig, span => { - return this.runStream(query) - .on('error', (err, rows, stats, metadata) => { - setSpanError(span, err); - span.end(); - callback!(err, rows, stats, metadata); - }) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); - } + this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => { - span.end(); - callback!(null, rows, stats, metadata); - }); - }); + } + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + callback!(null, rows, stats, metadata); + }); } /** @@ -1342,6 +1335,7 @@ export class Snapshot extends EventEmitter { maxResumeRetries, columnsMetadata, gaxOptions, + span, } ) .on('response', response => { @@ -1381,6 +1375,7 @@ export class Snapshot extends EventEmitter { }); } + this.resultStream = resultStream; return resultStream; }); } diff --git a/test/transaction.ts b/test/transaction.ts index 28be1543f..7f91b3733 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -410,7 +410,7 @@ describe('Transaction', () => { assert.strictEqual(reqOpts.jsonOptions, undefined); assert.strictEqual(reqOpts.maxResumeRetries, undefined); - const options = PARTIAL_RESULT_STREAM.lastCall.args[1]; + const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1]; assert.deepStrictEqual(options, fakeOptions); }); @@ -791,7 +791,7 @@ describe('Transaction', () => { assert.strictEqual(reqOpts.jsonOptions, undefined); assert.strictEqual(reqOpts.maxResumeRetries, undefined); - const options = PARTIAL_RESULT_STREAM.lastCall.args[1]; + const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1]; assert.deepStrictEqual(options, expectedOptions); });