Skip to content

Commit 3d2cf10

Browse files
committed
WIP: many more operators fixed
1 parent 4205bbb commit 3d2cf10

File tree

14 files changed

+100
-116
lines changed

14 files changed

+100
-116
lines changed

packages/rxjs/src/internal/Observable.ts

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,6 @@ export interface SubscriberOverrides<T> {
221221
* will be handled and passed to the destination's `error` method.
222222
*/
223223
complete?: () => void;
224-
/**
225-
* If provided, this function will be called after all teardown has occurred
226-
* for this {@link Subscriber}. This is generally used for cleanup purposes
227-
* during operator development.
228-
*/
229-
finalize?: () => void;
230224
}
231225

232226
/**
@@ -249,8 +243,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
249243
protected readonly _errorOverride: ((err: any) => void) | null = null;
250244
/** @internal */
251245
protected readonly _completeOverride: (() => void) | null = null;
252-
/** @internal */
253-
protected readonly _onFinalize: (() => void) | null = null;
254246

255247
/**
256248
* @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead.
@@ -284,7 +276,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
284276
this._nextOverride = overrides?.next ?? null;
285277
this._errorOverride = overrides?.error ?? null;
286278
this._completeOverride = overrides?.complete ?? null;
287-
this._onFinalize = overrides?.finalize ?? null;
288279

289280
// It's important - for performance reasons - that all of this class's
290281
// members are initialized and that they are always initialized in the same
@@ -356,7 +347,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
356347
if (!this.closed) {
357348
this.isStopped = true;
358349
super.unsubscribe();
359-
this._onFinalize?.();
360350
}
361351
}
362352

@@ -433,12 +423,11 @@ function overrideError(this: Subscriber<unknown>, err: any): void {
433423
}
434424

435425
function overrideComplete(this: Subscriber<unknown>): void {
426+
this.unsubscribe();
436427
try {
437428
this._completeOverride!();
438429
} catch (error) {
439430
this.destination.error(error);
440-
} finally {
441-
this.unsubscribe();
442431
}
443432
}
444433

packages/rxjs/src/internal/operators/bufferCount.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
6363
let buffers: T[][] = [];
6464
let count = 0;
6565

66+
destination.add(() => {
67+
// Release memory
68+
buffers = null!;
69+
});
70+
6671
source.subscribe(
6772
operate({
6873
destination,
@@ -108,10 +113,6 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
108113
}
109114
destination.complete();
110115
},
111-
finalize: () => {
112-
// Clean up our memory when we finalize
113-
buffers = null!;
114-
},
115116
})
116117
);
117118
});

packages/rxjs/src/internal/operators/bufferTime.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
8383
// this is only really used for when *just* the buffer time span is passed.
8484
let restartOnEmit = false;
8585

86+
destination.add(() => {
87+
// Release memory
88+
bufferRecords = null;
89+
});
90+
8691
/**
8792
* Does the work of emitting the buffer from the record, ensuring that the
8893
* record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
@@ -153,8 +158,6 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
153158
destination.complete();
154159
destination.unsubscribe();
155160
},
156-
// Clean up
157-
finalize: () => (bufferRecords = null),
158161
});
159162

160163
source.subscribe(bufferTimeSubscriber);

packages/rxjs/src/internal/operators/bufferWhen.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Subscriber} from '../Observable.js';
1+
import type { Subscriber } from '../Observable.js';
22
import { operate, Observable, from } from '../Observable.js';
33
import type { ObservableInput, OperatorFunction } from '../types.js';
44
import { noop } from '../util/noop.js';
@@ -43,14 +43,19 @@ import { noop } from '../util/noop.js';
4343
*/
4444
export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
4545
return (source) =>
46-
new Observable((subscriber) => {
46+
new Observable((destination) => {
4747
// The buffer we keep and emit.
4848
let buffer: T[] | null = null;
4949
// A reference to the subscriber used to subscribe to
5050
// the closing notifier. We need to hold this so we can
5151
// end the subscription after the first notification.
5252
let closingSubscriber: Subscriber<T> | null = null;
5353

54+
destination.add(() => {
55+
// Release memory.
56+
buffer = closingSubscriber = null!;
57+
});
58+
5459
// Ends the previous closing notifier subscription, so it
5560
// terminates after the first emission, then emits
5661
// the current buffer if there is one, starts a new buffer, and starts a
@@ -62,12 +67,12 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
6267
// emit the buffer if we have one, and start a new buffer.
6368
const b = buffer;
6469
buffer = [];
65-
b && subscriber.next(b);
70+
b && destination.next(b);
6671

6772
// Get a new closing notifier and subscribe to it.
6873
from(closingSelector()).subscribe(
6974
(closingSubscriber = operate({
70-
destination: subscriber,
75+
destination,
7176
next: openBuffer,
7277
complete: noop,
7378
}))
@@ -80,17 +85,15 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
8085
// Subscribe to our source.
8186
source.subscribe(
8287
operate({
83-
destination: subscriber,
88+
destination,
8489
// Add every new value to the current buffer.
8590
next: (value) => buffer?.push(value),
8691
// When we complete, emit the buffer if we have one,
8792
// then complete the result.
8893
complete: () => {
89-
buffer && subscriber.next(buffer);
90-
subscriber.complete();
94+
buffer && destination.next(buffer);
95+
destination.complete();
9196
},
92-
// Release memory on finalization
93-
finalize: () => (buffer = closingSubscriber = null!),
9497
})
9598
);
9699
});

packages/rxjs/src/internal/operators/debounce.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Subscriber} from '../Observable.js';
1+
import type { Subscriber } from '../Observable.js';
22
import { operate, Observable, from } from '../Observable.js';
33
import type { MonoTypeOperatorFunction, ObservableInput } from '../types.js';
44
import { noop } from '../util/noop.js';
@@ -69,6 +69,11 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
6969
// The subscriber/subscription for the current debounce, if there is one.
7070
let durationSubscriber: Subscriber<any> | null = null;
7171

72+
destination.add(() => {
73+
// Clean up
74+
lastValue = durationSubscriber = null;
75+
});
76+
7277
const emit = () => {
7378
// Unsubscribe any current debounce subscription we have,
7479
// we only cared about the first notification from it, and we
@@ -106,10 +111,6 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
106111
emit();
107112
destination.complete();
108113
},
109-
finalize: () => {
110-
// Finalization.
111-
lastValue = durationSubscriber = null;
112-
},
113114
})
114115
);
115116
});

packages/rxjs/src/internal/operators/debounceTime.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
6666
let lastValue: T;
6767
let activeTask: Subscription | void;
6868

69+
destination.add(() => {
70+
// Clean up
71+
lastValue = activeTask = null!;
72+
});
73+
6974
source.subscribe(
7075
operate({
7176
destination,
@@ -94,10 +99,6 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
9499
}
95100
destination.complete();
96101
},
97-
finalize: () => {
98-
// Finalization.
99-
lastValue = activeTask = null!;
100-
},
101102
})
102103
);
103104
});

packages/rxjs/src/internal/operators/groupBy.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,14 @@ export function groupBy<T, K, R>(
204204
group!.complete();
205205
durationSubscriber?.unsubscribe();
206206
},
207-
finalize: () => groups.delete(key),
207+
error: (err) => {
208+
group!.error(err);
209+
groups.delete(key);
210+
},
211+
complete: () => {
212+
group!.complete();
213+
groups.delete(key);
214+
},
208215
});
209216

210217
// Start our duration notifier.
@@ -222,11 +229,6 @@ export function groupBy<T, K, R>(
222229
error: handleError,
223230
// Source completes.
224231
complete: () => notify((consumer) => consumer.complete()),
225-
// Free up memory.
226-
// When the source subscription is _finally_ torn down, release the subjects and keys
227-
// in our groups Map, they may be quite large and we don't want to keep them around if we
228-
// don't have to.
229-
finalize: () => groups.clear(),
230232
});
231233

232234
// Subscribe to the source

packages/rxjs/src/internal/operators/mergeInternals.ts

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,6 @@ export function mergeInternals<T, R>(
5757
// against our concurrency limit later.
5858
active++;
5959

60-
// A flag used to show that the inner observable completed.
61-
// This is checked during finalization to see if we should
62-
// move to the next item in the buffer, if there is on.
63-
let innerComplete = false;
64-
6560
// Start our inner subscription.
6661
from(project(value, index++)).subscribe(
6762
operate({
@@ -81,36 +76,26 @@ export function mergeInternals<T, R>(
8176
}
8277
},
8378
complete: () => {
84-
// Flag that we have completed, so we know to check the buffer
85-
// during finalization.
86-
innerComplete = true;
87-
},
88-
finalize: () => {
89-
// During finalization, if the inner completed (it wasn't errored or
90-
// cancelled), then we want to try the next item in the buffer if
91-
// there is one.
92-
if (innerComplete) {
93-
// We have to wrap this in a try/catch because it happens during
94-
// finalization, possibly asynchronously, and we want to pass
95-
// any errors that happen (like in a projection function) to
96-
// the outer Subscriber.
97-
try {
98-
// INNER SOURCE COMPLETE
99-
// Decrement the active count to ensure that the next time
100-
// we try to call `doInnerSub`, the number is accurate.
101-
active--;
102-
// If we have more values in the buffer, try to process those
103-
// Note that this call will increment `active` ahead of the
104-
// next conditional, if there were any more inner subscriptions
105-
// to start.
106-
while (buffer.length && active < concurrent) {
107-
doInnerSub(buffer.shift()!);
108-
}
109-
// Check to see if we can complete, and complete if so.
110-
checkComplete();
111-
} catch (err) {
112-
destination.error(err);
79+
// We have to wrap this in a try/catch because it happens during
80+
// finalization, possibly asynchronously, and we want to pass
81+
// any errors that happen (like in a projection function) to
82+
// the outer Subscriber.
83+
try {
84+
// INNER SOURCE COMPLETE
85+
// Decrement the active count to ensure that the next time
86+
// we try to call `doInnerSub`, the number is accurate.
87+
active--;
88+
// If we have more values in the buffer, try to process those
89+
// Note that this call will increment `active` ahead of the
90+
// next conditional, if there were any more inner subscriptions
91+
// to start.
92+
while (buffer.length && active < concurrent) {
93+
doInnerSub(buffer.shift()!);
11394
}
95+
// Check to see if we can complete, and complete if so.
96+
checkComplete();
97+
} catch (err) {
98+
destination.error(err);
11499
}
115100
},
116101
})

packages/rxjs/src/internal/operators/takeLast.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
5050
let ring = new Array<T>(count);
5151
// This counter is how we track where we are at in the ring buffer.
5252
let counter = 0;
53+
54+
destination.add(() => {
55+
// During finalization release the values in our buffer.
56+
ring = null!;
57+
});
58+
5359
source.subscribe(
5460
operate({
5561
destination,
@@ -73,10 +79,6 @@ export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
7379
// All done. This will also trigger clean up.
7480
destination.complete();
7581
},
76-
finalize: () => {
77-
// During finalization release the values in our buffer.
78-
ring = null!;
79-
},
8082
})
8183
);
8284
});

packages/rxjs/src/internal/operators/tap.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,18 @@ export function tap<T>(observerOrNext?: Partial<TapObserver<T>> | ((value: T) =>
162162
new Observable((destination) => {
163163
tapObserver.subscribe?.();
164164
let isUnsub = true;
165+
166+
destination.add(() => {
167+
if (isUnsub) {
168+
tapObserver.unsubscribe?.();
169+
}
170+
tapObserver.finalize?.();
171+
});
172+
165173
source.subscribe(
166174
operate({
167175
destination,
168-
next: (value) => {
176+
next: (value: T) => {
169177
tapObserver.next?.(value);
170178
destination.next(value);
171179
},
@@ -179,12 +187,6 @@ export function tap<T>(observerOrNext?: Partial<TapObserver<T>> | ((value: T) =>
179187
tapObserver.complete?.();
180188
destination.complete();
181189
},
182-
finalize: () => {
183-
if (isUnsub) {
184-
tapObserver.unsubscribe?.();
185-
}
186-
tapObserver.finalize?.();
187-
},
188190
})
189191
);
190192
})

0 commit comments

Comments
 (0)