From 16528ac295215e788cb226f0cc49f11f82919741 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Fri, 6 Jun 2025 13:30:34 -0500 Subject: Get threadpool implementation working --- src/threadpool.c | 550 ++++++++++++++++++++++++++----------------------------- 1 file changed, 263 insertions(+), 287 deletions(-) (limited to 'src/threadpool.c') diff --git a/src/threadpool.c b/src/threadpool.c index 6912790..c4d8a5c 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -1,386 +1,362 @@ #include "threadpool.h" -#include "shared.h" -#include "ll.h" - -#include -#include #include #include #include +#include + -// Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) -typedef struct mtxp { - void *data; - mtx_t mtx; -} mtxpair; +task * task_init(gcallback callback, fcallback freecb, void *data) { + if(callback == NULL) {errno = EINVAL; return NULL;} -mtxpair * mtxpair_init(void * const data, int type) { - mtxpair *mtxp = VALLOC(1, sizeof(*mtxp)); - if(!mtxp) + task *tsk = calloc(1, sizeof(*tsk)); + if(!tsk) return NULL; - // Init the mutex - if(mtx_init(&mtxp->mtx, type) == thrd_error) { - free(mtxp); - RETURNWERR(errno, NULL); - } + tsk->callback = callback; + tsk->freecb = freecb; + tsk->data = data; - mtxp->data = data; - return mtxp; + return tsk; } -void mtxpair_free(mtxpair *mp) { - if(!mp) +void task_free(void *tsk) { + task *real = (task *)tsk; + if(!real) return; - - mtx_destroy(&mp->mtx); - free(mp); + + if(real->freecb != NULL) + real->freecb(real->data); + free(real); return; } -int mtxpair_setdata(mtxpair * const mp, void * const data) { - if(!mp) - RETURNWERR(EINVAL, -1); - - mp->data = data; - return 0; +int task_fire(task *tsk) { + if(!tsk) {errno = EINVAL; return -1;} + return tsk->callback(tsk->data); } - -// thrd_create which calls mtx_lock/unlock on `arg` automatically -int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) { - if(!thr) - RETURNWERR(EINVAL, thrd_error); - if(!func) - RETURNWERR(EINVAL, thrd_error); - if(!mtxd) - RETURNWERR(EINVAL, thrd_error); - - if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} - int retval = thrd_create(thr, func, mtxd->data); - if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} - +int task_fired(task *tsk) { + int retval = task_fire(tsk); + if(errno == EINVAL && retval == -1) {return -1;} + task_free(tsk); return retval; } -/* Ok, after doing a little more research, the best way to do this is probaby via a producer/consumer architecture. Spawn a bunch of -// threads waiting on a queue (via semaphore) and when one is notified pop a task of the queue and execute it. In this case, the -// producer would be the filesystem scanner funciton providing new files to encrypt, and the consumers would be threads waiting -// to encrypt them */ - -// Threadpool: - // Array of threads - // Task Queue - // Readiness semaphore / conditional - // Mutex - // Linked List of Tasks - // Task: - // int (*callback)(void*) - // void *arg - -// Consumer: - // Wait for cqueue to pop - // Fire task - // Repeat - -// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 - -typedef struct task { - task_callback cb; - void *arg; -} task; - -task * task_init(task_callback cb, void *arg) { - if(cb == NULL) - RETURNWERR(EINVAL, NULL); - task *task = VALLOC(1, sizeof(*task)); - if(!task) +tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { + if(!tsk) {errno = EINVAL; return NULL;} + tqnode *node = calloc(1, sizeof(*node)); + if(!node) return NULL; - task->cb = cb; - task->arg = arg; + node->next = next; + node->prev = prev; + node->task = tsk; - return task; + return node; } -void task_free(void *ts) { - if(!ts) +void tqnode_free(void *tqn) { + tqnode *real = (tqnode *)tqn; + if(!real) return; - free(ts); // Not making any assumptions about the data in the task + task_free(real->task); + free(real); return; } -int task_fire(task *ts) { - if(!ts) - RETURNWERR(EINVAL, -1); - if(ts->cb == NULL) - RETURNWERR(EINVAL, -1); - - return ts->cb(ts->arg); -} -typedef struct cq { - dlinkedlist *taskqueue; - dlinkedlist *rthreads; +taskqueue * taskqueue_init(void) { + taskqueue *tq = calloc(1, sizeof(*tq)); + if(!tq) + return NULL; - mtx_t mtx; - cnd_t cnd; - - unsigned char canceled; -} cqueue; + tq->start = NULL; + tq->end = NULL; + tq->size = 0; + return tq; +} -static void ___ucleanup_mtxd(void *mtx) { - if(!mtx) +void taskqueue_free(void *tq) { + if(!tq) return; - mtx_destroy((mtx_t*)mtx); + for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { + n = p->next; + tqnode_free(p); + p = n; + } + free(tq); + return; } -static void ___ucleanup_cndd(void *cnd) { - if(!cnd) - return; +int taskqueue_handlefirst(taskqueue *tq, task *tsk) { + if(!tq || !tsk) {errno = EINVAL; return -1;} + if(tq->size) {return 0;} - cnd_destroy((cnd_t *)cnd); - return; -} + tqnode *first = tqnode_init(NULL, NULL, tsk); + if(!first) + return -1; -cqueue * cqueue_init() { - cleanup_CREATE(10); - - // Create base object - cqueue *cq = VALLOC(1, sizeof(*cq)); - if(!cq) - RETURNWERR(errno, NULL); - cleanup_REGISTER(free, cq); - cq->canceled = 0; - - // Initialize the mutex - if(mtx_init(&cq->mtx, mtx_plain) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); - - // Initialize the conditional - if(!cleanup_ERRORFLAGGED) - if(cnd_init(&cq->cnd) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); + tq->start = first; + tq->end = first; + tq->size = 1; - // Create the taskqueue - if(!cleanup_ERRORFLAGGED) - if(!(cq->taskqueue = dlinkedlist_init())) - cleanup_MARK(); - cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue); + return 1; +} - // Create the thread list - if(!cleanup_ERRORFLAGGED) - if(!(cq->rthreads = dlinkedlist_init())) - cleanup_MARK(); - cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads); +int taskqueue_push(taskqueue *tq, task *tsk) { + if(!tq || !tsk) {errno = EINVAL; return -1;} - if(cleanup_ERRORFLAGGED) - cleanup_FIRE(); + int hf; + if((hf = taskqueue_handlefirst(tq, tsk))) + return (hf >= 0) ? 0 : -1; - return cq; + tqnode *newstart = tqnode_init(tq->start, NULL, tsk); + if(!newstart) + return -1; + tq->start->prev = newstart; + tq->start = newstart; + tq->size++; - // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line - // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea + return 0; } -void cqueue_cancel(cqueue * const cq) { - if(!cq) - return; +task * taskqueue_pop(taskqueue *tq) { + if(!tq) {errno = EINVAL; return NULL;} + if(tq->size <= 0) {errno = ENODATA; return NULL;} - mtx_lock(&cq->mtx); - - if(cq->canceled) { - mtx_unlock(&cq->mtx); - return; + tqnode *end = tq->end; + task *ret = end->task; + + if(tq->size == 1) { + tq->end = NULL; + tq->start = NULL; + } else { + tq->end = end->prev; + tq->end->next = NULL; } - cq->canceled = 1; - mtx_unlock(&cq->mtx); - cnd_broadcast(&cq->cnd); - - return; + free(end); + tq->size--; + return ret; } -static int ___cqueue_join(void *t) { - if(!t) +int taskqueue_pushfront(taskqueue *tq, task *tsk) { + if(!tq || !tsk) {errno = EINVAL; return -1;} + + int hf; + if((hf = taskqueue_handlefirst(tq, tsk))) + return (hf >= 0) ? 0 : -1; + + tqnode *newend = tqnode_init(NULL, tq->end, tsk); + if(!newend) return -1; + tq->end->next = newend; + tq->end = newend; + tq->size++; - int retval = 0; - thrd_join(*((thrd_t*)t), &retval); - - return retval; + return 0; } -void cqueue_free(void *cq) { - if(!cq) - return; +task * taskqueue_popback(taskqueue *tq) { + if(!tq) {errno = EINVAL; return NULL;} + if(tq->size <= 0) {errno = ENODATA; return NULL;} - cqueue *real = (cqueue *)cq; + tqnode *start = tq->start; + task *ret = start->task; - // Cancel threads and wait for them to exit - cqueue_cancel(real); - dlinkedlist_foreach(real->rthreads, ___cqueue_join); + if(tq->size == 1) { + tq->start = NULL; + tq->end = NULL; + } else { + tq->start = start->next; + tq->start->prev = NULL; + } - // Threads are dead, no need to worry about concurrency anymore - mtx_destroy(&real->mtx); - cnd_destroy(&real->cnd); - dlinkedlist_free(real->rthreads); - dlinkedlist_free(real->taskqueue); + free(start); + tq->size--; + return ret; +} - return; +int taskqueue_size(taskqueue *tq) { + if(!tq) {errno = EINVAL; return -1;} + return tq->size; } -int cqueue_addtask(cqueue * const cq, task * const tsk) { - if(!cq || !tsk) - RETURNWERR(EINVAL, -1); - mtx_lock(&cq->mtx); - - if(cq->canceled) { - mtx_unlock(&cq->mtx); - RETURNWERR(ECANCELED, -1); - } - dlinkedlist_prepend(cq->taskqueue, tsk, task_free); - mtx_unlock(&cq->mtx); - cnd_signal(&cq->cnd); +// Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written +#define __CTQ_INLOCK(ctq, retval, code) do {\ + mtx_lock(&(ctq)->mutex); \ + if((ctq)->canceled) { \ + errno = ECANCELED; \ + mtx_unlock(&(ctq)->mutex); \ + return (retval); \ + } \ + \ + code \ + mtx_unlock(&(ctq)->mutex); \ +} while (0) - return 0; +static void ___ucl_mtxdestroy(void *mtx) { + if(!mtx) return; + mtx_destroy((mtx_t *)mtx); + return; } -task * cqueue_waitpop(cqueue * const cq) { - if(!cq) - RETURNWERR(EINVAL, NULL); +static void ___ucl_cnddestroy(void *cond) { + if(cond) return; + cnd_destroy((cnd_t *)cond); + return; +} - task *tsk = NULL; - int index = -1; +ctqueue * ctqueue_init(int nthreads) { + if(nthreads <= 0) {errno = EINVAL; return NULL;} + cleanup_CREATE(6); - mtx_lock(&cq->mtx); - while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) - cnd_wait(&cq->cnd, &cq->mtx); + ctqueue *ctq = calloc(1, sizeof(*ctq)); + if(!ctq) + return NULL; + cleanup_REGISTER(free, ctq); - if(cq->canceled) { - mtx_unlock(&cq->mtx); - thrd_exit(-1); - } + ctq->canceled = 0; + ctq->talen = nthreads; - tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); - dlinkedlist_remove(cq->taskqueue, index); + cleanup_CNDEXEC( + ctq->tq = taskqueue_init(); + if(!ctq->tq) + cleanup_MARK(); + cleanup_CNDREGISTER(taskqueue_free, ctq->tq); + ); - mtx_unlock(&cq->mtx); + cleanup_CNDEXEC( + if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex); + ); - return tsk; + cleanup_CNDEXEC( + if(cnd_init(&ctq->cond) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); + ); + + cleanup_CNDEXEC( + ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); + if(!ctq->thrdarr) + cleanup_MARK(); + cleanup_CNDREGISTER(free, ctq->thrdarr); + ) + + cleanup_CNDFIRE(); + if(cleanup_ERRORFLAGGED) + return NULL; + + return ctq; } -static int consumer(void *cq) { - if(!cq) - thrd_exit(-1); +int ctqueue_cancel(ctqueue *ctq) { + if(!ctq) {errno = EINVAL; return -1;} - cqueue *real = (cqueue *)cq; - for(task *ctask;;) { - ctask = cqueue_waitpop(real); - if(!ctask) - task_fire(ctask); - } + __CTQ_INLOCK(ctq, 1, + ctq->canceled = 1; + ); + cnd_broadcast(&ctq->cond); - thrd_exit(0); + return 0; } -int cqueue_registerthreads(cqueue * const cq, int threads) { - if(!cq || threads <= 0) - RETURNWERR(EINVAL, -1); +void ctqueue_free(void *ctq) { + if(!ctq) + return; - mtx_lock(&cq->mtx); - if(cq->canceled) { - mtx_unlock(&cq->mtx); - RETURNWERR(ECANCELED, -1); - } + ctqueue *real = (ctqueue *)ctq; + ctqueue_cancel(real); - thrd_t *newthreads[threads]; - for(int i = 0; i < threads; i++) { - newthreads[i] = VALLOC(1, sizeof(thrd_t)); - if(!newthreads[i]) { - for(int j = 0; j < i; j++) - free(newthreads[j]); + for(int i = 0; i < real->talen; i++) + thrd_join(real->thrdarr[i], NULL); - return -1; - } + // Threads are dead, everything's free game + mtx_destroy(&real->mutex); + cnd_destroy(&real->cond); + taskqueue_free(real->tq); + free(real->thrdarr); + free(real); - dlinkedlist_prepend(cq->rthreads, newthreads[i], free); - thrd_create(newthreads[i], consumer, cq); - } + // TODO: figure out if it's necessary / a good idea to do error handling on these functions - mtx_unlock(&cq->mtx); + return; +} - return 0; +int ctqueue_waitpush(ctqueue *ctq, task *tsk) { + if(!ctq || !tsk) {errno = EINVAL; return -1;} + int retval = 0; + + __CTQ_INLOCK(ctq, -1, + retval = taskqueue_push(ctq->tq, tsk); + ); + if(retval == 0) + cnd_signal(&ctq->cond); + + return retval; } -int cqueue_registerthread(cqueue * const cq) { - return cqueue_registerthreads(cq, 1); +task * ctqueue_waitpop(ctqueue *ctq) { + if(!ctq) {errno = EINVAL; return NULL;} + task *retval = NULL; + + __CTQ_INLOCK(ctq, NULL, + while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) + cnd_wait(&ctq->cond, &ctq->mutex); + + if(ctq->canceled) { + errno = ECANCELED; + mtx_unlock(&ctq->mutex); + return NULL; + } + + retval = taskqueue_pop(ctq->tq); + ); + + return retval; } -enum __CQUEUE_STAT_OPTIONS { - __CQUEUE_STAT_NOTDEF, - - __CQUEUE_CANCELED, - __CQUEUE_THREADS_NUM, - __CQUEUE_TASKS_NUM, - - __CQUEUE_STAT_TOOBIG, -}; - -int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { - if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) - RETURNWERR(EINVAL, -1); - - int retval = -1; - mtx_lock(&cq->mtx); - - switch(opt) { - case __CQUEUE_CANCELED: - retval = cq->canceled; - mtx_unlock(&cq->mtx); - return retval; - break; - - case __CQUEUE_THREADS_NUM: - retval = dlinkedlist_size(cq->rthreads); - mtx_unlock(&cq->mtx); - return retval; - break; - - case __CQUEUE_TASKS_NUM: - retval = dlinkedlist_size(cq->taskqueue); - mtx_unlock(&cq->mtx); - return retval; - break; - - default: - RETURNWERR(EINVAL, -1); - break; +static int __CTQ_CONSUMER(void *ctq) { + if(!ctq) {errno = EINVAL; thrd_exit(-1);} + ctqueue *real = (ctqueue *)ctq; + + for(task *ctask = NULL;;) { + ctask = ctqueue_waitpop(real); + if(!ctask) + break; + + task_fire(ctask); + task_free(ctask); } - // This should absolutely never run - RETURNWERR(ENOTRECOVERABLE, -1); + thrd_exit(1); // non-zero indicates error, -1 indicates invalid argument } -int cqueue_iscanceled(cqueue * const cq) { - return cqueue_getstat(cq, __CQUEUE_CANCELED); -} -int cqueue_numthreads(cqueue * const cq) { - return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); -} -int cqueue_numtasks(cqueue * const cq) { - return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); -} +int ctqueue_start(ctqueue *ctq) { + if(!ctq) {errno = EINVAL; return -1;} + + ctq->canceled = 0; + + int retval = 0; + for(int i = 0; i < ctq->talen; i++) + if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) + break; + + if(retval != thrd_success) + ctqueue_cancel(ctq); + + return (retval == thrd_success) ? 0 : -1; +} \ No newline at end of file -- cgit v1.2.3