summaryrefslogtreecommitdiff
path: root/src/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/threadpool.c')
-rw-r--r--src/threadpool.c63
1 files changed, 45 insertions, 18 deletions
diff --git a/src/threadpool.c b/src/threadpool.c
index c4d8a5c..bd52c75 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -5,9 +5,37 @@
5#include <errno.h> 5#include <errno.h>
6#include <error.h> 6#include <error.h>
7 7
8typedef struct task {
9 gcallback callback;
10 fcallback freecb;
11 void *data;
12} task;
13
14typedef struct tqnode {
15 struct tqnode *next;
16 struct tqnode *prev;
17 task *task;
18} tqnode;
19
20typedef struct taskqueue {
21 tqnode *start;
22 tqnode *end;
23 unsigned int size;
24} taskqueue;
25
26typedef struct ctqueue {
27 mtx_t mutex;
28 cnd_t cond;
29 unsigned char canceled;
30
31 taskqueue *tq;
32 thrd_t *thrdarr;
33 int talen;
34} ctqueue;
35
8 36
9task * task_init(gcallback callback, fcallback freecb, void *data) { 37task * task_init(gcallback callback, fcallback freecb, void *data) {
10 if(callback == NULL) {errno = EINVAL; return NULL;} 38 if(callback == NULL) ERRRET(EINVAL, NULL);
11 39
12 task *tsk = calloc(1, sizeof(*tsk)); 40 task *tsk = calloc(1, sizeof(*tsk));
13 if(!tsk) 41 if(!tsk)
@@ -33,7 +61,7 @@ void task_free(void *tsk) {
33} 61}
34 62
35int task_fire(task *tsk) { 63int task_fire(task *tsk) {
36 if(!tsk) {errno = EINVAL; return -1;} 64 if(!tsk) ERRRET(EINVAL, -1);
37 return tsk->callback(tsk->data); 65 return tsk->callback(tsk->data);
38} 66}
39 67
@@ -46,7 +74,7 @@ int task_fired(task *tsk) {
46 74
47 75
48tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { 76tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) {
49 if(!tsk) {errno = EINVAL; return NULL;} 77 if(!tsk) ERRRET(EINVAL, NULL);
50 tqnode *node = calloc(1, sizeof(*node)); 78 tqnode *node = calloc(1, sizeof(*node));
51 if(!node) 79 if(!node)
52 return NULL; 80 return NULL;
@@ -98,7 +126,7 @@ void taskqueue_free(void *tq) {
98} 126}
99 127
100int taskqueue_handlefirst(taskqueue *tq, task *tsk) { 128int taskqueue_handlefirst(taskqueue *tq, task *tsk) {
101 if(!tq || !tsk) {errno = EINVAL; return -1;} 129 if(!tq || !tsk) ERRRET(EINVAL, -1);
102 if(tq->size) {return 0;} 130 if(tq->size) {return 0;}
103 131
104 tqnode *first = tqnode_init(NULL, NULL, tsk); 132 tqnode *first = tqnode_init(NULL, NULL, tsk);
@@ -113,7 +141,7 @@ int taskqueue_handlefirst(taskqueue *tq, task *tsk) {
113} 141}
114 142
115int taskqueue_push(taskqueue *tq, task *tsk) { 143int taskqueue_push(taskqueue *tq, task *tsk) {
116 if(!tq || !tsk) {errno = EINVAL; return -1;} 144 if(!tq || !tsk) ERRRET(EINVAL, -1);
117 145
118 int hf; 146 int hf;
119 if((hf = taskqueue_handlefirst(tq, tsk))) 147 if((hf = taskqueue_handlefirst(tq, tsk)))
@@ -130,8 +158,8 @@ int taskqueue_push(taskqueue *tq, task *tsk) {
130} 158}
131 159
132task * taskqueue_pop(taskqueue *tq) { 160task * taskqueue_pop(taskqueue *tq) {
133 if(!tq) {errno = EINVAL; return NULL;} 161 if(!tq) ERRRET(EINVAL, NULL);
134 if(tq->size <= 0) {errno = ENODATA; return NULL;} 162 if(tq->size <= 0) ERRRET(ENODATA, NULL);
135 163
136 tqnode *end = tq->end; 164 tqnode *end = tq->end;
137 task *ret = end->task; 165 task *ret = end->task;
@@ -150,7 +178,7 @@ task * taskqueue_pop(taskqueue *tq) {
150} 178}
151 179
152int taskqueue_pushfront(taskqueue *tq, task *tsk) { 180int taskqueue_pushfront(taskqueue *tq, task *tsk) {
153 if(!tq || !tsk) {errno = EINVAL; return -1;} 181 if(!tq || !tsk) ERRRET(EINVAL, -1);
154 182
155 int hf; 183 int hf;
156 if((hf = taskqueue_handlefirst(tq, tsk))) 184 if((hf = taskqueue_handlefirst(tq, tsk)))
@@ -167,8 +195,8 @@ int taskqueue_pushfront(taskqueue *tq, task *tsk) {
167} 195}
168 196
169task * taskqueue_popback(taskqueue *tq) { 197task * taskqueue_popback(taskqueue *tq) {
170 if(!tq) {errno = EINVAL; return NULL;} 198 if(!tq) ERRRET(EINVAL, NULL);
171 if(tq->size <= 0) {errno = ENODATA; return NULL;} 199 if(tq->size <= 0) ERRRET(ENODATA, NULL);
172 200
173 tqnode *start = tq->start; 201 tqnode *start = tq->start;
174 task *ret = start->task; 202 task *ret = start->task;
@@ -187,7 +215,7 @@ task * taskqueue_popback(taskqueue *tq) {
187} 215}
188 216
189int taskqueue_size(taskqueue *tq) { 217int taskqueue_size(taskqueue *tq) {
190 if(!tq) {errno = EINVAL; return -1;} 218 if(!tq) ERRRET(EINVAL, -1);
191 return tq->size; 219 return tq->size;
192} 220}
193 221
@@ -219,7 +247,7 @@ static void ___ucl_cnddestroy(void *cond) {
219} 247}
220 248
221ctqueue * ctqueue_init(int nthreads) { 249ctqueue * ctqueue_init(int nthreads) {
222 if(nthreads <= 0) {errno = EINVAL; return NULL;} 250 if(nthreads <= 0) ERRRET(EINVAL, NULL);
223 cleanup_CREATE(6); 251 cleanup_CREATE(6);
224 252
225 ctqueue *ctq = calloc(1, sizeof(*ctq)); 253 ctqueue *ctq = calloc(1, sizeof(*ctq));
@@ -264,7 +292,7 @@ ctqueue * ctqueue_init(int nthreads) {
264} 292}
265 293
266int ctqueue_cancel(ctqueue *ctq) { 294int ctqueue_cancel(ctqueue *ctq) {
267 if(!ctq) {errno = EINVAL; return -1;} 295 if(!ctq) ERRRET(EINVAL, -1);
268 296
269 __CTQ_INLOCK(ctq, 1, 297 __CTQ_INLOCK(ctq, 1,
270 ctq->canceled = 1; 298 ctq->canceled = 1;
@@ -297,7 +325,7 @@ void ctqueue_free(void *ctq) {
297} 325}
298 326
299int ctqueue_waitpush(ctqueue *ctq, task *tsk) { 327int ctqueue_waitpush(ctqueue *ctq, task *tsk) {
300 if(!ctq || !tsk) {errno = EINVAL; return -1;} 328 if(!ctq || !tsk) ERRRET(EINVAL, -1);
301 int retval = 0; 329 int retval = 0;
302 330
303 __CTQ_INLOCK(ctq, -1, 331 __CTQ_INLOCK(ctq, -1,
@@ -310,7 +338,7 @@ int ctqueue_waitpush(ctqueue *ctq, task *tsk) {
310} 338}
311 339
312task * ctqueue_waitpop(ctqueue *ctq) { 340task * ctqueue_waitpop(ctqueue *ctq) {
313 if(!ctq) {errno = EINVAL; return NULL;} 341 if(!ctq) ERRRET(EINVAL, NULL);
314 task *retval = NULL; 342 task *retval = NULL;
315 343
316 __CTQ_INLOCK(ctq, NULL, 344 __CTQ_INLOCK(ctq, NULL,
@@ -318,9 +346,8 @@ task * ctqueue_waitpop(ctqueue *ctq) {
318 cnd_wait(&ctq->cond, &ctq->mutex); 346 cnd_wait(&ctq->cond, &ctq->mutex);
319 347
320 if(ctq->canceled) { 348 if(ctq->canceled) {
321 errno = ECANCELED;
322 mtx_unlock(&ctq->mutex); 349 mtx_unlock(&ctq->mutex);
323 return NULL; 350 ERRRET(ECANCELED, NULL);
324 } 351 }
325 352
326 retval = taskqueue_pop(ctq->tq); 353 retval = taskqueue_pop(ctq->tq);
@@ -346,7 +373,7 @@ static int __CTQ_CONSUMER(void *ctq) {
346} 373}
347 374
348int ctqueue_start(ctqueue *ctq) { 375int ctqueue_start(ctqueue *ctq) {
349 if(!ctq) {errno = EINVAL; return -1;} 376 if(!ctq) ERRRET(EINVAL, -1);
350 377
351 ctq->canceled = 0; 378 ctq->canceled = 0;
352 379