summaryrefslogtreecommitdiff
path: root/src/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/threadpool.c')
-rw-r--r--src/threadpool.c156
1 files changed, 71 insertions, 85 deletions
diff --git a/src/threadpool.c b/src/threadpool.c
index c266964..959c060 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -15,6 +15,7 @@
15#include "shared.c" 15#include "shared.c"
16 16
17#include <threads.h> 17#include <threads.h>
18#include <stdint.h>
18#include <stdlib.h> 19#include <stdlib.h>
19#include <errno.h> 20#include <errno.h>
20#include <error.h> 21#include <error.h>
@@ -44,9 +45,9 @@ typedef struct tqnode {
44 * 45 *
45 */ 46 */
46typedef struct taskqueue { 47typedef struct taskqueue {
47 tqnode *start; //!< The first element of the queue 48 tqnode *start; //!< The first element of the queue
48 tqnode *end; //!< The final element of the queue 49 tqnode *end; //!< The final element of the queue
49 unsigned int size; //!< The number of elements in the queue 50 size_t size; //!< The number of elements in the queue
50} taskqueue; 51} taskqueue;
51 52
52/** 53/**
@@ -54,13 +55,14 @@ typedef struct taskqueue {
54 * 55 *
55 */ 56 */
56typedef struct ctqueue { 57typedef struct ctqueue {
57 mtx_t mutex; //!< A mutex for locking sensitive resources 58 mtx_t mutex; //!< A mutex for locking sensitive resources
58 cnd_t cond; //!< A conditional for waiting on / sending a signal 59 cnd_t cond; //!< A conditional for waiting on / sending a signal
59 unsigned char canceled; //!< Whether the threads are currently canceled or not 60 uint8_t canceled; //!< Whether the threads are currently canceled or not
60 61 taskqueue *tq; //!< A taskqueue to be accessed concurrently
61 taskqueue *tq; //!< A taskqueue to be accessed concurrently 62
62 thrd_t *thrdarr; //!< An array of threads to be dispatched as consumers 63 thrd_t *thrdarr; //!< An array of threads to be dispatched as consumers
63 int talen; //!< The length of the thread array 64 int talen; //!< The length of the thread array
65 // Consider making these another linked list or stack or something
64} ctqueue; 66} ctqueue;
65 67
66/** 68/**
@@ -71,12 +73,11 @@ typedef struct ctqueue {
71 * @param data Data to be passed to the callback. May be null 73 * @param data Data to be passed to the callback. May be null
72 * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error 74 * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error
73 */ 75 */
74task * task_init(gcallback callback, fcallback freecb, void *data) { 76task * task_new(gcallback callback, fcallback freecb, void *data) {
75 if(callback == NULL) ERRRET(EINVAL, NULL); 77 if(callback == NULL) ERRRET(EINVAL, NULL);
76 78
77 task *tsk = calloc(1, sizeof(*tsk)); 79 task *tsk = calloc(1, sizeof(*tsk));
78 if(!tsk) 80 if(!tsk) return NULL;
79 return NULL;
80 81
81 tsk->callback = callback; 82 tsk->callback = callback;
82 tsk->freecb = freecb; 83 tsk->freecb = freecb;
@@ -91,12 +92,10 @@ task * task_init(gcallback callback, fcallback freecb, void *data) {
91 * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null 92 * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null
92 */ 93 */
93void task_free(void *tsk) { 94void task_free(void *tsk) {
94 task *real = (task *)tsk; 95 task *real = tsk;
95 if(!real) 96 if(!real) return;
96 return;
97 97
98 if(real->freecb != NULL) 98 if(real->freecb) real->freecb(real->data);
99 real->freecb(real->data);
100 free(real); 99 free(real);
101 100
102 return; 101 return;
@@ -120,18 +119,18 @@ int task_fire(task *tsk) {
120 * @retval (int) Returns value of the callback. Returns -1 and sets errno on error 119 * @retval (int) Returns value of the callback. Returns -1 and sets errno on error
121 */ 120 */
122int task_fired(task *tsk) { 121int task_fired(task *tsk) {
122 if(!tsk) return -1;
123 int retval = task_fire(tsk); 123 int retval = task_fire(tsk);
124 if(errno == EINVAL && retval == -1) {return -1;}
125 task_free(tsk); 124 task_free(tsk);
126 return retval; 125 return retval;
127} 126}
128 127
129 128
130tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { 129tqnode * tqnode_new(tqnode *next, tqnode *prev, task *tsk) {
131 if(!tsk) ERRRET(EINVAL, NULL); 130 if(!tsk) ERRRET(EINVAL, NULL);
131
132 tqnode *node = calloc(1, sizeof(*node)); 132 tqnode *node = calloc(1, sizeof(*node));
133 if(!node) 133 if(!node) return NULL;
134 return NULL;
135 134
136 node->next = next; 135 node->next = next;
137 node->prev = prev; 136 node->prev = prev;
@@ -140,10 +139,19 @@ tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) {
140 return node; 139 return node;
141} 140}
142 141
142// Create a tqnode and task at the same time. Returns a valid tqnode with a valid task on success, NULL on error. Does not call task_free on error
143tqnode * tqnode_newtask(tqnode *next, tqnode *prev, gcallback callback, fcallback freecb, void *data) {
144 task *tsk = task_new(callback, freecb, data);
145 if(!tsk) return NULL;
146
147 tqnode *node = tqnode_new(next, prev, tsk);
148 if(!node) free(tsk);
149 return node;
150}
151
143void tqnode_free(void *tqn) { 152void tqnode_free(void *tqn) {
144 tqnode *real = (tqnode *)tqn; 153 tqnode *real = tqn;
145 if(!real) 154 if(!real) return;
146 return;
147 155
148 task_free(real->task); 156 task_free(real->task);
149 free(real); 157 free(real);
@@ -157,10 +165,9 @@ void tqnode_free(void *tqn) {
157 * 165 *
158 * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error 166 * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error
159 */ 167 */
160taskqueue * taskqueue_init(void) { 168taskqueue * taskqueue_new(void) {
161 taskqueue *tq = calloc(1, sizeof(*tq)); 169 taskqueue *tq = calloc(1, sizeof(*tq));
162 if(!tq) 170 if(!tq) return NULL;
163 return NULL;
164 171
165 tq->start = NULL; 172 tq->start = NULL;
166 tq->end = NULL; 173 tq->end = NULL;
@@ -175,8 +182,7 @@ taskqueue * taskqueue_init(void) {
175 * @param tq A taskqueue to be freed. May be null 182 * @param tq A taskqueue to be freed. May be null
176 */ 183 */
177void taskqueue_free(void *tq) { 184void taskqueue_free(void *tq) {
178 if(!tq) 185 if(!tq) return;
179 return;
180 186
181 for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { 187 for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) {
182 n = p->next; 188 n = p->next;
@@ -192,9 +198,8 @@ int taskqueue_handlefirst(taskqueue *tq, task *tsk) {
192 if(!tq || !tsk) ERRRET(EINVAL, -1); 198 if(!tq || !tsk) ERRRET(EINVAL, -1);
193 if(tq->size) {return 0;} 199 if(tq->size) {return 0;}
194 200
195 tqnode *first = tqnode_init(NULL, NULL, tsk); 201 tqnode *first = tqnode_new(NULL, NULL, tsk);
196 if(!first) 202 if(!first) return -1;
197 return -1;
198 203
199 tq->start = first; 204 tq->start = first;
200 tq->end = first; 205 tq->end = first;
@@ -214,13 +219,12 @@ int taskqueue_push(taskqueue *tq, task *tsk) {
214 if(!tq || !tsk) ERRRET(EINVAL, -1); 219 if(!tq || !tsk) ERRRET(EINVAL, -1);
215 220
216 int hf; 221 int hf;
217 if((hf = taskqueue_handlefirst(tq, tsk))) 222 if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1;
218 return (hf >= 0) ? 0 : -1; 223
224 tqnode *curstart = tq->start, *newstart = tqnode_new(tq->start, NULL, tsk);
225 if(!newstart) return -1;
219 226
220 tqnode *newstart = tqnode_init(tq->start, NULL, tsk); 227 curstart->prev = newstart;
221 if(!newstart)
222 return -1;
223 tq->start->prev = newstart;
224 tq->start = newstart; 228 tq->start = newstart;
225 tq->size++; 229 tq->size++;
226 230
@@ -264,13 +268,12 @@ int taskqueue_pushfront(taskqueue *tq, task *tsk) {
264 if(!tq || !tsk) ERRRET(EINVAL, -1); 268 if(!tq || !tsk) ERRRET(EINVAL, -1);
265 269
266 int hf; 270 int hf;
267 if((hf = taskqueue_handlefirst(tq, tsk))) 271 if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1;
268 return (hf >= 0) ? 0 : -1;
269 272
270 tqnode *newend = tqnode_init(NULL, tq->end, tsk); 273 tqnode *end =tq->end, *newend = tqnode_new(NULL, tq->end, tsk);
271 if(!newend) 274 if(!newend) return -1;
272 return -1; 275
273 tq->end->next = newend; 276 end->next = newend;
274 tq->end = newend; 277 tq->end = newend;
275 tq->size++; 278 tq->size++;
276 279
@@ -323,15 +326,15 @@ int taskqueue_size(taskqueue *tq) {
323 mtx_unlock(&(ctq)->mutex); \ 326 mtx_unlock(&(ctq)->mutex); \
324} while (0) 327} while (0)
325 328
326static void ___ucl_mtxdestroy(void *mtx) { 329static void mtxd_helper(mtx_t *mutex) {
327 if(!mtx) return; 330 if(!mutex) return;
328 mtx_destroy((mtx_t *)mtx); 331 mtx_destroy(mutex);
329 return; 332 return;
330} 333}
331 334
332static void ___ucl_cnddestroy(void *cond) { 335static void cndd_helper(cnd_t *cond) {
333 if(cond) return; 336 if(!cond) return;
334 cnd_destroy((cnd_t *)cond); 337 cnd_destroy(cond);
335 return; 338 return;
336} 339}
337 340
@@ -343,47 +346,32 @@ static void ___ucl_cnddestroy(void *cond) {
343 */ 346 */
344ctqueue * ctqueue_init(int nthreads) { 347ctqueue * ctqueue_init(int nthreads) {
345 if(nthreads <= 0) ERRRET(EINVAL, NULL); 348 if(nthreads <= 0) ERRRET(EINVAL, NULL);
346 cleanup_CREATE(6);
347 349
348 ctqueue *ctq = calloc(1, sizeof(*ctq)); 350 ctqueue *ctq = calloc(1, sizeof(*ctq));
349 if(!ctq) 351 if(!ctq) return NULL;
350 return NULL;
351 cleanup_REGISTER(free, ctq);
352 352
353 ctq->canceled = 0; 353 ctq->canceled = 0;
354 ctq->talen = nthreads; 354 ctq->talen = nthreads;
355 355
356 cleanup_CNDEXEC( 356 ctq->tq = taskqueue_new();
357 ctq->tq = taskqueue_init(); 357 if(!ctq->tq) goto ERR_ctqueue_init;
358 if(!ctq->tq)
359 cleanup_MARK();
360 cleanup_CNDREGISTER(taskqueue_free, ctq->tq);
361 );
362 358
363 cleanup_CNDEXEC( 359 if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) goto ERR_ctqueue_init;
364 if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) 360 if(cnd_init(&ctq->cond) != thrd_success) goto ERR_ctqueue_init;
365 cleanup_MARK();
366 cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex);
367 );
368 361
369 cleanup_CNDEXEC( 362 ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t));
370 if(cnd_init(&ctq->cond) != thrd_success) 363 if(!ctq->thrdarr) goto ERR_ctqueue_init;
371 cleanup_MARK();
372 cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond);
373 );
374 364
375 cleanup_CNDEXEC( 365 return ctq;
376 ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t));
377 if(!ctq->thrdarr)
378 cleanup_MARK();
379 cleanup_CNDREGISTER(free, ctq->thrdarr);
380 )
381 366
382 cleanup_CNDFIRE(); 367ERR_ctqueue_init:
383 if(cleanup_ERRORFLAGGED) 368 free(ctq->thrdarr);
384 return NULL; 369 cndd_helper(&ctq->cond);
370 mtxd_helper(&ctq->mutex);
371 taskqueue_free(ctq->tq);
372 free(ctq);
385 373
386 return ctq; 374 return NULL;
387} 375}
388 376
389/** 377/**
@@ -410,8 +398,7 @@ int ctqueue_cancel(ctqueue *ctq) {
410 * @param ctq The concurrent taskqueue to free. May be null 398 * @param ctq The concurrent taskqueue to free. May be null
411 */ 399 */
412void ctqueue_free(void *ctq) { 400void ctqueue_free(void *ctq) {
413 if(!ctq) 401 if(!ctq) return;
414 return;
415 402
416 ctqueue *real = (ctqueue *)ctq; 403 ctqueue *real = (ctqueue *)ctq;
417 ctqueue_cancel(real); 404 ctqueue_cancel(real);
@@ -485,8 +472,7 @@ static int __CTQ_CONSUMER(void *ctq) {
485 472
486 for(task *ctask = NULL;;) { 473 for(task *ctask = NULL;;) {
487 ctask = ctqueue_waitpop(real); 474 ctask = ctqueue_waitpop(real);
488 if(!ctask) 475 if(!ctask) break;
489 break;
490 476
491 task_fire(ctask); 477 task_fire(ctask);
492 task_free(ctask); 478 task_free(ctask);