From 21c168bf02bbab8b473873f0822d68859a025c24 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Wed, 22 Oct 2025 00:41:19 -0500 Subject: Fix threadpool after ripping out cleanup function suite --- src/threadpool.c | 156 +++++++++++++++++++++++++------------------------------ 1 file changed, 71 insertions(+), 85 deletions(-) (limited to 'src/threadpool.c') diff --git a/src/threadpool.c b/src/threadpool.c index c266964..959c060 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -15,6 +15,7 @@ #include "shared.c" #include +#include #include #include #include @@ -44,9 +45,9 @@ typedef struct tqnode { * */ typedef struct taskqueue { - tqnode *start; //!< The first element of the queue - tqnode *end; //!< The final element of the queue - unsigned int size; //!< The number of elements in the queue + tqnode *start; //!< The first element of the queue + tqnode *end; //!< The final element of the queue + size_t size; //!< The number of elements in the queue } taskqueue; /** @@ -54,13 +55,14 @@ typedef struct taskqueue { * */ typedef struct ctqueue { - mtx_t mutex; //!< A mutex for locking sensitive resources - cnd_t cond; //!< A conditional for waiting on / sending a signal - unsigned char canceled; //!< Whether the threads are currently canceled or not - - taskqueue *tq; //!< A taskqueue to be accessed concurrently - thrd_t *thrdarr; //!< An array of threads to be dispatched as consumers - int talen; //!< The length of the thread array + mtx_t mutex; //!< A mutex for locking sensitive resources + cnd_t cond; //!< A conditional for waiting on / sending a signal + uint8_t canceled; //!< Whether the threads are currently canceled or not + taskqueue *tq; //!< A taskqueue to be accessed concurrently + + thrd_t *thrdarr; //!< An array of threads to be dispatched as consumers + int talen; //!< The length of the thread array + // Consider making these another linked list or stack or something } ctqueue; /** @@ -71,12 +73,11 @@ typedef struct ctqueue { * @param data Data to be passed to the callback. May be null * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error */ -task * task_init(gcallback callback, fcallback freecb, void *data) { +task * task_new(gcallback callback, fcallback freecb, void *data) { if(callback == NULL) ERRRET(EINVAL, NULL); task *tsk = calloc(1, sizeof(*tsk)); - if(!tsk) - return NULL; + if(!tsk) return NULL; tsk->callback = callback; tsk->freecb = freecb; @@ -91,12 +92,10 @@ task * task_init(gcallback callback, fcallback freecb, void *data) { * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null */ void task_free(void *tsk) { - task *real = (task *)tsk; - if(!real) - return; + task *real = tsk; + if(!real) return; - if(real->freecb != NULL) - real->freecb(real->data); + if(real->freecb) real->freecb(real->data); free(real); return; @@ -120,18 +119,18 @@ int task_fire(task *tsk) { * @retval (int) Returns value of the callback. Returns -1 and sets errno on error */ int task_fired(task *tsk) { + if(!tsk) return -1; int retval = task_fire(tsk); - if(errno == EINVAL && retval == -1) {return -1;} task_free(tsk); return retval; } -tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { +tqnode * tqnode_new(tqnode *next, tqnode *prev, task *tsk) { if(!tsk) ERRRET(EINVAL, NULL); + tqnode *node = calloc(1, sizeof(*node)); - if(!node) - return NULL; + if(!node) return NULL; node->next = next; node->prev = prev; @@ -140,10 +139,19 @@ tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { return node; } +// Create a tqnode and task at the same time. Returns a valid tqnode with a valid task on success, NULL on error. Does not call task_free on error +tqnode * tqnode_newtask(tqnode *next, tqnode *prev, gcallback callback, fcallback freecb, void *data) { + task *tsk = task_new(callback, freecb, data); + if(!tsk) return NULL; + + tqnode *node = tqnode_new(next, prev, tsk); + if(!node) free(tsk); + return node; +} + void tqnode_free(void *tqn) { - tqnode *real = (tqnode *)tqn; - if(!real) - return; + tqnode *real = tqn; + if(!real) return; task_free(real->task); free(real); @@ -157,10 +165,9 @@ void tqnode_free(void *tqn) { * * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error */ -taskqueue * taskqueue_init(void) { +taskqueue * taskqueue_new(void) { taskqueue *tq = calloc(1, sizeof(*tq)); - if(!tq) - return NULL; + if(!tq) return NULL; tq->start = NULL; tq->end = NULL; @@ -175,8 +182,7 @@ taskqueue * taskqueue_init(void) { * @param tq A taskqueue to be freed. May be null */ void taskqueue_free(void *tq) { - if(!tq) - return; + if(!tq) return; for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { n = p->next; @@ -192,9 +198,8 @@ int taskqueue_handlefirst(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); if(tq->size) {return 0;} - tqnode *first = tqnode_init(NULL, NULL, tsk); - if(!first) - return -1; + tqnode *first = tqnode_new(NULL, NULL, tsk); + if(!first) return -1; tq->start = first; tq->end = first; @@ -214,13 +219,12 @@ int taskqueue_push(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; - if((hf = taskqueue_handlefirst(tq, tsk))) - return (hf >= 0) ? 0 : -1; + if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; + + tqnode *curstart = tq->start, *newstart = tqnode_new(tq->start, NULL, tsk); + if(!newstart) return -1; - tqnode *newstart = tqnode_init(tq->start, NULL, tsk); - if(!newstart) - return -1; - tq->start->prev = newstart; + curstart->prev = newstart; tq->start = newstart; tq->size++; @@ -264,13 +268,12 @@ int taskqueue_pushfront(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; - if((hf = taskqueue_handlefirst(tq, tsk))) - return (hf >= 0) ? 0 : -1; + if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; - tqnode *newend = tqnode_init(NULL, tq->end, tsk); - if(!newend) - return -1; - tq->end->next = newend; + tqnode *end =tq->end, *newend = tqnode_new(NULL, tq->end, tsk); + if(!newend) return -1; + + end->next = newend; tq->end = newend; tq->size++; @@ -323,15 +326,15 @@ int taskqueue_size(taskqueue *tq) { mtx_unlock(&(ctq)->mutex); \ } while (0) -static void ___ucl_mtxdestroy(void *mtx) { - if(!mtx) return; - mtx_destroy((mtx_t *)mtx); +static void mtxd_helper(mtx_t *mutex) { + if(!mutex) return; + mtx_destroy(mutex); return; } -static void ___ucl_cnddestroy(void *cond) { - if(cond) return; - cnd_destroy((cnd_t *)cond); +static void cndd_helper(cnd_t *cond) { + if(!cond) return; + cnd_destroy(cond); return; } @@ -343,47 +346,32 @@ static void ___ucl_cnddestroy(void *cond) { */ ctqueue * ctqueue_init(int nthreads) { if(nthreads <= 0) ERRRET(EINVAL, NULL); - cleanup_CREATE(6); ctqueue *ctq = calloc(1, sizeof(*ctq)); - if(!ctq) - return NULL; - cleanup_REGISTER(free, ctq); + if(!ctq) return NULL; ctq->canceled = 0; ctq->talen = nthreads; - cleanup_CNDEXEC( - ctq->tq = taskqueue_init(); - if(!ctq->tq) - cleanup_MARK(); - cleanup_CNDREGISTER(taskqueue_free, ctq->tq); - ); + ctq->tq = taskqueue_new(); + if(!ctq->tq) goto ERR_ctqueue_init; - cleanup_CNDEXEC( - if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex); - ); + if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) goto ERR_ctqueue_init; + if(cnd_init(&ctq->cond) != thrd_success) goto ERR_ctqueue_init; - cleanup_CNDEXEC( - if(cnd_init(&ctq->cond) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); - ); + ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); + if(!ctq->thrdarr) goto ERR_ctqueue_init; - cleanup_CNDEXEC( - ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); - if(!ctq->thrdarr) - cleanup_MARK(); - cleanup_CNDREGISTER(free, ctq->thrdarr); - ) + return ctq; - cleanup_CNDFIRE(); - if(cleanup_ERRORFLAGGED) - return NULL; +ERR_ctqueue_init: + free(ctq->thrdarr); + cndd_helper(&ctq->cond); + mtxd_helper(&ctq->mutex); + taskqueue_free(ctq->tq); + free(ctq); - return ctq; + return NULL; } /** @@ -410,8 +398,7 @@ int ctqueue_cancel(ctqueue *ctq) { * @param ctq The concurrent taskqueue to free. May be null */ void ctqueue_free(void *ctq) { - if(!ctq) - return; + if(!ctq) return; ctqueue *real = (ctqueue *)ctq; ctqueue_cancel(real); @@ -485,8 +472,7 @@ static int __CTQ_CONSUMER(void *ctq) { for(task *ctask = NULL;;) { ctask = ctqueue_waitpop(real); - if(!ctask) - break; + if(!ctask) break; task_fire(ctask); task_free(ctask); -- cgit v1.2.3