@@ -1137,8 +1137,8 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
11371137 }
11381138
11391139 // This needs to be a store release so that we also publish the contents of
1140- // the new Job we are adding to the atomic job queue. Pairs with load
1141- // acquire in drainOne.
1140+ // the new Job we are adding to the atomic job queue. Pairs with consume
1141+ // in drainOne.
11421142 if (_status ().compare_exchange_weak (oldState, newState,
11431143 /* success */ std::memory_order_release,
11441144 /* failure */ std::memory_order_relaxed)) {
@@ -1180,13 +1180,13 @@ Job * DefaultActorImpl::drainOne() {
11801180 SWIFT_TASK_DEBUG_LOG (" Draining one job from default actor %p" , this );
11811181
11821182 // Pairs with the store release in DefaultActorImpl::enqueue
1183- auto oldState = _status ().load (std::memory_order_acquire);
1183+ auto oldState = _status ().load (SWIFT_MEMORY_ORDER_CONSUME);
1184+ _swift_tsan_consume (this );
11841185
11851186 auto jobToPreprocessFrom = oldState.getFirstJob ();
11861187 Job *firstJob = preprocessQueue (jobToPreprocessFrom);
11871188 traceJobQueue (this , firstJob);
11881189
1189- _swift_tsan_release (this );
11901190 while (true ) {
11911191 assert (oldState.isAnyRunning ());
11921192
@@ -1200,8 +1200,8 @@ Job * DefaultActorImpl::drainOne() {
12001200 // Dequeue the first job and set up a new head
12011201 newState = newState.withFirstJob (getNextJobInQueue (firstJob));
12021202 if (_status ().compare_exchange_weak (oldState, newState,
1203- /* success */ std::memory_order_release ,
1204- /* failure */ std::memory_order_acquire )) {
1203+ /* success */ std::memory_order_relaxed ,
1204+ /* failure */ std::memory_order_relaxed )) {
12051205 SWIFT_TASK_DEBUG_LOG (" Drained first job %p from actor %p" , firstJob, this );
12061206 traceActorStateTransition (this , oldState, newState);
12071207 concurrency::trace::actor_dequeue (this , firstJob);
@@ -1393,8 +1393,6 @@ retry:;
13931393 SWIFT_TASK_DEBUG_LOG (" Thread attempting to jump onto %p, as drainer = %d" , this , asDrainer);
13941394#endif
13951395
1396- // Note: This doesn't have to be a load acquire because the jobQueue is part
1397- // of the same atomic.
13981396 auto oldState = _status ().load (std::memory_order_relaxed);
13991397 while (true ) {
14001398
@@ -1446,9 +1444,13 @@ retry:;
14461444 assert (!oldState.getFirstJob ());
14471445 }
14481446
1447+ // Taking the drain lock clears the max priority escalated bit because we've
1448+ // already represented the current max priority of the actor on the thread.
14491449 auto newState = oldState.withRunning ();
1450+
1451+ // This needs an acquire since we are taking a lock
14501452 if (_status ().compare_exchange_weak (oldState, newState,
1451- std::memory_order_relaxed ,
1453+ std::memory_order_acquire ,
14521454 std::memory_order_relaxed)) {
14531455 _swift_tsan_acquire (this );
14541456 traceActorStateTransition (this , oldState, newState);
@@ -1527,9 +1529,11 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
15271529 newState = newState.resetPriority ();
15281530 }
15291531
1532+ // This needs to be a release since we are unlocking a lock
15301533 if (_status ().compare_exchange_weak (oldState, newState,
1531- /* success */ std::memory_order_relaxed ,
1534+ /* success */ std::memory_order_release ,
15321535 /* failure */ std::memory_order_relaxed)) {
1536+ _swift_tsan_release (this );
15331537 traceActorStateTransition (this , oldState, newState);
15341538
15351539 if (newState.isScheduled ()) {
0 commit comments