Skip to content
This repository was archived by the owner on Oct 31, 2022. It is now read-only.

Commit 562f364

Browse files
committed
Simpler task-handling code
1 parent 6aae821 commit 562f364

File tree

3 files changed

+55
-236
lines changed

3 files changed

+55
-236
lines changed

source/HapCompressor.c

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ typedef struct {
8282
HapCodecBufferRef finishedFrames;
8383
OSSpinLock lock;
8484

85-
Boolean endTasksPending;
8685
HapCodecBufferPoolRef compressTaskPool;
8786

8887
HapCodecDXTEncoderRef dxtEncoder;
@@ -93,7 +92,7 @@ typedef struct {
9392
HapCodecBufferPoolRef dxtBufferPool;
9493

9594
unsigned int dxtFormat;
96-
unsigned int taskGroup;
95+
HapCodecTaskGroupRef taskGroup;
9796

9897
#ifdef DEBUG
9998
unsigned int debugFrameCount;
@@ -140,6 +139,7 @@ struct HapCodecCompressTask
140139
#include <ComponentDispatchHelper.c>
141140
#endif
142141

142+
static void Background_Encode(void *info);
143143
static void releaseTaskFrames(HapCodecCompressTask *task);
144144
static ComponentResult finishFrame(HapCodecBufferRef buffer);
145145
static void queueEncodedFrame(HapCompressorGlobals glob, HapCodecBufferRef frame);
@@ -182,14 +182,13 @@ Hap_COpen(
182182
glob->lastFrameOut = 0;
183183
glob->finishedFrames = NULL;
184184
glob->lock = OS_SPINLOCK_INIT;
185-
glob->endTasksPending = false;
186185
glob->compressTaskPool = NULL;
187186
glob->dxtEncoder = NULL;
188187
glob->formatConvertPool = NULL;
189188
glob->formatConvertBufferBytesPerRow = 0;
190189
glob->dxtBufferPool = NULL;
191190
glob->dxtFormat = 0;
192-
glob->taskGroup = 0;
191+
glob->taskGroup = NULL;
193192

194193
bail:
195194
debug_print_err(glob, err);
@@ -247,10 +246,9 @@ Hap_CClose(
247246
HapCodecBufferPoolDestroy(glob->formatConvertPool);
248247
glob->formatConvertPool = NULL;
249248

250-
if (glob->endTasksPending)
251-
{
252-
HapCodecTasksWillStop();
253-
}
249+
HapCodecTasksWaitForGroupToComplete(glob->taskGroup);
250+
HapCodecTasksDestroyGroup(glob->taskGroup);
251+
glob->taskGroup = NULL;
254252

255253
// We should never have queued frames when we are closed
256254
// but we check and properly release the memory if we do
@@ -571,10 +569,7 @@ Hap_CPrepareToCompressFrames(
571569
goto bail;
572570
}
573571

574-
HapCodecTasksWillStart();
575-
glob->endTasksPending = true;
576-
577-
glob->taskGroup = HapCodecTasksNewGroup();
572+
glob->taskGroup = HapCodecTasksCreateGroup(Background_Encode, 20);
578573

579574
#ifdef DEBUG
580575
glob->debugStartTime = mach_absolute_time();
@@ -896,7 +891,7 @@ Hap_CEncodeFrame(
896891
task->encodedFrame = NULL;
897892
task->next = NULL;
898893

899-
HapCodecTasksAddTask(Background_Encode, glob->taskGroup, buffer);
894+
HapCodecTasksAddTask(glob->taskGroup, buffer);
900895

901896
// Dequeue and deliver any encoded frames
902897
do

source/Tasks.c

Lines changed: 41 additions & 218 deletions
Original file line numberDiff line numberDiff line change
@@ -26,243 +26,66 @@
2626
*/
2727

2828
#include "Tasks.h"
29-
#include <pthread.h>
30-
#include <sys/sysctl.h>
31-
#include <libkern/OSAtomic.h>
3229

33-
typedef struct HapCodecTaskRecord
34-
{
35-
unsigned int group;
36-
HapCodecTaskWorkFunction func;
37-
void *context;
38-
unsigned int running;
39-
} HapCodecTaskRecord;
40-
41-
// TODO: contain these in a struct which we malloc/free reducing our loaded code footprint and
42-
// making init/cleanup faster (we can set the pointer fast then teardown outside the lock)
43-
static OSSpinLock mGlobalLock = OS_SPINLOCK_INIT;
44-
static unsigned int mSenderCount = 0U;
45-
static int mInitted = 0;
46-
static unsigned int mThreadCount = 0U;
47-
static pthread_mutex_t mThreadLock;
48-
static pthread_cond_t mTaskWaitCond;
49-
static pthread_cond_t mFeedWaitCond;
50-
static HapCodecTaskRecord *mTasks;
51-
52-
static int HapCodecTasksGetMaximumThreadCount();
53-
54-
static void *HapCodecTasksThread(void *info)
55-
{
56-
#pragma unused (info)
57-
int done = 0;
58-
pthread_mutex_lock(&mThreadLock);
59-
do
60-
{
61-
int i = 0;
62-
int ran = 0;
63-
for (i = 0; i < HapCodecTasksGetMaximumThreadCount(); i++)
64-
{
65-
if (mTasks[i].func != NULL && mTasks[i].running == 0)
66-
{
67-
mTasks[i].running = 1U;
68-
pthread_mutex_unlock(&mThreadLock);
69-
mTasks[i].func(mTasks[i].context);
70-
pthread_mutex_lock(&mThreadLock);
71-
mTasks[i].func = NULL;
72-
mTasks[i].running = 0;
73-
ran = 1;
74-
break;
75-
}
76-
}
77-
if (ran == 0 && mSenderCount == 0)
78-
{
79-
done = 1;
80-
}
81-
if (ran == 1)
82-
{
83-
pthread_cond_signal(&mFeedWaitCond); // TODO: check we actually need to signal perhaps by wrapping our semaphores into a counting semaphore pseudo-class
84-
}
85-
if (done == 0 && ran == 0)
86-
{
87-
pthread_cond_wait(&mTaskWaitCond, &mThreadLock);
88-
}
89-
}
90-
while (done == 0);
91-
mThreadCount--;
92-
if (mThreadCount == 0)
93-
{
94-
pthread_cond_signal(&mFeedWaitCond);
95-
}
96-
pthread_mutex_unlock(&mThreadLock);
97-
return NULL;
98-
}
99-
100-
static int HapCodecTasksGetMaximumThreadCount()
101-
{
102-
static int mMaxThreadCount = 0;
103-
if (mMaxThreadCount == 0)
104-
{
105-
int mib[2] = {CTL_HW, HW_NCPU};
106-
size_t len = sizeof(mMaxThreadCount);
107-
108-
// could use something like and watch for changes to eg power state
109-
// sysctlbyname("hw.activecpu", &ncpu, &len, NULL, 0);
110-
111-
int result = sysctl(mib, 2, &mMaxThreadCount, &len, NULL, 0);
112-
if (result != 0)
113-
{
114-
mMaxThreadCount = 4; // conservative guess if we couldn't get a value
115-
}
116-
}
117-
return mMaxThreadCount;
118-
}
119-
120-
static int HapCodecTasksInit()
121-
{
122-
if (mInitted == 0)
123-
{
124-
mThreadCount = 0U;
125-
// This isn't ideal doing these longer operations inside a spinlock... once at load using an initializer?
126-
if (pthread_mutex_init(&mThreadLock, NULL) != 0)
127-
{
128-
return 1;
129-
}
130-
if (pthread_cond_init(&mTaskWaitCond, NULL) != 0)
131-
{
132-
pthread_mutex_destroy(&mThreadLock);
133-
return 1;
134-
}
135-
if (pthread_cond_init(&mFeedWaitCond, NULL) != 0)
136-
{
137-
pthread_mutex_destroy(&mThreadLock);
138-
pthread_cond_destroy(&mTaskWaitCond);
139-
return 1;
140-
}
141-
mTasks = malloc(sizeof(HapCodecTaskRecord) * HapCodecTasksGetMaximumThreadCount());
142-
int i;
143-
for (i = 0; i < HapCodecTasksGetMaximumThreadCount(); i++)
144-
{
145-
mTasks[i].running = 0;
146-
mTasks[i].func = NULL;
147-
}
148-
mInitted = 1;
149-
}
150-
return 0;
151-
}
30+
struct HapCodecTaskGroup {
31+
HapCodecTaskWorkFunction task;
32+
dispatch_group_t group;
33+
dispatch_queue_t queue;
34+
dispatch_semaphore_t semaphore;
35+
};
15236

153-
static void HapCodecTasksCleanup(void)
37+
void HapCodecTasksAddTask(HapCodecTaskGroupRef group, void *context)
15438
{
155-
if (mInitted != 0)
39+
if (group && group->group && group->queue && group->task && group->semaphore)
15640
{
157-
// TODO: we could do this on the last thread if we are sure we can avoid creating a new instance over the top of it
158-
pthread_mutex_lock(&mThreadLock);
159-
pthread_cond_broadcast(&mTaskWaitCond);
160-
while (mThreadCount > 0)
161-
{
162-
pthread_cond_wait(&mFeedWaitCond, &mThreadLock);
163-
}
164-
pthread_mutex_unlock(&mThreadLock);
165-
pthread_mutex_destroy(&mThreadLock);
166-
pthread_cond_destroy(&mTaskWaitCond);
167-
pthread_cond_destroy(&mFeedWaitCond);
168-
free(mTasks);
169-
mTasks = NULL;
170-
mInitted = 0;
41+
dispatch_semaphore_wait(group->semaphore, DISPATCH_TIME_FOREVER);
42+
// Copy values in case group is released before execution
43+
HapCodecTaskWorkFunction task = group->task;
44+
dispatch_semaphore_t semaphore = group->semaphore;
45+
// Retain semaphore for block
46+
dispatch_retain(semaphore);
47+
dispatch_group_async(group->group, group->queue, ^{
48+
task(context);
49+
dispatch_semaphore_signal(semaphore);
50+
dispatch_release(semaphore);
51+
});
17152
}
17253
}
17354

174-
void HapCodecTasksWillStart(void)
55+
void HapCodecTasksWaitForGroupToComplete(HapCodecTaskGroupRef group)
17556
{
176-
OSSpinLockLock(&mGlobalLock);
177-
mSenderCount++;
178-
if (mSenderCount == 1U)
179-
{
180-
HapCodecTasksInit();
181-
}
182-
OSSpinLockUnlock(&mGlobalLock);
183-
}
184-
185-
void HapCodecTasksWillStop(void)
186-
{
187-
OSSpinLockLock(&mGlobalLock);
188-
mSenderCount--;
189-
if (mSenderCount == 0U)
190-
{
191-
// clear state and stop our threads outside of the lock
192-
HapCodecTasksCleanup();
193-
}
194-
OSSpinLockUnlock(&mGlobalLock);
57+
if (group && group->group) dispatch_group_wait(group->group, DISPATCH_TIME_FOREVER);
19558
}
19659

197-
void HapCodecTasksAddTask(HapCodecTaskWorkFunction task, unsigned int group, void *context)
60+
HapCodecTaskGroupRef HapCodecTasksCreateGroup(HapCodecTaskWorkFunction task, unsigned int maxTasks)
19861
{
199-
pthread_mutex_lock(&mThreadLock);
200-
// Check to see if we can spawn a new thread for this task
201-
if (mThreadCount < HapCodecTasksGetMaximumThreadCount())
62+
HapCodecTaskGroupRef group = NULL;
63+
if (task)
20264
{
203-
pthread_t thread;
204-
pthread_attr_t attr;
205-
pthread_attr_init(&attr);
206-
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
207-
if (pthread_create(&thread, &attr, HapCodecTasksThread, NULL) == 0)
65+
group = malloc(sizeof(struct HapCodecTaskGroup));
66+
if (group)
20867
{
209-
mThreadCount++;
210-
}
211-
}
212-
// setup the task in a free slot, waiting for one if necessary
213-
int i;
214-
int found = 0;
215-
do
216-
{
217-
for (i = 0; i < HapCodecTasksGetMaximumThreadCount(); i++)
218-
{
219-
if (mTasks[i].func == NULL)
68+
group->task = task;
69+
group->group = dispatch_group_create();
70+
group->queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
71+
group->semaphore = dispatch_semaphore_create(maxTasks);
72+
if (group->group == NULL || group->queue == NULL || group->semaphore == NULL)
22073
{
221-
found = 1;
222-
mTasks[i].func = task;
223-
mTasks[i].group = group;
224-
mTasks[i].context = context;
225-
break;
74+
HapCodecTasksDestroyGroup(group);
75+
group = NULL;
22676
}
22777
}
228-
if (found == 0)
229-
{
230-
pthread_cond_wait(&mFeedWaitCond, &mThreadLock);
231-
}
23278
}
233-
while (found == 0);
234-
// signal the task thread to wake
235-
pthread_cond_signal(&mTaskWaitCond);
236-
pthread_mutex_unlock(&mThreadLock);
237-
238-
79+
return group;
23980
}
24081

241-
void HapCodecTasksWaitForGroupToComplete(unsigned int group)
82+
void HapCodecTasksDestroyGroup(HapCodecTaskGroupRef group)
24283
{
243-
pthread_mutex_lock(&mThreadLock);
244-
int done = 0;
245-
do
84+
if (group)
24685
{
247-
int i;
248-
done = 1;
249-
for (i = 0; i < HapCodecTasksGetMaximumThreadCount(); i++)
250-
{
251-
if (mTasks[i].func != NULL && mTasks[i].group == group)
252-
{
253-
done = 0;
254-
}
255-
}
256-
if (done == 0)
257-
{
258-
pthread_cond_wait(&mFeedWaitCond, &mThreadLock);
259-
}
260-
} while (done == 0);
261-
pthread_mutex_unlock(&mThreadLock);
262-
}
263-
264-
unsigned int HapCodecTasksNewGroup(void)
265-
{
266-
static int32_t mGroup = 0;
267-
return OSAtomicIncrement32(&mGroup);
86+
if (group->group) dispatch_release(group->group);
87+
if (group->queue) dispatch_release(group->queue);
88+
if (group->semaphore) dispatch_release(group->semaphore);
89+
free(group);
90+
}
26891
}

source/Tasks.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@
2525
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2626
*/
2727

28+
typedef struct HapCodecTaskGroup * HapCodecTaskGroupRef;
29+
2830
typedef void (*HapCodecTaskWorkFunction)(void *context);
2931

30-
void HapCodecTasksWillStart(void);
31-
void HapCodecTasksWillStop(void);
32-
unsigned int HapCodecTasksNewGroup(void);
33-
void HapCodecTasksAddTask(HapCodecTaskWorkFunction task, unsigned int group, void *context);
34-
void HapCodecTasksWaitForGroupToComplete(unsigned int group);
32+
HapCodecTaskGroupRef HapCodecTasksCreateGroup(HapCodecTaskWorkFunction task, unsigned int maxTasks);
33+
void HapCodecTasksDestroyGroup(HapCodecTaskGroupRef group);
34+
void HapCodecTasksAddTask(HapCodecTaskGroupRef group, void *context);
35+
void HapCodecTasksWaitForGroupToComplete(HapCodecTaskGroupRef group);

0 commit comments

Comments
 (0)