Skip to content

Commit 738804b

Browse files
authored
Merge branch 'googleapis:main' into main
2 parents 95aa578 + 30151ca commit 738804b

File tree

7 files changed

+67
-4
lines changed

7 files changed

+67
-4
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44

55
[1]: https://www.npmjs.com/package/nodejs-spanner?activeTab=versions
66

7+
## [7.9.1](https://github.com/googleapis/nodejs-spanner/compare/v7.9.0...v7.9.1) (2024-06-26)
8+
9+
10+
### Bug Fixes
11+
12+
* Retry with timeout ([#2071](https://github.com/googleapis/nodejs-spanner/issues/2071)) ([a943257](https://github.com/googleapis/nodejs-spanner/commit/a943257a0402b26fd80196057a9724fd28fc5c1b))
13+
714
## [7.9.0](https://github.com/googleapis/nodejs-spanner/compare/v7.8.0...v7.9.0) (2024-06-21)
815

916

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@google-cloud/spanner",
33
"description": "Cloud Spanner Client Library for Node.js",
4-
"version": "7.9.0",
4+
"version": "7.9.1",
55
"license": "Apache-2.0",
66
"author": "Google Inc.",
77
"engines": {

samples/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"dependencies": {
1818
"@google-cloud/kms": "^4.0.0",
1919
"@google-cloud/precise-date": "^4.0.0",
20-
"@google-cloud/spanner": "^7.9.0",
20+
"@google-cloud/spanner": "^7.9.1",
2121
"yargs": "^17.0.0",
2222
"protobufjs": "^7.0.0"
2323
},

src/partial-result-stream.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import mergeStream = require('merge-stream');
2222
import {common as p} from 'protobufjs';
2323
import {Readable, Transform} from 'stream';
2424
import * as streamEvents from 'stream-events';
25-
import {grpc} from 'google-gax';
26-
import {isRetryableInternalError} from './transaction-runner';
25+
import {grpc, CallOptions} from 'google-gax';
26+
import {DeadlineError, isRetryableInternalError} from './transaction-runner';
2727

2828
import {codec, JSONOptions, Json, Field, Value} from './codec';
2929
import {google} from '../protos/protos';
@@ -96,6 +96,7 @@ export interface RowOptions {
9696
* };
9797
*/
9898
columnsMetadata?: object;
99+
gaxOptions?: CallOptions;
99100
}
100101

101102
/**
@@ -491,6 +492,8 @@ export function partialResultStream(
491492
const maxQueued = 10;
492493
let lastResumeToken: ResumeToken;
493494
let lastRequestStream: Readable;
495+
const startTime = Date.now();
496+
const timeout = options?.gaxOptions?.timeout ?? Infinity;
494497

495498
// mergeStream allows multiple streams to be connected into one. This is good;
496499
// if we need to retry a request and pipe more data to the user's stream.
@@ -541,6 +544,17 @@ export function partialResultStream(
541544
};
542545

543546
const retry = (err: grpc.ServiceError): void => {
547+
const elapsed = Date.now() - startTime;
548+
if (elapsed >= timeout) {
549+
// The timeout has reached so this will flush any rows the
550+
// checkpoint stream has queued. After that, we will destroy the
551+
// user's stream with the Deadline exceeded error.
552+
setImmediate(() =>
553+
batchAndSplitOnTokenStream.destroy(new DeadlineError(err))
554+
);
555+
return;
556+
}
557+
544558
if (
545559
!(
546560
err.code &&

src/transaction.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,7 @@ export class Snapshot extends EventEmitter {
704704
jsonOptions,
705705
maxResumeRetries,
706706
columnsMetadata,
707+
gaxOptions,
707708
})
708709
?.on('response', response => {
709710
if (response.metadata && response.metadata!.transaction && !this.id) {
@@ -1210,6 +1211,7 @@ export class Snapshot extends EventEmitter {
12101211
jsonOptions,
12111212
maxResumeRetries,
12121213
columnsMetadata,
1214+
gaxOptions,
12131215
})
12141216
.on('response', response => {
12151217
if (response.metadata && response.metadata!.transaction && !this.id) {

test/partial-result-stream.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,38 @@ describe('PartialResultStream', () => {
311311
);
312312
});
313313

314+
it('should get Deadline exceeded error if timeout has reached', done => {
315+
const fakeCheckpointStream = through.obj();
316+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
317+
(fakeCheckpointStream as any).reset = () => {};
318+
319+
sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream);
320+
321+
const firstFakeRequestStream = through.obj();
322+
323+
const requestFnStub = sandbox.stub();
324+
325+
requestFnStub.onCall(0).callsFake(() => {
326+
setTimeout(() => {
327+
// This causes a new request stream to be created.
328+
firstFakeRequestStream.emit('error', {
329+
code: grpc.status.UNAVAILABLE,
330+
message: 'Error.',
331+
} as grpc.ServiceError);
332+
}, 50);
333+
334+
return firstFakeRequestStream;
335+
});
336+
337+
partialResultStream(requestFnStub, {gaxOptions: {timeout: 0}})
338+
.on('data', row => {})
339+
.on('error', err => {
340+
assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED);
341+
assert.strictEqual(requestFnStub.callCount, 1);
342+
done();
343+
});
344+
});
345+
314346
it('should resume if there was a retryable error', done => {
315347
// This test will emit four rows total:
316348
// - Two rows

test/transaction.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,15 @@ describe('Transaction', () => {
391391
});
392392

393393
it('should pass along row options', () => {
394+
const gaxOptions = {
395+
timeout: 60,
396+
};
394397
const fakeOptions = {
395398
json: true,
396399
jsonOptions: {a: 'b'},
397400
maxResumeRetries: 10,
398401
columnsMetadata: {column1: {test: 'ss'}, column2: Function},
402+
gaxOptions: gaxOptions,
399403
};
400404

401405
snapshot.createReadStream(TABLE, fakeOptions);
@@ -766,11 +770,15 @@ describe('Transaction', () => {
766770
});
767771

768772
it('should pass along row options', () => {
773+
const gaxOptions = {
774+
timeout: 60,
775+
};
769776
const expectedOptions = {
770777
json: true,
771778
jsonOptions: {a: 'b'},
772779
maxResumeRetries: 10,
773780
columnsMetadata: {column1: {test: 'ss'}, column2: Function},
781+
gaxOptions: gaxOptions,
774782
};
775783

776784
const fakeQuery = Object.assign({}, QUERY, expectedOptions);

0 commit comments

Comments
 (0)