From 5a5ff094101d63935f7fc65c124b3e56d553522d Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Wed, 14 May 2025 13:20:21 -0500 Subject: Start reimplementing some of the earlier concurrent queue code --- src/threadpool.c | 159 +++++++++++++++++++++++++++++++------------------------ src/threadpool.h | 8 ++- 2 files changed, 92 insertions(+), 75 deletions(-) diff --git a/src/threadpool.c b/src/threadpool.c index e1c11aa..0baa024 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -131,95 +131,114 @@ typedef struct cq { cnd_t cnd; unsigned char canceled; - } cqueue; +static void ___ucleanup_mtxd(void *mtx) { + if(!mtx) + return; -// static void ___ucleanup_dfree(void *dll) { -// if(!dll) -// return; + mtx_destroy((mtx_t*)mtx); + return; +} -// dlinkedlist_free((dlinkedlist *)dll); -// return; -// } +static void ___ucleanup_cndd(void *cnd) { + if(!cnd) + return; -// static void ___ucleanup_cndd(void *cnd) { -// if(!cnd) -// return; + cnd_destroy((cnd_t *)cnd); + return; +} -// cnd_destroy((cnd_t *)cnd); -// return; -// } +cqueue * cqueue_init() { + cleanup_CREATE(10); + + // Create base object + cqueue *cq = VALLOC(1, sizeof(*cq)); + if(!cq) + RETURNWERR(errno, NULL); + cleanup_REGISTER(free, cq); + cq->canceled = 0; -// static void ___ucleanup_mtxd(void *mtx) { -// if(!mtx) -// return; + // Initialize the mutex + if(mtx_init(&cq->mtx, mtx_plain) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); + + // Initialize the conditional + if(!cleanup_ERRORFLAGGED) + if(cnd_init(&cq->cnd) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); + + // Create the taskqueue + if(!cleanup_ERRORFLAGGED) + if(!(cq->taskqueue = dlinkedlist_init())) + cleanup_MARK(); + cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue); + + // Create the thread list + if(!cleanup_ERRORFLAGGED) + if(!(cq->rthreads = dlinkedlist_init())) + cleanup_MARK(); + cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads); + + if(cleanup_ERRORFLAGGED) + cleanup_FIRE(); + + return cq; + + // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line + // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea +} -// mtx_destroy((mtx_t*)mtx); -// return; -// } +void cqueue_cancel(cqueue * const cq) { + if(!cq) + return; -// cqueue * cqueue_init(int mtx_type) { -// cleanup_CREATE(10); - -// cqueue *cq = VALLOC(1, sizeof(*cq)); -// if(!cq) -// return NULL; -// cleanup_REGISTER(free, cq); - -// cq->canceled = FALSE; -// cq->list = dlinkedlist_init(); -// if(!cq->list) -// cleanup_MARK(); -// cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); - -// if(!cleanup_ERRORFLAGGED) -// if(cnd_init(&cq->cnd) == thrd_error) -// cleanup_MARK(); -// cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); - -// if(!cleanup_ERRORFLAGGED) -// if(mtx_init(&cq->mtx, mtx_type) != thrd_success) -// cleanup_MARK(); -// cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); + mtx_lock(&cq->mtx); -// if(cleanup_ERRORFLAGGED) -// cleanup_FIRE(); + if(cq->canceled) { + mtx_unlock(&cq->mtx); + return; + } + cq->canceled = 1; -// return cq; -// } + mtx_unlock(&cq->mtx); + cnd_broadcast(&cq->cnd); + + return; +} -// void cqueue_cancel(cqueue *cq) { -// if(!cq) -// return; +static int ___cqueue_join(void *t) { + if(!t) + return -1; -// mtx_lock(cq->mtx); -// if(cq->canceled) { -// mtx_unlock(cq->mtx); -// thrd_exit(-1); -// } + int retval = 0; + thrd_t thread = *((thrd_t*)t); + thrd_join(thread, &retval); + + return retval; +} -// cq->canceled++; -// mtx_unlock(cq->mtx); -// cnd_broadcast(cq->cnd); +void cqueue_free(void *cq) { + if(!cq) + return; -// return; -// } + cqueue *real = (cqueue *)cq; -// void cqueue_free(cqueue *cq) { -// if(!cq) -// return; + // Cancel threads and wait for them to exit + cqueue_cancel(real); + dlinkedlist_foreach(real->rthreads, ___cqueue_join); -// cqueue_cancel(cq); -// mtx_destroy(cq->mtx); -// cnd_destroy(cq->cnd); -// free(cq->mtx); -// free(cq->cnd); -// dlinkedlist_free(cq->list); + // Threads are dead, no need to worry about concurrency anymore + mtx_destroy(&real->mtx); + cnd_destroy(&real->cnd); + dlinkedlist_free(real->rthreads); + dlinkedlist_free(real->taskqueue); -// return; -// } + return; +} // int cqueue_addtask(cqueue * const cq, task * const tsk) { // if(!cq || !tsk) diff --git a/src/threadpool.h b/src/threadpool.h index bd1b787..db6fa2e 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -12,11 +12,9 @@ task * task_init(task_callback cb, void *arg); void task_free(task *ts); int task_fire(task *ts); -cqueue * cqueue_init(int mtx_type); +cqueue * cqueue_init(); +void cqueue_cancel(cqueue * const cq); +void cqueue_free(void *cq); -void cqueue_cancel(cqueue *cq); -void cqueue_free(cqueue *cq); -int cqueue_addtask(cqueue * const cq, task * const tsk); -task * cqueue_waitpop(cqueue * const cq); #endif \ No newline at end of file -- cgit v1.2.3