@@ -106,9 +106,28 @@ class AtomicWaitQueue {
106106 return referenceCount == 1 ;
107107 }
108108
109+ // / This queue is being re-used with new construction arguments.
110+ // / Update it appropriately.
111+ void updateForNewArguments () {
112+ // We intentionally take no arguments so that only calls to
113+ // createQueue with no arguments will succeed at calling this no-op
114+ // implementation. Queue types with construction arguments
115+ // will need to implement this method to take the appropriate
116+ // arguments. Hopefully this discourages people from forgetting
117+ // that queues can be re-used if created in a loop.
118+ }
119+
109120 // / An RAII helper class for signalling that the current thread is a
110121 // / worker thread which has acquired the lock.
111122 // /
123+ // / `AtomicWaitQueue` does not require the global lock to be held
124+ // / while creating or publishing the queue. Clients taking advantage
125+ // / of this should inform the Worker class that a created queue has
126+ // / been published by calling `flagQueueIsPublished`. Clients who
127+ // / wish to publish the queue while holding the global lock, perhaps
128+ // / to get a rule that all stores are done under the lock, may instead
129+ // / use `tryPublishQueue`.
130+ // /
112131 // / The expected use pattern is something like:
113132 // /
114133 // / ```
@@ -117,26 +136,47 @@ class AtomicWaitQueue {
117136 // / while (true) {
118137 // / if (oldStatus.isDone()) return;
119138 // /
120- // / // Try to publish the wait queue. If this succeeds, we've become
121- // / // the worker thread.
122- // / bool published = worker.tryPublishQueue([&] {
123- // / auto newStatus = oldStatus.withLock(worker.createQueue());
124- // / return myAtomic.compare_exchange_weak(oldStatus, newStatus,
125- // / /*success*/ std::memory_order_release,
126- // / /*failure*/ std::memory_order_acquire);
127- // / });
128- // / if (!published) continue;
139+ // / if (oldStatus.hasWaitQueue()) {
140+ // / bool waited = worker.tryReloadAndWait([&] {
141+ // / oldStatus = myAtomic.load(std::memory_order_acquire);
142+ // / return (oldStatus.hasWaitQueue() ? oldStatus.getWaitQueue()
143+ // / : nullptr);
144+ // / });
129145 // /
130- // / // Do the actual work here.
146+ // / // If we waited, `oldStatus` will be out of date; reload it.
147+ // / //
148+ // / // (For the pattern in this example, where the worker thread
149+ // / // always transitions the status to done, this is actually
150+ // / // unnecessary: by virtue of having successfully waited, we've
151+ // / // synchronized with the worker thread and know that the status
152+ // / // is done, so we could just return. But in general, this
153+ // / // reload is necessary.)
154+ // / if (waited)
155+ // / oldStatus = myAtomic.load(std::memory_order_acquire);
131156 // /
132- // / // "Unpublish" the queue from the the atomic.
133- // / while (true) {
134- // / auto newStatus = oldStatus.withDone(true);
135- // / if (myAtomic.compare_exchange_weak(oldStatus, newStatus,
157+ // / // Go back and reconsider the updated status.
158+ // / continue;
159+ // / }
160+ // /
161+ // / // Create a queue and try to publish it. If this succeeds,
162+ // / // we've become the worker thread. We don't have to worry
163+ // / // about the queue leaking if we don't use it; that's managed
164+ // / // by the Worker class.
165+ // / {
166+ // / auto queue = worker.createQueue();
167+ // / auto newStatus = oldStatus.withWaitQueue(queue);
168+ // / if (!myAtomic.compare_exchange_weak(oldStatus, newStatus,
136169 // / /*success*/ std::memory_order_release,
137170 // / /*failure*/ std::memory_order_acquire))
138- // / break;
171+ // / continue;
172+ // / worker.flagQueueIsPublished(queue);
139173 // / }
174+ // /
175+ // / // Do the actual work here.
176+ // /
177+ // / // Report that the work is done and "unpublish" the queue from
178+ // / // the atomic.
179+ // / myAtomic.store(oldStatus.withDone(true), std::memory_order_release);
140180 // / worker.finishAndUnpublishQueue([]{});
141181 // / return;
142182 // / }
@@ -182,6 +222,19 @@ class AtomicWaitQueue {
182222 return Published;
183223 }
184224
225+ // / Given that this thread is not the worker thread and there seems
226+ // / to be a wait queue in place, try to wait on it.
227+ // /
228+ // / Acquire the global lock and call the given function. If it
229+ // / returns a wait queue, wait on that queue and return true;
230+ // / otherwise, return false.
231+ template <class Fn >
232+ bool tryReloadAndWait (Fn &&fn) {
233+ assert (!isWorkerThread ());
234+ typename Impl::Waiter waiter (GlobalLock);
235+ return waiter.tryReloadAndWait (std::forward<Fn>(fn));
236+ }
237+
185238 // / Given that this thread is the worker thread, return the queue
186239 // / that's been created and published for it.
187240 Impl *getPublishedQueue () const {
@@ -195,14 +248,18 @@ class AtomicWaitQueue {
195248 // /
196249 // / The Worker object takes ownership of the queue until it's
197250 // / published, so you can safely call this even if publishing
198- // / might fail. Note that the same queue will be returned on
199- // / successive invocations, so take care if the arguments might
200- // / change during the loop.
251+ // / might fail.
252+ // /
253+ // / Note that the same queue will be returned on successive
254+ // / invocations. Queues that accept arguments for construction
255+ // / should implement `updateForNewArguments`.
201256 template <class ... Args>
202257 Impl *createQueue (Args &&...args) {
203258 assert (!Published);
204259 if (!CurrentQueue)
205260 CurrentQueue = asImpl ().createNewQueue (std::forward<Args>(args)...);
261+ else
262+ CurrentQueue->updateForNewArguments (std::forward<Args>(args)...);
206263 return CurrentQueue;
207264 }
208265
0 commit comments