@@ -21,7 +21,6 @@ import ResultSummary from './result-summary'
2121import Record from './record'
2222import { Query } from './types'
2323import { observer , util , connectionHolder } from './internal'
24- import { CompletedObserver , FailedObserver } from './internal/observers'
2524
2625const { EMPTY_CONNECTION_HOLDER } = connectionHolder
2726
@@ -130,12 +129,14 @@ class Result implements Promise<QueryResult> {
130129 */
131130 keys ( ) : Promise < string [ ] > {
132131 return new Promise ( ( resolve , reject ) => {
133- this . _streamObserverPromise . then ( observer =>
134- observer . subscribe ( {
135- onKeys : keys => resolve ( keys ) ,
136- onError : err => reject ( err )
137- } )
138- )
132+ this . _streamObserverPromise
133+ . then ( observer =>
134+ observer . subscribe ( {
135+ onKeys : keys => resolve ( keys ) ,
136+ onError : err => reject ( err )
137+ } )
138+ )
139+ . catch ( reject )
139140 } )
140141 }
141142
@@ -150,13 +151,16 @@ class Result implements Promise<QueryResult> {
150151 */
151152 summary ( ) : Promise < ResultSummary > {
152153 return new Promise ( ( resolve , reject ) => {
153- this . _streamObserverPromise . then ( o => {
154- o . cancel ( )
155- o . subscribe ( {
156- onCompleted : metadata => resolve ( metadata ) ,
157- onError : err => reject ( err )
154+ this . _streamObserverPromise
155+ . then ( o => {
156+ o . cancel ( )
157+ o . subscribe ( {
158+ onCompleted : metadata =>
159+ this . _createSummary ( metadata ) . then ( resolve , reject ) ,
160+ onError : err => reject ( err )
161+ } )
158162 } )
159- } )
163+ . catch ( reject )
160164 } )
161165 }
162166
@@ -247,43 +251,8 @@ class Result implements Promise<QueryResult> {
247251 subscribe ( observer : ResultObserver ) : void {
248252 const onCompletedOriginal = observer . onCompleted || DEFAULT_ON_COMPLETED
249253 const onCompletedWrapper = ( metadata : any ) => {
250- const connectionHolder = this . _connectionHolder
251- const {
252- validatedQuery : query ,
253- params : parameters
254- } = util . validateQueryAndParameters ( this . _query , this . _parameters , {
255- skipAsserts : true
256- } )
257-
258- function complete ( protocolVersion ?: number ) {
259- onCompletedOriginal . call (
260- observer ,
261- new ResultSummary ( query , parameters , metadata , protocolVersion )
262- )
263- }
264-
265- function release ( ) {
266- // notify connection holder that the used connection is not needed any more because result has
267- // been fully consumed; call the original onCompleted callback after that
268- return connectionHolder . releaseConnection ( )
269- }
270-
271- connectionHolder . getConnection ( ) . then (
272- // onFulfilled:
273- connection => {
274- release ( ) . then ( ( ) =>
275- complete (
276- connection !== undefined
277- ? connection . protocol ( ) . version
278- : undefined
279- )
280- )
281- } ,
282-
283- // onRejected:
284- _ => {
285- complete ( )
286- }
254+ this . _createSummary ( metadata ) . then ( summary =>
255+ onCompletedOriginal . call ( observer , summary )
287256 )
288257 }
289258
@@ -300,9 +269,11 @@ class Result implements Promise<QueryResult> {
300269 }
301270 observer . onError = onErrorWrapper
302271
303- this . _streamObserverPromise . then ( o => {
304- return o . subscribe ( observer )
305- } )
272+ this . _streamObserverPromise
273+ . then ( o => {
274+ return o . subscribe ( observer )
275+ } )
276+ . catch ( error => observer . onError ! ( error ) )
306277 }
307278
308279 /**
@@ -315,6 +286,34 @@ class Result implements Promise<QueryResult> {
315286 _cancel ( ) : void {
316287 this . _streamObserverPromise . then ( o => o . cancel ( ) )
317288 }
289+
290+ private _createSummary ( metadata : any ) : Promise < ResultSummary > {
291+ const {
292+ validatedQuery : query ,
293+ params : parameters
294+ } = util . validateQueryAndParameters ( this . _query , this . _parameters , {
295+ skipAsserts : true
296+ } )
297+ const connectionHolder = this . _connectionHolder
298+
299+ return connectionHolder
300+ . getConnection ( )
301+ . then (
302+ // onFulfilled:
303+ connection =>
304+ connectionHolder
305+ . releaseConnection ( )
306+ . then ( ( ) =>
307+ connection ? connection . protocol ( ) . version : undefined
308+ ) ,
309+ // onRejected:
310+ _ => undefined
311+ )
312+ . then (
313+ protocolVersion =>
314+ new ResultSummary ( query , parameters , metadata , protocolVersion )
315+ )
316+ }
318317}
319318
320319function captureStacktrace ( ) : string | null {
0 commit comments