From 928238bdcbc78c4196eb4a3508808c79e31f7c84 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Wed, 14 May 2025 12:19:34 -0500 Subject: Update signatures of free-type functions to match the freecallback signature --- src/threadpool.c | 264 ++++++++++++++++++++++++++----------------------------- 1 file changed, 125 insertions(+), 139 deletions(-) (limited to 'src/threadpool.c') diff --git a/src/threadpool.c b/src/threadpool.c index 9d00030..e1c11aa 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -10,7 +10,7 @@ // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) typedef struct mtxp { void *data; - mtx_t *mtx; + mtx_t mtx; } mtxpair; mtxpair * mtxpair_init(void * const data, int type) { @@ -18,16 +18,9 @@ mtxpair * mtxpair_init(void * const data, int type) { if(!mtxp) return NULL; - // Make room for the mutex - mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx)); - if(!mtxp->mtx) { - free(mtxp); - return NULL; - } - // Init the mutex - if(mtx_init(mtxp->mtx, type) == thrd_error) { - free(mtxp->mtx); free(mtxp); + if(mtx_init(&mtxp->mtx, type) == thrd_error) { + free(mtxp); RETURNWERR(errno, NULL); } @@ -39,8 +32,7 @@ void mtxpair_free(mtxpair *mp) { if(!mp) return; - mtx_destroy(mp->mtx); - free(mp->mtx); + mtx_destroy(&mp->mtx); free(mp); return; @@ -64,9 +56,9 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) if(!mtxd) RETURNWERR(EINVAL, thrd_error); - if(mtx_lock(mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} + if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} int retval = thrd_create(thr, func, mtxd->data); - if(mtx_unlock(mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} + if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} return retval; } @@ -99,21 +91,6 @@ typedef struct task { void *arg; } task; -typedef struct cq { - dlinkedlist *list; - mtx_t *mtx; - cnd_t *cnd; - unsigned char canceled; -} cqueue; - -typedef struct tp { - thrd_t **threads; - int nthreads; - - cqueue *taskqueue; -} threadpool; - - task * task_init(task_callback cb, void *arg) { if(cb == NULL) RETURNWERR(EINVAL, NULL); @@ -138,149 +115,158 @@ void task_free(task *ts) { int task_fire(task *ts) { if(!ts) RETURNWERR(EINVAL, -1); + if(ts->cb == NULL) + RETURNWERR(EINVAL, -1); return ts->cb(ts->arg); } -/* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must -// always be followed by an unlock -// Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a -// function that waits should never send */ +typedef struct cq { + dlinkedlist *taskqueue; + dlinkedlist *rthreads; -static void ___ucleanup_dfree(void *dll) { - if(!dll) - return; + mtx_t mtx; + cnd_t cnd; + + unsigned char canceled; - dlinkedlist_free((dlinkedlist *)dll); - return; -} +} cqueue; -static void ___ucleanup_cndd(void *cnd) { - if(!cnd) - return; - cnd_destroy((cnd_t *)cnd); - return; -} -static void ___ucleanup_mtxd(void *mtx) { - if(!mtx) - return; +// static void ___ucleanup_dfree(void *dll) { +// if(!dll) +// return; - mtx_destroy((mtx_t*)mtx); - return; -} +// dlinkedlist_free((dlinkedlist *)dll); +// return; +// } + +// static void ___ucleanup_cndd(void *cnd) { +// if(!cnd) +// return; + +// cnd_destroy((cnd_t *)cnd); +// return; +// } + +// static void ___ucleanup_mtxd(void *mtx) { +// if(!mtx) +// return; -cqueue * cqueue_init(int mtx_type) { - cleanup_CREATE(10); +// mtx_destroy((mtx_t*)mtx); +// return; +// } + +// cqueue * cqueue_init(int mtx_type) { +// cleanup_CREATE(10); - cqueue *cq = VALLOC(1, sizeof(*cq)); - if(!cq) - return NULL; - cleanup_REGISTER(free, cq); - - cq->canceled = FALSE; - cq->list = dlinkedlist_init(); - if(!cq->list) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); - - if(!cleanup_ERRORFLAGGED) - if(!(cq->cnd = VALLOC(1, sizeof(*cq->cnd)))) - cleanup_MARK(); - cleanup_CNDREGISTER(free, cq->cnd); +// cqueue *cq = VALLOC(1, sizeof(*cq)); +// if(!cq) +// return NULL; +// cleanup_REGISTER(free, cq); + +// cq->canceled = FALSE; +// cq->list = dlinkedlist_init(); +// if(!cq->list) +// cleanup_MARK(); +// cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); - if(!cleanup_ERRORFLAGGED) - if(cnd_init(cq->cnd) == thrd_error) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); +// if(!cleanup_ERRORFLAGGED) +// if(cnd_init(&cq->cnd) == thrd_error) +// cleanup_MARK(); +// cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); - if(!cleanup_ERRORFLAGGED) - if(!(cq->mtx = VALLOC(1, sizeof(*cq->mtx)))) - cleanup_MARK(); - cleanup_CNDREGISTER(free, cq->mtx); - - if(!cleanup_ERRORFLAGGED) - if(mtx_init(cq->mtx, mtx_type) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); +// if(!cleanup_ERRORFLAGGED) +// if(mtx_init(&cq->mtx, mtx_type) != thrd_success) +// cleanup_MARK(); +// cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); - if(cleanup_ERRORFLAGGED) - cleanup_FIRE(); +// if(cleanup_ERRORFLAGGED) +// cleanup_FIRE(); - return cq; -} +// return cq; +// } -void cqueue_cancel(cqueue *cq) { - if(!cq) - return; +// void cqueue_cancel(cqueue *cq) { +// if(!cq) +// return; - mtx_lock(cq->mtx); - if(cq->canceled) { - mtx_unlock(cq->mtx); - thrd_exit(-1); - } +// mtx_lock(cq->mtx); +// if(cq->canceled) { +// mtx_unlock(cq->mtx); +// thrd_exit(-1); +// } - cq->canceled++; - mtx_unlock(cq->mtx); - cnd_broadcast(cq->cnd); +// cq->canceled++; +// mtx_unlock(cq->mtx); +// cnd_broadcast(cq->cnd); - return; -} +// return; +// } -void cqueue_free(cqueue *cq) { - if(!cq) - return; +// void cqueue_free(cqueue *cq) { +// if(!cq) +// return; - cqueue_cancel(cq); - mtx_destroy(cq->mtx); - cnd_destroy(cq->cnd); - free(cq->mtx); - free(cq->cnd); - dlinkedlist_free(cq->list); +// cqueue_cancel(cq); +// mtx_destroy(cq->mtx); +// cnd_destroy(cq->cnd); +// free(cq->mtx); +// free(cq->cnd); +// dlinkedlist_free(cq->list); - return; -} +// return; +// } -int cqueue_addtask(cqueue * const cq, task * const tsk) { - if(!cq || !tsk) - RETURNWERR(EINVAL, -1); +// int cqueue_addtask(cqueue * const cq, task * const tsk) { +// if(!cq || !tsk) +// RETURNWERR(EINVAL, -1); - mtx_lock(cq->mtx); +// mtx_lock(cq->mtx); - // TODO: Think about creating an "exception" via signal handling - if(cq->canceled) { - mtx_unlock(cq->mtx); - thrd_exit(-1); - } +// // TODO: Think about creating an "exception" via signal handling +// if(cq->canceled) { +// mtx_unlock(cq->mtx); +// thrd_exit(-1); +// } - dlinkedlist_prepend(cq->list, tsk, free); - mtx_unlock(cq->mtx); - cnd_signal(cq->cnd); +// dlinkedlist_prepend(cq->list, tsk, free); +// mtx_unlock(cq->mtx); +// cnd_signal(cq->cnd); - return 0; -} +// return 0; +// } -task * cqueue_waitpop(cqueue * const cq) { - if(!cq) - RETURNWERR(EINVAL, NULL); +// task * cqueue_waitpop(cqueue * const cq) { +// if(!cq) +// RETURNWERR(EINVAL, NULL); - task *retval = NULL; +// task *retval = NULL; - mtx_lock(cq->mtx); - while(dlinkedlist_isempty(cq->list) && !cq->canceled) - cnd_wait(cq->cnd, cq->mtx); +// mtx_lock(cq->mtx); +// while(dlinkedlist_isempty(cq->list) && !cq->canceled) +// cnd_wait(cq->cnd, cq->mtx); - if(cq->canceled) { - mtx_unlock(cq->mtx); - thrd_exit(-1); - } +// if(cq->canceled) { +// mtx_unlock(cq->mtx); +// thrd_exit(-1); +// } - retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); - dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); - mtx_unlock(cq->mtx); +// retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); +// dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); +// mtx_unlock(cq->mtx); - return retval; -} \ No newline at end of file +// return retval; +// } + + + +typedef struct tp { + thrd_t **threads; // thrd_t *threads[] + int nthreads; + + cqueue *taskqueue; +} threadpool; -- cgit v1.2.3