From 60715c7785e7fbe4759df64258ad58a8e2a0769d Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Fri, 16 May 2025 19:09:02 -0500 Subject: Do some more work on the concurrent queue --- src/threadpool.c | 171 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 133 insertions(+), 38 deletions(-) (limited to 'src/threadpool.c') diff --git a/src/threadpool.c b/src/threadpool.c index 0baa024..6912790 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -3,6 +3,8 @@ #include "ll.h" +#include +#include #include #include #include @@ -104,7 +106,7 @@ task * task_init(task_callback cb, void *arg) { return task; } -void task_free(task *ts) { +void task_free(void *ts) { if(!ts) return; @@ -215,8 +217,7 @@ static int ___cqueue_join(void *t) { return -1; int retval = 0; - thrd_t thread = *((thrd_t*)t); - thrd_join(thread, &retval); + thrd_join(*((thrd_t*)t), &retval); return retval; } @@ -240,52 +241,146 @@ void cqueue_free(void *cq) { return; } -// int cqueue_addtask(cqueue * const cq, task * const tsk) { -// if(!cq || !tsk) -// RETURNWERR(EINVAL, -1); +int cqueue_addtask(cqueue * const cq, task * const tsk) { + if(!cq || !tsk) + RETURNWERR(EINVAL, -1); -// mtx_lock(cq->mtx); + mtx_lock(&cq->mtx); -// // TODO: Think about creating an "exception" via signal handling -// if(cq->canceled) { -// mtx_unlock(cq->mtx); -// thrd_exit(-1); -// } + if(cq->canceled) { + mtx_unlock(&cq->mtx); + RETURNWERR(ECANCELED, -1); + } -// dlinkedlist_prepend(cq->list, tsk, free); -// mtx_unlock(cq->mtx); -// cnd_signal(cq->cnd); + dlinkedlist_prepend(cq->taskqueue, tsk, task_free); + mtx_unlock(&cq->mtx); + cnd_signal(&cq->cnd); -// return 0; -// } + return 0; +} -// task * cqueue_waitpop(cqueue * const cq) { -// if(!cq) -// RETURNWERR(EINVAL, NULL); +task * cqueue_waitpop(cqueue * const cq) { + if(!cq) + RETURNWERR(EINVAL, NULL); -// task *retval = NULL; + task *tsk = NULL; + int index = -1; -// mtx_lock(cq->mtx); -// while(dlinkedlist_isempty(cq->list) && !cq->canceled) -// cnd_wait(cq->cnd, cq->mtx); + mtx_lock(&cq->mtx); + while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) + cnd_wait(&cq->cnd, &cq->mtx); -// if(cq->canceled) { -// mtx_unlock(cq->mtx); -// thrd_exit(-1); -// } + if(cq->canceled) { + mtx_unlock(&cq->mtx); + thrd_exit(-1); + } -// retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); -// dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); -// mtx_unlock(cq->mtx); + tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); + dlinkedlist_remove(cq->taskqueue, index); -// return retval; -// } + mtx_unlock(&cq->mtx); + return tsk; +} + +static int consumer(void *cq) { + if(!cq) + thrd_exit(-1); + + cqueue *real = (cqueue *)cq; + for(task *ctask;;) { + ctask = cqueue_waitpop(real); + if(!ctask) + task_fire(ctask); + } + + thrd_exit(0); +} + +int cqueue_registerthreads(cqueue * const cq, int threads) { + if(!cq || threads <= 0) + RETURNWERR(EINVAL, -1); + + mtx_lock(&cq->mtx); + if(cq->canceled) { + mtx_unlock(&cq->mtx); + RETURNWERR(ECANCELED, -1); + } + thrd_t *newthreads[threads]; + for(int i = 0; i < threads; i++) { + newthreads[i] = VALLOC(1, sizeof(thrd_t)); + if(!newthreads[i]) { + for(int j = 0; j < i; j++) + free(newthreads[j]); -typedef struct tp { - thrd_t **threads; // thrd_t *threads[] - int nthreads; + return -1; + } - cqueue *taskqueue; -} threadpool; + dlinkedlist_prepend(cq->rthreads, newthreads[i], free); + thrd_create(newthreads[i], consumer, cq); + } + + mtx_unlock(&cq->mtx); + + return 0; +} + +int cqueue_registerthread(cqueue * const cq) { + return cqueue_registerthreads(cq, 1); +} + +enum __CQUEUE_STAT_OPTIONS { + __CQUEUE_STAT_NOTDEF, + + __CQUEUE_CANCELED, + __CQUEUE_THREADS_NUM, + __CQUEUE_TASKS_NUM, + + __CQUEUE_STAT_TOOBIG, +}; + +int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { + if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) + RETURNWERR(EINVAL, -1); + + int retval = -1; + mtx_lock(&cq->mtx); + + switch(opt) { + case __CQUEUE_CANCELED: + retval = cq->canceled; + mtx_unlock(&cq->mtx); + return retval; + break; + + case __CQUEUE_THREADS_NUM: + retval = dlinkedlist_size(cq->rthreads); + mtx_unlock(&cq->mtx); + return retval; + break; + + case __CQUEUE_TASKS_NUM: + retval = dlinkedlist_size(cq->taskqueue); + mtx_unlock(&cq->mtx); + return retval; + break; + + default: + RETURNWERR(EINVAL, -1); + break; + } + + // This should absolutely never run + RETURNWERR(ENOTRECOVERABLE, -1); +} + +int cqueue_iscanceled(cqueue * const cq) { + return cqueue_getstat(cq, __CQUEUE_CANCELED); +} +int cqueue_numthreads(cqueue * const cq) { + return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); +} +int cqueue_numtasks(cqueue * const cq) { + return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); +} -- cgit v1.2.3