From db6aacd22f5c43cc6ea45e6c07bf962859ebac8d Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Tue, 22 Apr 2025 18:53:20 -0500 Subject: Stop threadpool from crashing --- src/main.c | 1 + src/threadpool.c | 120 ++++++++++++++++++++++++++++++------------------------- src/threadpool.h | 1 + 3 files changed, 68 insertions(+), 54 deletions(-) (limited to 'src') diff --git a/src/main.c b/src/main.c index f89a124..6af87ab 100644 --- a/src/main.c +++ b/src/main.c @@ -23,6 +23,7 @@ int main() { threadpool *tp = threadpool_init(2); task *tsk = task_init(testcb, "This is some data"); threadpool_addtask(tp, tsk); + threadpool_join(tp); threadpool_free(tp); return 0; diff --git a/src/threadpool.c b/src/threadpool.c index 31c300c..ab0733d 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -1,5 +1,4 @@ #include "threadpool.h" -#include "arena.h" #include "shared.h" #include "ll.h" @@ -102,8 +101,8 @@ typedef struct task { typedef struct cq { dlinkedlist *list; - mtx_t *mutex; - cnd_t *conditional; + mtx_t *mtx; + cnd_t *cnd; unsigned char canceled; } cqueue; @@ -168,32 +167,37 @@ cqueue * cqueue_init(int mtx_type) { return NULL; cleanup_REGISTER(free, cq); - cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); - if(!(cq->mutex)) + cq->mtx = VALLOC(1, sizeof(*(cq->mtx))); + if(!(cq->mtx)) cleanup_MARK(); - cleanup_CNDREGISTER(free, cq->mutex); + cleanup_CNDREGISTER(free, cq->mtx); - if(cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex); + if(!cleanup_ERRORFLAGGED) + if(mtx_init(cq->mtx, mtx_type) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); - if(cleanup_ERRORFLAGGED && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) - cleanup_MARK(); - cleanup_CNDREGISTER(free, cq->conditional); + if(!cleanup_ERRORFLAGGED) + if(!(cq->cnd = VALLOC(1, sizeof(*(cq->cnd))))) + cleanup_MARK(); + cleanup_CNDREGISTER(free, cq->cnd); - if(cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional); + if(!cleanup_ERRORFLAGGED) + if(cnd_init(cq->cnd) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); - cq->list = dlinkedlist_init(); - if(cleanup_ERRORFLAGGED && !cq->list) - cleanup_MARK(); + if(!cleanup_ERRORFLAGGED) + if(!(cq->list = dlinkedlist_init())) + cleanup_MARK(); cleanup_CNDREGISTER(___ucleanup_dll, cq->list); + if(cleanup_ERRORFLAGGED) cleanup_fire(&__CLEANUP); // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions + // The implementation was not better lmao cq->canceled = 0; return cq; @@ -207,10 +211,10 @@ void cqueue_free(cqueue *cq) { cqueue_cancel(cq); dlinkedlist_free(cq->list); - cnd_destroy(cq->conditional); - mtx_destroy(cq->mutex); - free(cq->conditional); - free(cq->mutex); + cnd_destroy(cq->cnd); + mtx_destroy(cq->mtx); + free(cq->cnd); + free(cq->mtx); free(cq); return; @@ -220,15 +224,15 @@ int cqueue_append(cqueue * const cq, task *tsk) { if(!cq || !tsk) RETURNWERR(EINVAL, -1); - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); if(cq->canceled) { - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); thrd_exit(thrd_timedout); } dlinkedlist_append(cq->list, tsk, free); - mtx_unlock(cq->mutex); - cnd_signal(cq->conditional); + mtx_unlock(cq->mtx); + cnd_signal(cq->cnd); return 0; } @@ -237,15 +241,15 @@ int cqueue_prepend(cqueue * const cq, task *tsk) { if(!cq || !tsk) RETURNWERR(EINVAL, -1); - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); if(cq->canceled) { - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); thrd_exit(thrd_timedout); } dlinkedlist_prepend(cq->list, tsk, free); - mtx_unlock(cq->mutex); - cnd_signal(cq->conditional); + mtx_unlock(cq->mtx); + cnd_signal(cq->cnd); return 0; } @@ -254,15 +258,15 @@ int cqueue_insert(cqueue * const cq, task *tsk, int index) { if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first RETURNWERR(EINVAL, -1); - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); if(cq->canceled) { - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); thrd_exit(thrd_timedout); } dlinkedlist_insert(cq->list, tsk, free, index); - mtx_unlock(cq->mutex); - cnd_signal(cq->conditional); + mtx_unlock(cq->mtx); + cnd_signal(cq->cnd); return 0; } @@ -271,14 +275,14 @@ int cqueue_size(cqueue const * const cq) { if(!cq) RETURNWERR(EINVAL, -1); - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); if(cq->canceled) { - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); thrd_exit(thrd_timedout); } int retval = dlinkedlist_size(cq->list); - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); return retval; } @@ -294,9 +298,9 @@ int cqueue_trypop(cqueue * const cq, task **ret) { int retval = 0; - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); if(cq->canceled) { - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); thrd_exit(thrd_timedout); } @@ -304,7 +308,7 @@ int cqueue_trypop(cqueue * const cq, task **ret) { *ret = (task*)dlinkedlist_poplast(cq->list); retval = 1; } - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); return retval; } @@ -313,19 +317,19 @@ int cqueue_waitpop(cqueue * const cq, task **ret) { if(!cq || !ret) RETURNWERR(EINVAL, -1); - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); while(!dlinkedlist_isempty(cq->list) && !cq->canceled) - cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done + cnd_wait(cq->cnd, cq->mtx); // Unlocks mutex while waiting, acquires lock once waiting is done if(cq->canceled) { - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); thrd_exit(thrd_timedout); } *ret = dlinkedlist_poplast(cq->list); - mtx_unlock(cq->mutex); + mtx_unlock(cq->mtx); return 0; } @@ -336,14 +340,14 @@ int cqueue_cancel(cqueue * const cq) { int retval = 0; - mtx_lock(cq->mutex); + mtx_lock(cq->mtx); if(cq->canceled) retval = -1; else cq->canceled++; - mtx_unlock(cq->mutex); - cnd_broadcast(cq->conditional); + mtx_unlock(cq->mtx); + cnd_broadcast(cq->cnd); return retval; } @@ -389,9 +393,9 @@ threadpool * threadpool_init(int threads) { cleanup_MARK(); cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); - tp->threads = VALLOC(threads, sizeof(*tp->threads)); - if(!tp->threads) - cleanup_MARK(); + if(!cleanup_ERRORFLAGGED) + if(!(tp->threads = VALLOC(threads, sizeof(*tp->threads)))) + cleanup_MARK(); cleanup_CNDREGISTER(free, tp->threads); for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { @@ -420,13 +424,11 @@ void threadpool_free(threadpool *tp) { return; cqueue_free(tp->taskqueue); - for(int i = 0; i < tp->nthreads; i++) { - thrd_detach(*tp->threads[i]); + for(int i = 0; i < tp->nthreads; i++) free(tp->threads[i]); - } free(tp->threads); free(tp); - + return; } @@ -435,4 +437,14 @@ int threadpool_addtask(threadpool * const tp, task * const task) { RETURNWERR(EINVAL, -1); return cqueue_append(tp->taskqueue, task); +} + +int threadpool_join(const threadpool * const tp) { + if(!tp) + RETURNWERR(EINVAL, -1); + + for(int i = 0; i < tp->nthreads; i++) + thrd_join(*(tp->threads[i]), NULL); + + return 0; } \ No newline at end of file diff --git a/src/threadpool.h b/src/threadpool.h index 48ad9eb..bc3ce06 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -25,6 +25,7 @@ int cqueue_cancel(cqueue * const cq); threadpool * threadpool_init(int threads); void threadpool_free(threadpool *tp); int threadpool_addtask(threadpool * const tp, task * const task); +int threadpool_join(const threadpool * const tp); typedef struct mtxp mtxpair; mtxpair * mtxpair_init(void * const data, int type); -- cgit v1.2.3