path: root/src/threadpool.c
Diffstat (limited to 'src/threadpool.c')
-rw-r--r--	src/threadpool.c	550
1 file changed, 263 insertions(+), 287 deletions(-)
diff --git a/src/threadpool.c b/src/threadpool.c
index 6912790..c4d8a5c 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -1,386 +1,362 @@
 #include "threadpool.h"
-#include "shared.h"
 
-#include "ll.h"
-
-#include <asm-generic/errno-base.h>
-#include <asm-generic/errno.h>
 #include <threads.h>
 #include <stdlib.h>
 #include <errno.h>
+#include <error.h>
+
 
-// Pair some data with a mutex. Specifically a way to deal with mutexes more easily, not for data storage (mtxpair_free does not free the `(void*)data` member)
-typedef struct mtxp {
-	void *data;
-	mtx_t mtx;
-} mtxpair;
+task * task_init(gcallback callback, fcallback freecb, void *data) {
+	if(callback == NULL) {errno = EINVAL; return NULL;}
 
-mtxpair * mtxpair_init(void * const data, int type) {
-	mtxpair *mtxp = VALLOC(1, sizeof(*mtxp));
-	if(!mtxp)
+	task *tsk = calloc(1, sizeof(*tsk));
+	if(!tsk)
 		return NULL;
 
-	// Init the mutex
-	if(mtx_init(&mtxp->mtx, type) == thrd_error) {
-		free(mtxp);
-		RETURNWERR(errno, NULL);
-	}
+	tsk->callback = callback;
+	tsk->freecb = freecb;
+	tsk->data = data;
 
-	mtxp->data = data;
-	return mtxp;
+	return tsk;
 }
 
-void mtxpair_free(mtxpair *mp) {
-	if(!mp)
+void task_free(void *tsk) {
+	task *real = (task *)tsk;
+	if(!real)
 		return;
 
-	mtx_destroy(&mp->mtx);
-	free(mp);
+	if(real->freecb != NULL)
+		real->freecb(real->data);
+	free(real);
 
 	return;
 }
 
-int mtxpair_setdata(mtxpair * const mp, void * const data) {
-	if(!mp)
-		RETURNWERR(EINVAL, -1);
-
-	mp->data = data;
-	return 0;
+int task_fire(task *tsk) {
+	if(!tsk) {errno = EINVAL; return -1;}
+	return tsk->callback(tsk->data);
 }
 
-
-// thrd_create which calls mtx_lock/unlock on `arg` automatically
-int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) {
-	if(!thr)
-		RETURNWERR(EINVAL, thrd_error);
-	if(!func)
-		RETURNWERR(EINVAL, thrd_error);
-	if(!mtxd)
-		RETURNWERR(EINVAL, thrd_error);
-
-	if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);}
-	int retval = thrd_create(thr, func, mtxd->data);
-	if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);}
-
+int task_fired(task *tsk) {
+	int retval = task_fire(tsk);
+	if(errno == EINVAL && retval == -1) {return -1;}
+	task_free(tsk);
 	return retval;
 }
 
 
-/* Ok, after doing a little more research, the best way to do this is probably via a producer/consumer architecture. Spawn a bunch of
-// threads waiting on a queue (via semaphore) and when one is notified pop a task off the queue and execute it. In this case, the
-// producer would be the filesystem scanner function providing new files to encrypt, and the consumers would be threads waiting
-// to encrypt them */
-
-// Threadpool:
-	// Array of threads
-	// Task Queue
-	// Readiness semaphore / conditional
-	// Mutex
-	// Linked List of Tasks
-	// Task:
-		// int (*callback)(void*)
-		// void *arg
-
-// Consumer:
-	// Wait for cqueue to pop
-	// Fire task
-	// Repeat
-
-// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342
-
-typedef struct task {
-	task_callback cb;
-	void *arg;
-} task;
-
-task * task_init(task_callback cb, void *arg) {
-	if(cb == NULL)
-		RETURNWERR(EINVAL, NULL);
-	task *task = VALLOC(1, sizeof(*task));
-	if(!task)
+tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) {
+	if(!tsk) {errno = EINVAL; return NULL;}
+	tqnode *node = calloc(1, sizeof(*node));
+	if(!node)
 		return NULL;
 
-	task->cb = cb;
-	task->arg = arg;
+	node->next = next;
+	node->prev = prev;
+	node->task = tsk;
 
-	return task;
+	return node;
 }
 
-void task_free(void *ts) {
-	if(!ts)
+void tqnode_free(void *tqn) {
+	tqnode *real = (tqnode *)tqn;
+	if(!real)
 		return;
 
-	free(ts); // Not making any assumptions about the data in the task
+	task_free(real->task);
+	free(real);
 	return;
 }
 
-int task_fire(task *ts) {
-	if(!ts)
-		RETURNWERR(EINVAL, -1);
-	if(ts->cb == NULL)
-		RETURNWERR(EINVAL, -1);
-
-	return ts->cb(ts->arg);
-}
 
 
 
-typedef struct cq {
-	dlinkedlist *taskqueue;
-	dlinkedlist *rthreads;
+taskqueue * taskqueue_init(void) {
+	taskqueue *tq = calloc(1, sizeof(*tq));
+	if(!tq)
+		return NULL;
 
-	mtx_t mtx;
-	cnd_t cnd;
-
-	unsigned char canceled;
-} cqueue;
+	tq->start = NULL;
+	tq->end = NULL;
+	tq->size = 0;
 
+	return tq;
+}
 
-static void ___ucleanup_mtxd(void *mtx) {
-	if(!mtx)
+void taskqueue_free(void *tq) {
+	if(!tq)
 		return;
 
-	mtx_destroy((mtx_t*)mtx);
+	for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) {
+		n = p->next;
+		tqnode_free(p);
+		p = n;
+	}
+	free(tq);
+
 	return;
 }
 
-static void ___ucleanup_cndd(void *cnd) {
-	if(!cnd)
-		return;
+int taskqueue_handlefirst(taskqueue *tq, task *tsk) {
+	if(!tq || !tsk) {errno = EINVAL; return -1;}
+	if(tq->size) {return 0;}
 
-	cnd_destroy((cnd_t *)cnd);
-	return;
-}
+	tqnode *first = tqnode_init(NULL, NULL, tsk);
+	if(!first)
+		return -1;
 
-cqueue * cqueue_init() {
-	cleanup_CREATE(10);
-
-	// Create base object
-	cqueue *cq = VALLOC(1, sizeof(*cq));
-	if(!cq)
-		RETURNWERR(errno, NULL);
-	cleanup_REGISTER(free, cq);
-	cq->canceled = 0;
-
-	// Initialize the mutex
-	if(mtx_init(&cq->mtx, mtx_plain) != thrd_success)
-		cleanup_MARK();
-	cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx);
-
-	// Initialize the conditional
-	if(!cleanup_ERRORFLAGGED)
-		if(cnd_init(&cq->cnd) != thrd_success)
-			cleanup_MARK();
-	cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd);
+	tq->start = first;
+	tq->end = first;
+	tq->size = 1;
 
-	// Create the taskqueue
-	if(!cleanup_ERRORFLAGGED)
-		if(!(cq->taskqueue = dlinkedlist_init()))
-			cleanup_MARK();
-	cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue);
+	return 1;
+}
 
-	// Create the thread list
-	if(!cleanup_ERRORFLAGGED)
-		if(!(cq->rthreads = dlinkedlist_init()))
-			cleanup_MARK();
-	cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads);
+int taskqueue_push(taskqueue *tq, task *tsk) {
+	if(!tq || !tsk) {errno = EINVAL; return -1;}
 
-	if(cleanup_ERRORFLAGGED)
-		cleanup_FIRE();
+	int hf;
+	if((hf = taskqueue_handlefirst(tq, tsk)))
+		return (hf >= 0) ? 0 : -1;
 
-	return cq;
+	tqnode *newstart = tqnode_init(tq->start, NULL, tsk);
+	if(!newstart)
+		return -1;
+	tq->start->prev = newstart;
+	tq->start = newstart;
+	tq->size++;
 
-	// Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line
-	// callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea
+	return 0;
 }
 
-void cqueue_cancel(cqueue * const cq) {
-	if(!cq)
-		return;
+task * taskqueue_pop(taskqueue *tq) {
+	if(!tq) {errno = EINVAL; return NULL;}
+	if(tq->size <= 0) {errno = ENODATA; return NULL;}
 
-	mtx_lock(&cq->mtx);
-
-	if(cq->canceled) {
-		mtx_unlock(&cq->mtx);
-		return;
+	tqnode *end = tq->end;
+	task *ret = end->task;
+
+	if(tq->size == 1) {
+		tq->end = NULL;
+		tq->start = NULL;
+	} else {
+		tq->end = end->prev;
+		tq->end->next = NULL;
 	}
-	cq->canceled = 1;
 
-	mtx_unlock(&cq->mtx);
-	cnd_broadcast(&cq->cnd);
-
-	return;
+	free(end);
+	tq->size--;
+	return ret;
 }
 
-static int ___cqueue_join(void *t) {
-	if(!t)
+int taskqueue_pushfront(taskqueue *tq, task *tsk) {
+	if(!tq || !tsk) {errno = EINVAL; return -1;}
+
+	int hf;
+	if((hf = taskqueue_handlefirst(tq, tsk)))
+		return (hf >= 0) ? 0 : -1;
+
+	tqnode *newend = tqnode_init(NULL, tq->end, tsk);
+	if(!newend)
 		return -1;
+	tq->end->next = newend;
+	tq->end = newend;
+	tq->size++;
 
-	int retval = 0;
-	thrd_join(*((thrd_t*)t), &retval);
-
-	return retval;
+	return 0;
 }
 
-void cqueue_free(void *cq) {
-	if(!cq)
-		return;
+task * taskqueue_popback(taskqueue *tq) {
+	if(!tq) {errno = EINVAL; return NULL;}
+	if(tq->size <= 0) {errno = ENODATA; return NULL;}
 
-	cqueue *real = (cqueue *)cq;
+	tqnode *start = tq->start;
+	task *ret = start->task;
 
-	// Cancel threads and wait for them to exit
-	cqueue_cancel(real);
-	dlinkedlist_foreach(real->rthreads, ___cqueue_join);
+	if(tq->size == 1) {
+		tq->start = NULL;
+		tq->end = NULL;
+	} else {
+		tq->start = start->next;
+		tq->start->prev = NULL;
+	}
 
-	// Threads are dead, no need to worry about concurrency anymore
-	mtx_destroy(&real->mtx);
-	cnd_destroy(&real->cnd);
-	dlinkedlist_free(real->rthreads);
-	dlinkedlist_free(real->taskqueue);
+	free(start);
+	tq->size--;
+	return ret;
+}
 
-	return;
+int taskqueue_size(taskqueue *tq) {
+	if(!tq) {errno = EINVAL; return -1;}
+	return tq->size;
 }
 
-int cqueue_addtask(cqueue * const cq, task * const tsk) {
-	if(!cq || !tsk)
-		RETURNWERR(EINVAL, -1);
 
-	mtx_lock(&cq->mtx);
-
-	if(cq->canceled) {
-		mtx_unlock(&cq->mtx);
-		RETURNWERR(ECANCELED, -1);
-	}
 
-	dlinkedlist_prepend(cq->taskqueue, tsk, task_free);
-	mtx_unlock(&cq->mtx);
-	cnd_signal(&cq->cnd);
+// Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written
+#define __CTQ_INLOCK(ctq, retval, code) do {\
+	mtx_lock(&(ctq)->mutex); \
+	if((ctq)->canceled) { \
+		errno = ECANCELED; \
+		mtx_unlock(&(ctq)->mutex); \
+		return (retval); \
+	} \
+	\
+	code \
+	mtx_unlock(&(ctq)->mutex); \
+} while (0)
 
-	return 0;
+static void ___ucl_mtxdestroy(void *mtx) {
+	if(!mtx) return;
+	mtx_destroy((mtx_t *)mtx);
+	return;
}
 
-task * cqueue_waitpop(cqueue * const cq) {
-	if(!cq)
-		RETURNWERR(EINVAL, NULL);
+static void ___ucl_cnddestroy(void *cond) {
+	if(!cond) return;
+	cnd_destroy((cnd_t *)cond);
+	return;
+}
 
-	task *tsk = NULL;
-	int index = -1;
+ctqueue * ctqueue_init(int nthreads) {
+	if(nthreads <= 0) {errno = EINVAL; return NULL;}
+	cleanup_CREATE(6);
 
-	mtx_lock(&cq->mtx);
-	while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled)
-		cnd_wait(&cq->cnd, &cq->mtx);
+	ctqueue *ctq = calloc(1, sizeof(*ctq));
+	if(!ctq)
+		return NULL;
+	cleanup_REGISTER(free, ctq);
 
-	if(cq->canceled) {
-		mtx_unlock(&cq->mtx);
-		thrd_exit(-1);
-	}
+	ctq->canceled = 0;
+	ctq->talen = nthreads;
 
-	tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1));
-	dlinkedlist_remove(cq->taskqueue, index);
+	cleanup_CNDEXEC(
+		ctq->tq = taskqueue_init();
+		if(!ctq->tq)
+			cleanup_MARK();
+		cleanup_CNDREGISTER(taskqueue_free, ctq->tq);
+	);
 
-	mtx_unlock(&cq->mtx);
+	cleanup_CNDEXEC(
+		if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success)
+			cleanup_MARK();
+		cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex);
+	);
 
-	return tsk;
+	cleanup_CNDEXEC(
+		if(cnd_init(&ctq->cond) != thrd_success)
+			cleanup_MARK();
+		cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond);
+	);
+
+	cleanup_CNDEXEC(
+		ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t));
+		if(!ctq->thrdarr)
+			cleanup_MARK();
+		cleanup_CNDREGISTER(free, ctq->thrdarr);
+	)
+
+	cleanup_CNDFIRE();
+	if(cleanup_ERRORFLAGGED)
+		return NULL;
+
+	return ctq;
 }
 
-static int consumer(void *cq) {
-	if(!cq)
-		thrd_exit(-1);
+int ctqueue_cancel(ctqueue *ctq) {
+	if(!ctq) {errno = EINVAL; return -1;}
 
-	cqueue *real = (cqueue *)cq;
-	for(task *ctask;;) {
-		ctask = cqueue_waitpop(real);
-		if(!ctask)
-			task_fire(ctask);
-	}
+	__CTQ_INLOCK(ctq, 1,
+		ctq->canceled = 1;
+	);
+	cnd_broadcast(&ctq->cond);
 
-	thrd_exit(0);
+	return 0;
 }
 
-int cqueue_registerthreads(cqueue * const cq, int threads) {
-	if(!cq || threads <= 0)
-		RETURNWERR(EINVAL, -1);
+void ctqueue_free(void *ctq) {
+	if(!ctq)
+		return;
 
-	mtx_lock(&cq->mtx);
-	if(cq->canceled) {
-		mtx_unlock(&cq->mtx);
-		RETURNWERR(ECANCELED, -1);
-	}
+	ctqueue *real = (ctqueue *)ctq;
+	ctqueue_cancel(real);
 
-	thrd_t *newthreads[threads];
-	for(int i = 0; i < threads; i++) {
-		newthreads[i] = VALLOC(1, sizeof(thrd_t));
-		if(!newthreads[i]) {
-			for(int j = 0; j < i; j++)
-				free(newthreads[j]);
+	for(int i = 0; i < real->talen; i++)
+		thrd_join(real->thrdarr[i], NULL);
 
-			return -1;
-		}
+	// Threads are dead, everything's free game
+	mtx_destroy(&real->mutex);
+	cnd_destroy(&real->cond);
+	taskqueue_free(real->tq);
+	free(real->thrdarr);
+	free(real);
 
-		dlinkedlist_prepend(cq->rthreads, newthreads[i], free);
-		thrd_create(newthreads[i], consumer, cq);
-	}
+	// TODO: figure out if it's necessary / a good idea to do error handling on these functions
 
-	mtx_unlock(&cq->mtx);
+	return;
+}
 
-	return 0;
+int ctqueue_waitpush(ctqueue *ctq, task *tsk) {
+	if(!ctq || !tsk) {errno = EINVAL; return -1;}
+	int retval = 0;
+
+	__CTQ_INLOCK(ctq, -1,
+		retval = taskqueue_push(ctq->tq, tsk);
+	);
+	if(retval == 0)
+		cnd_signal(&ctq->cond);
+
+	return retval;
 }
 
-int cqueue_registerthread(cqueue * const cq) {
-	return cqueue_registerthreads(cq, 1);
+task * ctqueue_waitpop(ctqueue *ctq) {
+	if(!ctq) {errno = EINVAL; return NULL;}
+	task *retval = NULL;
+
+	__CTQ_INLOCK(ctq, NULL,
+		while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled)
+			cnd_wait(&ctq->cond, &ctq->mutex);
+
+		if(ctq->canceled) {
+			errno = ECANCELED;
+			mtx_unlock(&ctq->mutex);
+			return NULL;
+		}
+
+		retval = taskqueue_pop(ctq->tq);
+	);
+
+	return retval;
 }
 
-enum __CQUEUE_STAT_OPTIONS {
-	__CQUEUE_STAT_NOTDEF,
-
-	__CQUEUE_CANCELED,
-	__CQUEUE_THREADS_NUM,
-	__CQUEUE_TASKS_NUM,
-
-	__CQUEUE_STAT_TOOBIG,
-};
-
-int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) {
-	if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG)
-		RETURNWERR(EINVAL, -1);
-
-	int retval = -1;
-	mtx_lock(&cq->mtx);
-
-	switch(opt) {
-		case __CQUEUE_CANCELED:
-			retval = cq->canceled;
-			mtx_unlock(&cq->mtx);
-			return retval;
-			break;
-
-		case __CQUEUE_THREADS_NUM:
-			retval = dlinkedlist_size(cq->rthreads);
-			mtx_unlock(&cq->mtx);
-			return retval;
-			break;
-
-		case __CQUEUE_TASKS_NUM:
-			retval = dlinkedlist_size(cq->taskqueue);
-			mtx_unlock(&cq->mtx);
-			return retval;
-			break;
-
-		default:
-			RETURNWERR(EINVAL, -1);
-			break;
+static int __CTQ_CONSUMER(void *ctq) {
+	if(!ctq) {errno = EINVAL; thrd_exit(-1);}
+	ctqueue *real = (ctqueue *)ctq;
+
+	for(task *ctask = NULL;;) {
+		ctask = ctqueue_waitpop(real);
+		if(!ctask)
+			break;
+
+		task_fire(ctask);
+		task_free(ctask);
 	}
 
-	// This should absolutely never run
-	RETURNWERR(ENOTRECOVERABLE, -1);
+	thrd_exit(1); // non-zero indicates error, -1 indicates invalid argument
}
 
-int cqueue_iscanceled(cqueue * const cq) {
-	return cqueue_getstat(cq, __CQUEUE_CANCELED);
-}
-int cqueue_numthreads(cqueue * const cq) {
-	return cqueue_getstat(cq, __CQUEUE_THREADS_NUM);
-}
-int cqueue_numtasks(cqueue * const cq) {
-	return cqueue_getstat(cq, __CQUEUE_TASKS_NUM);
-}
+int ctqueue_start(ctqueue *ctq) {
+	if(!ctq) {errno = EINVAL; return -1;}
+
+	ctq->canceled = 0;
+
+	int retval = 0;
+	for(int i = 0; i < ctq->talen; i++)
+		if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success)
+			break;
+
+	if(retval != thrd_success)
+		ctqueue_cancel(ctq);
+
+	return (retval == thrd_success) ? 0 : -1;
+}
\ No newline at end of file
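
For readability, here is what the `__CTQ_INLOCK` macro expands to at its `ctqueue_waitpush` call site, under the definition added above. This is an illustration only, not part of the commit:

// Expansion of __CTQ_INLOCK(ctq, -1, retval = taskqueue_push(ctq->tq, tsk);)
do {
	mtx_lock(&(ctq)->mutex);
	if((ctq)->canceled) {
		errno = ECANCELED;
		mtx_unlock(&(ctq)->mutex);
		return -1; // leaves ctqueue_waitpush itself, not just the macro
	}

	retval = taskqueue_push(ctq->tq, tsk);
	mtx_unlock(&(ctq)->mutex);
} while (0);

Because that `return` exits the enclosing function, any early return written inside the `code` argument has to unlock by hand, which is why `ctqueue_waitpop` calls `mtx_unlock` before its in-macro `return NULL`.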
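
Below is a minimal usage sketch of the reworked API after this commit. It is illustrative and not part of the diff: it assumes threadpool.h declares `gcallback` as `int (*)(void *)` and `fcallback` as `void (*)(void *)` (which is what the `task_init` and `task_fire` call sites suggest) and exposes the `task` and `ctqueue` types shown above.

// Hypothetical usage sketch (not part of this commit)
#include "threadpool.h"

#include <stdio.h>
#include <stdlib.h>

// Trivial task body: print the string passed as task data
static int print_job(void *data) {
	printf("working on: %s\n", (const char *)data);
	return 0;
}

int main(void) {
	ctqueue *pool = ctqueue_init(4); // pool with four consumer threads
	if(!pool)
		return 1;

	if(ctqueue_start(pool) != 0) { // spawn the consumers
		ctqueue_free(pool);
		return 1;
	}

	for(int i = 0; i < 8; i++) {
		char *msg = malloc(32);
		if(!msg)
			break;
		snprintf(msg, 32, "job %d", i);

		// Pass free as the fcallback so task_free releases msg for us
		task *t = task_init(print_job, free, msg);
		if(!t) {
			free(msg);
			continue;
		}
		if(ctqueue_waitpush(pool, t) != 0)
			task_free(t); // frees msg via the freecb
	}

	ctqueue_free(pool); // cancels, joins the consumers, frees leftovers
	return 0;
}

Note that `ctqueue_free` cancels immediately and `ctqueue_waitpop` checks `canceled` before draining, so tasks still queued at that point are freed unrun; a caller that needs every task to complete would have to add a drain step, which this commit does not provide.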
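
`task_fired` distinguishes "invalid argument" from "callback returned -1" only through `errno`, so `errno` should be cleared before the call or a stale `EINVAL` can make a legitimate -1 return look like a failure. A one-shot sketch under the same typedef assumptions as above:

// Hypothetical one-shot example for task_fired (not part of this commit)
#include "threadpool.h"

#include <errno.h>
#include <stdio.h>

static int answer(void *data) {
	return *(int *)data; // the task result is just the pointed-to int
}

int main(void) {
	int n = 42;

	// No freecb: &n is not heap-allocated, so task_free must not touch it
	task *t = task_init(answer, NULL, &n);
	if(!t)
		return 1;

	errno = 0; // task_fired tests errno == EINVAL, so clear stale values
	int rv = task_fired(t); // runs answer(&n), then frees the task
	printf("task returned %d\n", rv);
	return 0;
}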