From 60715c7785e7fbe4759df64258ad58a8e2a0769d Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Fri, 16 May 2025 19:09:02 -0500 Subject: Do some more work on the concurrent queue --- src/shared.h | 9 ++- src/threadpool.c | 171 ++++++++++++++++++++++++++++++++++++++++++------------- src/threadpool.h | 10 +++- 3 files changed, 147 insertions(+), 43 deletions(-) diff --git a/src/shared.h b/src/shared.h index 825814e..66d1af3 100644 --- a/src/shared.h +++ b/src/shared.h @@ -7,6 +7,9 @@ #define FALSE 0 #define TRUE 1 +typedef int (*gcallback)(void*); // Generic callback signature +typedef void (*fcallback)(void*); // free()-like callback signature + #define RETURNWERR(errval, retval) do {\ errno = (errval);\ return (retval);\ @@ -60,12 +63,12 @@ char * xdirname(const char * const path); // Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null -typedef void (*cleanup_callback)(void*); +typedef fcallback cleanup_callback; // Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments typedef struct cl { - cleanup_callback *funcs; - void **args; + cleanup_callback *funcs; // Actual type: cleanup_callback funcs[] + void **args; // Actual type: void *args[] int size; int used; diff --git a/src/threadpool.c b/src/threadpool.c index 0baa024..6912790 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -3,6 +3,8 @@ #include "ll.h" +#include +#include #include #include #include @@ -104,7 +106,7 @@ task * task_init(task_callback cb, void *arg) { return task; } -void task_free(task *ts) { +void task_free(void *ts) { if(!ts) return; @@ -215,8 +217,7 @@ static int ___cqueue_join(void *t) { return -1; int retval = 0; - thrd_t thread = *((thrd_t*)t); - thrd_join(thread, &retval); + thrd_join(*((thrd_t*)t), &retval); return retval; } @@ -240,52 +241,146 @@ void cqueue_free(void *cq) { return; } -// int cqueue_addtask(cqueue * const cq, task * const tsk) { -// if(!cq || !tsk) -// RETURNWERR(EINVAL, -1); +int cqueue_addtask(cqueue * const cq, task * const tsk) { + if(!cq || !tsk) + RETURNWERR(EINVAL, -1); -// mtx_lock(cq->mtx); + mtx_lock(&cq->mtx); -// // TODO: Think about creating an "exception" via signal handling -// if(cq->canceled) { -// mtx_unlock(cq->mtx); -// thrd_exit(-1); -// } + if(cq->canceled) { + mtx_unlock(&cq->mtx); + RETURNWERR(ECANCELED, -1); + } -// dlinkedlist_prepend(cq->list, tsk, free); -// mtx_unlock(cq->mtx); -// cnd_signal(cq->cnd); + dlinkedlist_prepend(cq->taskqueue, tsk, task_free); + mtx_unlock(&cq->mtx); + cnd_signal(&cq->cnd); -// return 0; -// } + return 0; +} -// task * cqueue_waitpop(cqueue * const cq) { -// if(!cq) -// RETURNWERR(EINVAL, NULL); +task * cqueue_waitpop(cqueue * const cq) { + if(!cq) + RETURNWERR(EINVAL, NULL); -// task *retval = NULL; + task *tsk = NULL; + int index = -1; -// mtx_lock(cq->mtx); -// while(dlinkedlist_isempty(cq->list) && !cq->canceled) -// cnd_wait(cq->cnd, cq->mtx); + mtx_lock(&cq->mtx); + while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) + cnd_wait(&cq->cnd, &cq->mtx); -// if(cq->canceled) { -// mtx_unlock(cq->mtx); -// thrd_exit(-1); -// } + if(cq->canceled) { + mtx_unlock(&cq->mtx); + thrd_exit(-1); + } -// retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); -// dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); -// mtx_unlock(cq->mtx); + tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); + dlinkedlist_remove(cq->taskqueue, index); -// return retval; -// } + mtx_unlock(&cq->mtx); + return tsk; +} + +static int consumer(void *cq) { + if(!cq) + thrd_exit(-1); + + cqueue *real = (cqueue *)cq; + for(task *ctask;;) { + ctask = cqueue_waitpop(real); + if(!ctask) + task_fire(ctask); + } + + thrd_exit(0); +} + +int cqueue_registerthreads(cqueue * const cq, int threads) { + if(!cq || threads <= 0) + RETURNWERR(EINVAL, -1); + + mtx_lock(&cq->mtx); + if(cq->canceled) { + mtx_unlock(&cq->mtx); + RETURNWERR(ECANCELED, -1); + } + thrd_t *newthreads[threads]; + for(int i = 0; i < threads; i++) { + newthreads[i] = VALLOC(1, sizeof(thrd_t)); + if(!newthreads[i]) { + for(int j = 0; j < i; j++) + free(newthreads[j]); -typedef struct tp { - thrd_t **threads; // thrd_t *threads[] - int nthreads; + return -1; + } - cqueue *taskqueue; -} threadpool; + dlinkedlist_prepend(cq->rthreads, newthreads[i], free); + thrd_create(newthreads[i], consumer, cq); + } + + mtx_unlock(&cq->mtx); + + return 0; +} + +int cqueue_registerthread(cqueue * const cq) { + return cqueue_registerthreads(cq, 1); +} + +enum __CQUEUE_STAT_OPTIONS { + __CQUEUE_STAT_NOTDEF, + + __CQUEUE_CANCELED, + __CQUEUE_THREADS_NUM, + __CQUEUE_TASKS_NUM, + + __CQUEUE_STAT_TOOBIG, +}; + +int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { + if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) + RETURNWERR(EINVAL, -1); + + int retval = -1; + mtx_lock(&cq->mtx); + + switch(opt) { + case __CQUEUE_CANCELED: + retval = cq->canceled; + mtx_unlock(&cq->mtx); + return retval; + break; + + case __CQUEUE_THREADS_NUM: + retval = dlinkedlist_size(cq->rthreads); + mtx_unlock(&cq->mtx); + return retval; + break; + + case __CQUEUE_TASKS_NUM: + retval = dlinkedlist_size(cq->taskqueue); + mtx_unlock(&cq->mtx); + return retval; + break; + + default: + RETURNWERR(EINVAL, -1); + break; + } + + // This should absolutely never run + RETURNWERR(ENOTRECOVERABLE, -1); +} + +int cqueue_iscanceled(cqueue * const cq) { + return cqueue_getstat(cq, __CQUEUE_CANCELED); +} +int cqueue_numthreads(cqueue * const cq) { + return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); +} +int cqueue_numtasks(cqueue * const cq) { + return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); +} diff --git a/src/threadpool.h b/src/threadpool.h index db6fa2e..d7b713c 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -9,12 +9,18 @@ typedef struct cq cqueue; typedef struct tp threadpool; task * task_init(task_callback cb, void *arg); -void task_free(task *ts); +void task_free(void *ts); int task_fire(task *ts); cqueue * cqueue_init(); void cqueue_cancel(cqueue * const cq); void cqueue_free(void *cq); - +int cqueue_addtask(cqueue * const cq, task * const tsk); +task * cqueue_waitpop(cqueue * const cq); +int cqueue_registerthreads(cqueue * const cq, int threads); +int cqueue_registerthread(cqueue * const cq); +int cqueue_iscanceled(cqueue * const cq); +int cqueue_numthreads(cqueue * const cq); +int cqueue_numtasks(cqueue * const cq); #endif \ No newline at end of file -- cgit v1.2.3