@@ -67,6 +67,17 @@ extension Subscriber {
6767public struct Subscription < StateType: State > {
6868 private( set) weak var subscriber : AnySubscriber ? = nil
6969 let selector : ( ( StateType ) -> Any ) ?
70+ let notifyQueue : DispatchQueue
71+
72+ fileprivate func notify( with state: StateType ) {
73+ notifyQueue. async {
74+ if let selector = self . selector {
75+ self . subscriber? . _update ( with: selector ( state) )
76+ } else {
77+ self . subscriber? . _update ( with: state)
78+ }
79+ }
80+ }
7081}
7182
7283
@@ -75,15 +86,29 @@ public struct Subscription<StateType: State> {
7586
7687public class Core < StateType: State > {
7788
78- private var subscriptions = [ Subscription < StateType > ] ( )
79- private var middlewares = [ Middlewares < StateType > ] ( )
89+ private let jobQueue : DispatchQueue = DispatchQueue ( label: " reactor.core.queue " , qos: . userInitiated, attributes: [ ] )
90+
91+ private let subscriptionsSyncQueue = DispatchQueue ( label: " reactor.core.subscription.sync " )
92+ private var _subscriptions = [ Subscription < StateType > ] ( )
93+ private var subscriptions : [ Subscription < StateType > ] {
94+ get {
95+ return subscriptionsSyncQueue. sync {
96+ return self . _subscriptions
97+ }
98+ }
99+ set {
100+ subscriptionsSyncQueue. sync {
101+ self . _subscriptions = newValue
102+ }
103+ }
104+ }
105+
106+ private let middlewares : [ Middlewares < StateType > ]
80107 public private ( set) var state : StateType {
81108 didSet {
82109 subscriptions = subscriptions. filter { $0. subscriber != nil }
83- DispatchQueue . main. async {
84- for subscription in self . subscriptions {
85- self . publishStateChange ( subscriber: subscription. subscriber, selector: subscription. selector)
86- }
110+ for subscription in subscriptions {
111+ subscription. notify ( with: state)
87112 }
88113 }
89114 }
@@ -96,33 +121,33 @@ public class Core<StateType: State> {
96121
97122 // MARK: - Subscriptions
98123
99- public func add( subscriber: AnySubscriber , selector: ( ( StateType ) -> Any ) ? = nil ) {
100- guard !subscriptions. contains ( where: { $0. subscriber === subscriber} ) else { return }
101- subscriptions. append ( Subscription ( subscriber: subscriber, selector: selector) )
102- publishStateChange ( subscriber: subscriber, selector: selector)
124+ public func add( subscriber: AnySubscriber , notifyOnQueue queue: DispatchQueue ? = DispatchQueue . main, selector: ( ( StateType ) -> Any ) ? = nil ) {
125+ jobQueue. async {
126+ guard !self . subscriptions. contains ( where: { $0. subscriber === subscriber} ) else { return }
127+ let subscription = Subscription ( subscriber: subscriber, selector: selector, notifyQueue: queue ?? self . jobQueue)
128+ self . subscriptions. append ( subscription)
129+ subscription. notify ( with: self . state)
130+ }
103131 }
104132
105133 public func remove( subscriber: AnySubscriber ) {
106134 subscriptions = subscriptions. filter { $0. subscriber !== subscriber }
107135 }
108136
109- private func publishStateChange( subscriber: AnySubscriber ? , selector: ( ( StateType ) -> Any ) ? ) {
110- if let selector = selector {
111- subscriber? . _update ( with: selector ( self . state) )
112- } else {
113- subscriber? . _update ( with: self . state)
114- }
115- }
116-
117137 // MARK: - Events
118138
119139 public func fire( event: Event ) {
120- state. react ( to: event)
121- middlewares. forEach { $0. middleware. _process ( event: event, state: state) }
140+ jobQueue. async {
141+ self . state. react ( to: event)
142+ let state = self . state
143+ self . middlewares. forEach { $0. middleware. _process ( event: event, state: state) }
144+ }
122145 }
123146
124147 public func fire< C: Command > ( command: C ) where C. StateType == StateType {
125- command. execute ( state: state, core: self )
148+ jobQueue. async {
149+ command. execute ( state: self . state, core: self )
150+ }
126151 }
127152
128153}
0 commit comments