22//
33// This source file is part of the Swift.org open source project
44//
5- // Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5+ // Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
66// Licensed under Apache License v2.0 with Runtime Library Exception
77//
88// See https://swift.org/LICENSE.txt for license information
@@ -279,7 +279,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
279279
280280private:
281281
282- // // TODO: move to lockless via the status atomic
282+ // TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
283283 mutable std::mutex mutex;
284284
285285 // / Used for queue management, counting number of waiting and ready tasks
@@ -290,7 +290,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
290290 // / The low bits contain the status, the rest of the pointer is the
291291 // / AsyncTask.
292292 NaiveQueue<ReadyQueueItem> readyQueue;
293- // mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
294293
295294 // / Single waiting `AsyncTask` currently waiting on `group.next()`,
296295 // / or `nullptr` if no task is currently waiting.
@@ -305,10 +304,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
305304 : TaskGroupTaskStatusRecord(),
306305 status(GroupStatus::initial().status),
307306 readyQueue(),
308- // readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
309307 waitQueue(nullptr ), successType(T) {}
310308
311-
312309 TaskGroupTaskStatusRecord *getTaskRecord () {
313310 return reinterpret_cast <TaskGroupTaskStatusRecord *>(this );
314311 }
@@ -472,11 +469,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
472469
473470// =============================================================================
474471// ==== add / attachChild ------------------------------------------------------
472+
475473SWIFT_CC (swift)
476474static void swift_taskGroup_attachChildImpl(TaskGroup *group,
477475 AsyncTask *child) {
476+ SWIFT_TASK_DEBUG_LOG (" attach child task = %p to group = %p\n " ,
477+ child, group);
478+
479+ // The counterpart of this (detachChild) is performed by the group itself,
480+ // when it offers the completed (child) task's value to a waiting task -
481+ // during the implementation of `await group.next()`.
478482 auto groupRecord = asImpl (group)->getTaskRecord ();
479- return groupRecord->attachChild (child);
483+ groupRecord->attachChild (child);
480484}
481485
482486// =============================================================================
@@ -506,8 +510,7 @@ bool TaskGroup::isCancelled() {
506510}
507511
508512static void fillGroupNextResult (TaskFutureWaitAsyncContext *context,
509- PollResult result,
510- bool releaseResultRetainedTask) {
513+ PollResult result) {
511514 // / Fill in the result value
512515 switch (result.status ) {
513516 case PollStatus::MustWait:
@@ -516,7 +519,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
516519
517520 case PollStatus::Error: {
518521 context->fillWithError (reinterpret_cast <SwiftError *>(result.storage ));
519- break ;
522+ return ;
520523 }
521524
522525 case PollStatus::Success: {
@@ -528,28 +531,17 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
528531 // remaining references to it.
529532 successType->vw_initializeWithCopy (destPtr, result.storage );
530533 successType->vw_storeEnumTagSinglePayload (destPtr, 0 , 1 );
531- break ;
534+ return ;
532535 }
533536
534537 case PollStatus::Empty: {
535538 // Initialize the result as a nil Optional<Success>.
536539 const Metadata *successType = result.successType ;
537540 OpaqueValue *destPtr = context->successResultPointer ;
538541 successType->vw_storeEnumTagSinglePayload (destPtr, 1 , 1 );
539- break ;
542+ return ;
540543 }
541544 }
542-
543- // We only release if asked to; This is because this function is called in two
544- // cases "immediately":
545- // a) when a completed task arrives and a waiting one existed then we don't
546- // need to retain the completed task at all, thus we also don't release it.
547- // b) when the task was stored in the readyQueue it was retained. As a
548- // waitingTask arrives we will fill-in with the value from the retained
549- // task. In this situation we must release the ready task, to allow it to
550- // be destroyed.
551- if (releaseResultRetainedTask)
552- swift_release (result.retainedTask );
553545}
554546
555547void TaskGroupImpl::offer (AsyncTask *completedTask, AsyncContext *context) {
@@ -558,7 +550,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
558550 assert (completedTask->hasChildFragment ());
559551 assert (completedTask->hasGroupChildFragment ());
560552 assert (completedTask->groupChildFragment ()->getGroup () == asAbstract (this ));
561- SWIFT_TASK_DEBUG_LOG (" offer task %p to group %p\n " , completedTask, group );
553+ SWIFT_TASK_DEBUG_LOG (" offer task %p to group %p" , completedTask, this );
562554
563555 mutex.lock (); // TODO: remove fragment lock, and use status for synchronization
564556
@@ -604,7 +596,9 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
604596 static_cast <TaskFutureWaitAsyncContext *>(
605597 waitingTask->ResumeContext );
606598
607- fillGroupNextResult (waitingContext, result, /* release*/ false );
599+ fillGroupNextResult (waitingContext, result);
600+ detachChild (result.retainedTask );
601+
608602 _swift_tsan_acquire (static_cast <Job *>(waitingTask));
609603
610604 // TODO: allow the caller to suggest an executor
@@ -622,11 +616,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
622616 // queue when a task polls during next() it will notice that we have a value
623617 // ready for it, and will process it immediately without suspending.
624618 assert (!waitQueue.load (std::memory_order_relaxed));
625- SWIFT_TASK_DEBUG_LOG (" group has no waiting tasks, store ready task = %p" ,
619+ SWIFT_TASK_DEBUG_LOG (" group has no waiting tasks, RETAIN and store ready task = %p" ,
626620 completedTask);
627621 // Retain the task while it is in the queue;
628622 // it must remain alive until the task group is alive.
629623 swift_retain (completedTask);
624+
630625 auto readyItem = ReadyQueueItem::get (
631626 hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
632627 completedTask
@@ -692,7 +687,7 @@ static void swift_taskGroup_wait_next_throwingImpl(
692687 PollResult polled = group->poll (waitingTask);
693688 switch (polled.status ) {
694689 case PollStatus::MustWait:
695- SWIFT_TASK_DEBUG_LOG (" poll group = %p, no ready tasks, waiting task = %p\n " ,
690+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, no ready tasks, waiting task = %p" ,
696691 group, waitingTask);
697692 // The waiting task has been queued on the channel,
698693 // there were pending tasks so it will be woken up eventually.
@@ -706,15 +701,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
706701 case PollStatus::Empty:
707702 case PollStatus::Error:
708703 case PollStatus::Success:
709- SWIFT_TASK_DEBUG_LOG (" poll group = %p, task = %p, ready task available = %p\n " ,
704+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, task = %p, ready task available = %p" ,
710705 group, waitingTask, polled.retainedTask );
711- fillGroupNextResult (context, polled, /* release*/ true );
706+ fillGroupNextResult (context, polled);
707+ if (auto completedTask = polled.retainedTask ) {
708+ // it would be null for PollStatus::Empty, then we don't need to release
709+ group->detachChild (polled.retainedTask );
710+ swift_release (polled.retainedTask );
711+ }
712+
712713 return waitingTask->runInFullyEstablishedContext ();
713714 }
714715}
715716
716717PollResult TaskGroupImpl::poll (AsyncTask *waitingTask) {
717718 mutex.lock (); // TODO: remove group lock, and use status for synchronization
719+ SWIFT_TASK_DEBUG_LOG (" poll group = %p" , this );
718720 auto assumed = statusMarkWaitingAssumeAcquire ();
719721
720722 PollResult result;
@@ -724,6 +726,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
724726
725727 // ==== 1) bail out early if no tasks are pending ----------------------------
726728 if (assumed.isEmpty ()) {
729+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, group is empty, no pending tasks" , this );
727730 // No tasks in flight, we know no tasks were submitted before this poll
728731 // was issued, and if we parked here we'd potentially never be woken up.
729732 // Bail out and return `nil` from `group.next()`.
@@ -741,6 +744,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
741744
742745 // ==== 2) Ready task was polled, return with it immediately -----------------
743746 if (assumed.readyTasks ()) {
747+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, group has ready tasks = %d" ,
748+ this , assumed.readyTasks ());
749+
744750 auto assumedStatus = assumed.status ;
745751 auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
746752 if (status.compare_exchange_weak (
@@ -844,13 +850,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
844850}
845851
846852bool TaskGroupImpl::cancelAll () {
853+ SWIFT_TASK_DEBUG_LOG (" cancel all tasks in group = %p" , this );
854+
847855 // store the cancelled bit
848856 auto old = statusCancel ();
849857 if (old.isCancelled ()) {
850858 // already was cancelled previously, nothing to do?
851859 return false ;
852860 }
853861
862+ // FIXME: must also remove the records!!!!
854863 // cancel all existing tasks within the group
855864 swift_task_cancel_group_child_tasks (asAbstract (this ));
856865 return true ;
0 commit comments