summaryrefslogtreecommitdiff
path: root/src/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/threadpool.c')
-rw-r--r--src/threadpool.c31
1 files changed, 29 insertions, 2 deletions
diff --git a/src/threadpool.c b/src/threadpool.c
index 66c0d06..31c300c 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -310,7 +310,7 @@ int cqueue_trypop(cqueue * const cq, task **ret) {
310} 310}
311 311
312int cqueue_waitpop(cqueue * const cq, task **ret) { 312int cqueue_waitpop(cqueue * const cq, task **ret) {
313 if(!cq || !ret || !*ret) 313 if(!cq || !ret)
314 RETURNWERR(EINVAL, -1); 314 RETURNWERR(EINVAL, -1);
315 315
316 mtx_lock(cq->mutex); 316 mtx_lock(cq->mutex);
@@ -348,6 +348,23 @@ int cqueue_cancel(cqueue * const cq) {
348 return retval; 348 return retval;
349} 349}
350 350
351int cqueue_consumer(void *passed) {
352 if(!passed)
353 thrd_exit(thrd_error);
354 // Not setting errno because then I'd have to make a mutex for it
355
356 cqueue *cq = (cqueue *)passed;
357
358 for(task *current_task;;) {
359 cqueue_waitpop(cq, &current_task);
360 if(!current_task)
361 thrd_exit(thrd_error);
362
363 current_task->cb(current_task->arg);
364 }
365
366 thrd_exit(thrd_success);
367}
351 368
352static void ___ucleanup_cqfree(void *cq) { 369static void ___ucleanup_cqfree(void *cq) {
353 if(!cq) 370 if(!cq)
@@ -384,6 +401,10 @@ threadpool * threadpool_init(int threads) {
384 for(int j = 0; j < i; j++) 401 for(int j = 0; j < i; j++)
385 free(tp->threads[j]); 402 free(tp->threads[j]);
386 } 403 }
404
405 if(!cleanup_ERRORFLAGGED)
406 thrd_create(tp->threads[i], cqueue_consumer, tp->taskqueue);
407 // TODO: Error Checking ^
387 } 408 }
388 409
389 if(cleanup_ERRORFLAGGED) 410 if(cleanup_ERRORFLAGGED)
@@ -404,8 +425,14 @@ void threadpool_free(threadpool *tp) {
404 free(tp->threads[i]); 425 free(tp->threads[i]);
405 } 426 }
406 free(tp->threads); 427 free(tp->threads);
407 cqueue_free(tp->taskqueue);
408 free(tp); 428 free(tp);
409 429
410 return; 430 return;
431}
432
433int threadpool_addtask(threadpool * const tp, task * const task) {
434 if(!tp || !task)
435 RETURNWERR(EINVAL, -1);
436
437 return cqueue_append(tp->taskqueue, task);
411} \ No newline at end of file 438} \ No newline at end of file