Skip to content

Commit 4e63a6e

Browse files
Fix ReplaySubject accessing its state under lock
1 parent 41856c6 commit 4e63a6e

File tree

1 file changed

+14
-9
lines changed

1 file changed

+14
-9
lines changed

RxSwift/Subjects/ReplaySubject.swift

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private class ReplayBufferBase<Element>
103103
rxAbstractMethod()
104104
}
105105

106-
func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
106+
func getEventsToReplay() -> [Event<Element>] {
107107
rxAbstractMethod()
108108
}
109109

@@ -149,15 +149,21 @@ private class ReplayBufferBase<Element>
149149
let anyObserver = observer.asObserver()
150150

151151
if let stoppedEvent = self.stoppedEvent {
152+
let eventsToReplay = self.getEventsToReplay()
152153
lock.unlock()
153-
self.replayBuffer(anyObserver)
154+
for event in eventsToReplay {
155+
observer.on(event)
156+
}
154157
observer.on(stoppedEvent)
155158
return Disposables.create()
156159
}
157160
else {
158161
let key = self.observers.insert(observer.on)
162+
let eventsToReplay = self.getEventsToReplay()
159163
lock.unlock()
160-
self.replayBuffer(anyObserver)
164+
for event in eventsToReplay {
165+
observer.on(event)
166+
}
161167
return SubscriptionDisposable(owner: self, key: key)
162168
}
163169
}
@@ -205,10 +211,11 @@ private final class ReplayOne<Element> : ReplayBufferBase<Element> {
205211
self.value = value
206212
}
207213

208-
override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
214+
override func getEventsToReplay() -> [Event<Element>] {
209215
if let value = self.value {
210-
observer.on(.next(value))
216+
return [.next(value)]
211217
}
218+
return []
212219
}
213220

214221
override func synchronized_dispose() {
@@ -228,10 +235,8 @@ private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
228235
self.queue.enqueue(value)
229236
}
230237

231-
override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
232-
for item in self.queue {
233-
observer.on(.next(item))
234-
}
238+
override func getEventsToReplay() -> [Event<Element>] {
239+
return queue.map(Event.next)
235240
}
236241

237242
override func synchronized_dispose() {

0 commit comments

Comments
 (0)