Skip to content

Commit ed8cb34

Browse files
committed
feat: (observability): trace Database.runTransactionAsync
Extracted out of PR googleapis#2158, this change traces Database.runTransactionAsync. However, testing isn't effective because of bugs such as googleapis#2166.
1 parent 2a19ef1 commit ed8cb34

File tree

4 files changed

+301
-38
lines changed

4 files changed

+301
-38
lines changed

observability-test/benchmark.ts

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*!
2+
* Copyright 2024 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
const lessComparator = (a, b) => {
18+
if (a < b) return -1;
19+
if (a > b) return 1;
20+
return 0;
21+
};
22+
23+
/*
24+
* runBenchmarks runs each of the functions in runners ${nRuns} times
25+
* each time collecting RAM usage and time spent and then produces
26+
* a map of functionNames to the percentiles of RAM usage and time spent.
27+
*/
28+
export async function runBenchmarks(runners: Function[], done: Function) {
29+
const nRuns = 1000;
30+
const benchmarkValues = {};
31+
32+
let k = 0;
33+
for (k = 0; k < runners.length; k++) {
34+
const fn = runners[k];
35+
const functionName = fn.name;
36+
const timeSpentL: bigint[] = [];
37+
const ramL: number[] = [];
38+
let i = 0;
39+
for (i = 0; i < nRuns; i++) {
40+
const startTime: bigint = process.hrtime.bigint();
41+
const startHeapUsedBytes: number = process.memoryUsage().heapUsed;
42+
const _ = await fn();
43+
timeSpentL.push(process.hrtime.bigint() - startTime);
44+
ramL.push(process.memoryUsage().heapUsed - startHeapUsedBytes);
45+
}
46+
47+
timeSpentL.sort(lessComparator);
48+
ramL.sort(lessComparator);
49+
50+
benchmarkValues[functionName] = {
51+
ram: percentiles(functionName, ramL, 'bytes'),
52+
timeSpent: percentiles(functionName, timeSpentL, 'time'),
53+
};
54+
}
55+
56+
done(benchmarkValues);
57+
}
58+
59+
function percentiles(method, sortedValues, kind) {
60+
const n = sortedValues.length;
61+
const p50 = sortedValues[Math.floor(n * 0.5)];
62+
const p75 = sortedValues[Math.floor(n * 0.75)];
63+
const p90 = sortedValues[Math.floor(n * 0.9)];
64+
const p95 = sortedValues[Math.floor(n * 0.95)];
65+
const p99 = sortedValues[Math.floor(n * 0.99)];
66+
67+
return {
68+
p50: p50,
69+
p75: p75,
70+
p90: p90,
71+
p95: p95,
72+
p99: p99,
73+
p50_s: humanize(p50, kind),
74+
p75_s: humanize(p75, kind),
75+
p90_s: humanize(p90, kind),
76+
p95_s: humanize(p95, kind),
77+
p99_s: humanize(p99, kind),
78+
};
79+
}
80+
81+
function humanize(values, kind) {
82+
let converterFn = humanizeTime;
83+
if (kind === 'bytes') {
84+
converterFn = humanizeBytes;
85+
}
86+
return converterFn(values);
87+
}
88+
89+
const secondUnits = ['ns', 'us', 'ms', 's'];
90+
interface unitDivisor {
91+
unit: string;
92+
divisor: number;
93+
}
94+
const pastSecondUnits: unitDivisor[] = [
95+
{unit: 'min', divisor: 60},
96+
{unit: 'hr', divisor: 60},
97+
{unit: 'day', divisor: 24},
98+
{unit: 'week', divisor: 7},
99+
{unit: 'month', divisor: 30},
100+
];
101+
function humanizeTime(ns) {
102+
let value = Number(ns);
103+
for (const unit of secondUnits) {
104+
if (value < 1000) {
105+
return `${value} ${unit}`;
106+
}
107+
value /= 1000;
108+
}
109+
110+
let i = 0;
111+
for (i = 0; i < pastSecondUnits.length; i++) {
112+
const unitPlusValue = pastSecondUnits[i];
113+
const unitName = unitPlusValue.unit;
114+
const divisor = unitPlusValue.divisor;
115+
if (value < divisor) {
116+
return `${value} ${unitName}`;
117+
}
118+
value = value / divisor;
119+
}
120+
return `${value} ${pastSecondUnits[pastSecondUnits.length - 1][0]}`;
121+
}
122+
123+
const bytesUnits = ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'ExB'];
124+
function humanizeBytes(b) {
125+
let value = b;
126+
for (const unit of bytesUnits) {
127+
if (value < 1024) {
128+
return `${value.toFixed(3)} ${unit}`;
129+
}
130+
value = value / 1024;
131+
}
132+
133+
return `${value.toFixed(3)} ${bytesUnits[bytesUnits.length - 1]}`;
134+
}

observability-test/database.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,6 +1565,81 @@ describe('Database', () => {
15651565
});
15661566
});
15671567

1568+
describe('runTransactionAsync', () => {
1569+
const SESSION = new FakeSession();
1570+
const TRANSACTION = new FakeTransaction(
1571+
{} as google.spanner.v1.TransactionOptions.ReadWrite
1572+
);
1573+
1574+
let pool: FakeSessionPool;
1575+
1576+
beforeEach(() => {
1577+
pool = database.pool_;
1578+
1579+
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
1580+
callback => {
1581+
callback(null, SESSION, TRANSACTION);
1582+
}
1583+
);
1584+
});
1585+
1586+
it('with error getting session', async () => {
1587+
const fakeErr = new Error('getting a session');
1588+
1589+
(pool.getSession as sinon.SinonStub).callsFake(callback =>
1590+
callback(fakeErr)
1591+
);
1592+
1593+
await database.runTransactionAsync(async transaction => {
1594+
await transaction.run('SELECT 1');
1595+
});
1596+
1597+
traceExporter.forceFlush();
1598+
const spans = traceExporter.getFinishedSpans();
1599+
withAllSpansHaveDBName(spans);
1600+
1601+
const actualSpanNames: string[] = [];
1602+
const actualEventNames: string[] = [];
1603+
spans.forEach(span => {
1604+
actualSpanNames.push(span.name);
1605+
span.events.forEach(event => {
1606+
actualEventNames.push(event.name);
1607+
});
1608+
});
1609+
1610+
const expectedSpanNames = [
1611+
'CloudSpanner.Database.runTransactionAsync',
1612+
'CloudSpanner.Transaction.run',
1613+
];
1614+
assert.deepStrictEqual(
1615+
actualSpanNames,
1616+
expectedSpanNames,
1617+
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
1618+
);
1619+
1620+
// Ensure that the span actually produced an error that was recorded.
1621+
const firstSpan = spans[0];
1622+
assert.strictEqual(
1623+
SpanStatusCode.ERROR,
1624+
firstSpan.status.code,
1625+
'Expected an ERROR span status'
1626+
);
1627+
assert.strictEqual(
1628+
'getting a session',
1629+
firstSpan.status.message,
1630+
'Mismatched span status message'
1631+
);
1632+
1633+
// We don't expect events.
1634+
const expectedEventNames = [];
1635+
assert.deepStrictEqual(
1636+
actualEventNames,
1637+
expectedEventNames,
1638+
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
1639+
);
1640+
});
1641+
});
1642+
15681643
describe('runStream', () => {
15691644
const QUERY = {
15701645
sql: 'SELECT * FROM table',

observability-test/spanner.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,52 @@ describe('EndToEnd', () => {
468468
});
469469
});
470470

471+
it('runTransactionAsync', async () => {
472+
const withAllSpansHaveDBName = generateWithAllSpansHaveDBName(
473+
database.formattedName_
474+
);
475+
await database.runTransactionAsync(async transaction => {
476+
const [rows] = await transaction!.run('SELECT 1');
477+
});
478+
479+
traceExporter.forceFlush();
480+
const spans = traceExporter.getFinishedSpans();
481+
withAllSpansHaveDBName(spans);
482+
483+
const actualEventNames: string[] = [];
484+
const actualSpanNames: string[] = [];
485+
spans.forEach(span => {
486+
actualSpanNames.push(span.name);
487+
span.events.forEach(event => {
488+
actualEventNames.push(event.name);
489+
});
490+
});
491+
492+
const expectedSpanNames = [
493+
'CloudSpanner.Snapshot.runStream',
494+
'CloudSpanner.Snapshot.run',
495+
'CloudSpanner.Database.runTransactionAsync',
496+
];
497+
assert.deepStrictEqual(
498+
actualSpanNames,
499+
expectedSpanNames,
500+
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
501+
);
502+
503+
const expectedEventNames = [
504+
'Transaction Creation Done',
505+
'Acquiring session',
506+
'Cache hit: has usable session',
507+
'Acquired session',
508+
'Using Session',
509+
];
510+
assert.deepStrictEqual(
511+
actualEventNames,
512+
expectedEventNames,
513+
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
514+
);
515+
});
516+
471517
it('writeAtLeastOnce', done => {
472518
const withAllSpansHaveDBName = generateWithAllSpansHaveDBName(
473519
database.formattedName_

src/database.ts

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3363,46 +3363,54 @@ class Database extends common.GrpcServiceObject {
33633363

33643364
let sessionId = '';
33653365
const getSession = this.pool_.getSession.bind(this.pool_);
3366-
const span = getActiveOrNoopSpan();
3367-
// Loop to retry 'Session not found' errors.
3368-
// (and yes, we like while (true) more than for (;;) here)
3369-
// eslint-disable-next-line no-constant-condition
3370-
while (true) {
3371-
try {
3372-
const [session, transaction] = await promisify(getSession)();
3373-
transaction.requestOptions = Object.assign(
3374-
transaction.requestOptions || {},
3375-
options.requestOptions
3376-
);
3377-
if (options.optimisticLock) {
3378-
transaction.useOptimisticLock();
3379-
}
3380-
if (options.excludeTxnFromChangeStreams) {
3381-
transaction.excludeTxnFromChangeStreams();
3382-
}
3383-
sessionId = session?.id;
3384-
span.addEvent('Using Session', {'session.id': sessionId});
3385-
const runner = new AsyncTransactionRunner<T>(
3386-
session,
3387-
transaction,
3388-
runFn,
3389-
options
3390-
);
3391-
3392-
try {
3393-
return await runner.run();
3394-
} finally {
3395-
this.pool_.release(session);
3396-
}
3397-
} catch (e) {
3398-
if (!isSessionNotFoundError(e as ServiceError)) {
3399-
span.addEvent('No session available', {
3400-
'session.id': sessionId,
3401-
});
3402-
throw e;
3366+
return startTrace(
3367+
'Database.runTransactionAsync',
3368+
this._traceConfig,
3369+
async span => {
3370+
// Loop to retry 'Session not found' errors.
3371+
// (and yes, we like while (true) more than for (;;) here)
3372+
// eslint-disable-next-line no-constant-condition
3373+
while (true) {
3374+
try {
3375+
const [session, transaction] = await promisify(getSession)();
3376+
transaction.requestOptions = Object.assign(
3377+
transaction.requestOptions || {},
3378+
options.requestOptions
3379+
);
3380+
if (options.optimisticLock) {
3381+
transaction.useOptimisticLock();
3382+
}
3383+
if (options.excludeTxnFromChangeStreams) {
3384+
transaction.excludeTxnFromChangeStreams();
3385+
}
3386+
sessionId = session?.id;
3387+
span.addEvent('Using Session', {'session.id': sessionId});
3388+
const runner = new AsyncTransactionRunner<T>(
3389+
session,
3390+
transaction,
3391+
runFn,
3392+
options
3393+
);
3394+
3395+
try {
3396+
const result = await runner.run();
3397+
span.end();
3398+
return result;
3399+
} finally {
3400+
this.pool_.release(session);
3401+
}
3402+
} catch (e) {
3403+
if (!isSessionNotFoundError(e as ServiceError)) {
3404+
span.addEvent('No session available', {
3405+
'session.id': sessionId,
3406+
});
3407+
span.end();
3408+
throw e;
3409+
}
3410+
}
34033411
}
34043412
}
3405-
}
3413+
);
34063414
}
34073415

34083416
/**

0 commit comments

Comments
 (0)