From 0ee5044805c8d157d5023fc1322f980e3a480df7 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Tue, 22 Apr 2025 16:35:44 -0500 Subject: Start work on threadpool implementation --- src/shared.h | 31 ++++++++++- src/threadpool.c | 160 +++++++++++++++++++++++++++++++++++++++++++++++-------- src/threadpool.h | 21 ++++++++ 3 files changed, 189 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/shared.h b/src/shared.h index 7034255..9e7eaa8 100644 --- a/src/shared.h +++ b/src/shared.h @@ -69,14 +69,31 @@ typedef struct cl { int used; } cleanup; +// Initialize a local cleanup stack. `loc`, `funcs` and `args` need to be locally defined, non allocated arrays, and must be at least `size` elements large int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]); + +// Register a cleanup callback for a given cleanup object int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg); + +// Register a cleanup callback, if and only if `flag == 0` int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg); + +// Clear the contents of a cleanup stack int cleanup_clear(cleanup * const loc); + +// Get the top callback without removing it from the cleanup stack cleanup_callback cleanup_peekf(cleanup * const loc); + +// Get and remove the top callback from the cleanup stack. Does not return the argument for the given callback cleanup_callback cleanup_popf(cleanup * const loc); + +// Get the top argument without removing it from the cleanup stack void * cleanup_peeka(cleanup * const loc); + +// Get and remove the top argument from the cleanup stack. Does not return the callback it was to be fed into void * cleanup_popa(cleanup * const loc); + +// Fire all the callbacks in the cleanup stack int cleanup_fire(cleanup * const loc); /* Cleanup environment creator. Automatically defines the variables `__CLEANUP`, `__CLEANUP_FUNCS`, and `__CLEANUP_ARGS` and initializes @@ -85,9 +102,19 @@ int cleanup_fire(cleanup * const loc); cleanup __CLEANUP; \ cleanup_callback __CLEANUP_FUNCS[(size)]; \ void *__CLEANUP_ARGS[(size)]; \ +unsigned char __FLAG = 0; \ cleanup_init(&__CLEANUP, (size), __CLEANUP_FUNCS, __CLEANUP_ARGS) -#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) -#define cleanup_CNDREGISTER(flag, cb, arg) cleanup_cndregister(&__CLEANUP, (flag), (cb), (arg)) +#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) +#define cleanup_CNDREGISTER(cb, arg) cleanup_cndregister(&__CLEANUP, __FLAG, (cb), (arg)) +#define cleanup_CLEAR() cleanup_clear(&__CLEANUP) +#define cleanup_PEEKF() cleanup_peekf(&__CLEANUP) +#define cleanup_POPF() cleanup_popf(&__CLEANUP) +#define cleanup_PEEKA() cleanup_peeka(&__CLEANUP) +#define cleanup_POPA() cleanup_popa(&__CLEANUP) +#define cleanup_FIRE() cleanup_fire(&__CLEANUP) +#define cleanup_MARK() (__FLAG = 1) +#define cleanup_UNMARK() (__FLAG = 0) +#define cleanup_ERRORFLAGGED (__FLAG != 0) #endif \ No newline at end of file diff --git a/src/threadpool.c b/src/threadpool.c index 56dcd6b..66c0d06 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -21,12 +21,12 @@ typedef struct mtxp { } mtxpair; mtxpair * mtxpair_init(void * const data, int type) { - mtxpair *mtxp = malloc(1 * sizeof(*mtxp)); + mtxpair *mtxp = VALLOC(1, sizeof(*mtxp)); if(!mtxp) return NULL; // Make room for the mutex - mtxp->mtx = malloc(1 * sizeof(*mtxp->mtx)); + mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx)); if(!mtxp->mtx) { free(mtxp); return NULL; @@ -95,7 +95,6 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 -typedef int (*task_callback)(void*); typedef struct task { task_callback cb; void *arg; @@ -105,6 +104,7 @@ typedef struct cq { dlinkedlist *list; mtx_t *mutex; cnd_t *conditional; + unsigned char canceled; } cqueue; typedef struct tp { @@ -114,6 +114,7 @@ typedef struct tp { cqueue *taskqueue; } threadpool; + task * task_init(task_callback cb, void *arg) { if(cb == NULL) RETURNWERR(EINVAL, NULL); @@ -160,7 +161,6 @@ static void ___ucleanup_dll(void *dll) { cqueue * cqueue_init(int mtx_type) { - unsigned char flag = 0; cleanup_CREATE(10); cqueue *cq = VALLOC(1, sizeof(*cq)); @@ -170,31 +170,32 @@ cqueue * cqueue_init(int mtx_type) { cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); if(!(cq->mutex)) - flag++; - cleanup_CNDREGISTER(flag, free, cq->mutex); + cleanup_MARK(); + cleanup_CNDREGISTER(free, cq->mutex); - if(!flag && mtx_init(cq->mutex, mtx_type) != thrd_success) - flag++; - cleanup_CNDREGISTER(flag, ___ucleanup_mtxd, cq->mutex); + if(cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex); - if(!flag && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) - flag++; - cleanup_CNDREGISTER(flag, free, cq->conditional); + if(cleanup_ERRORFLAGGED && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) + cleanup_MARK(); + cleanup_CNDREGISTER(free, cq->conditional); - if(!flag && cnd_init(cq->conditional) != thrd_success) - flag++; - cleanup_CNDREGISTER(flag, ___ucleanup_cndd, cq->conditional); + if(cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional); cq->list = dlinkedlist_init(); - if(!flag && !cq->list) - flag++; - cleanup_CNDREGISTER(flag, ___ucleanup_dll, cq->list); + if(cleanup_ERRORFLAGGED && !cq->list) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_dll, cq->list); - if(flag) + if(cleanup_ERRORFLAGGED) cleanup_fire(&__CLEANUP); // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions + cq->canceled = 0; return cq; } @@ -202,6 +203,9 @@ void cqueue_free(cqueue *cq) { if(!cq) return; + // Cancel any outstanding threads before freeing everything + cqueue_cancel(cq); + dlinkedlist_free(cq->list); cnd_destroy(cq->conditional); mtx_destroy(cq->mutex); @@ -217,6 +221,11 @@ int cqueue_append(cqueue * const cq, task *tsk) { RETURNWERR(EINVAL, -1); mtx_lock(cq->mutex); + if(cq->canceled) { + mtx_unlock(cq->mutex); + thrd_exit(thrd_timedout); + } + dlinkedlist_append(cq->list, tsk, free); mtx_unlock(cq->mutex); cnd_signal(cq->conditional); @@ -229,6 +238,11 @@ int cqueue_prepend(cqueue * const cq, task *tsk) { RETURNWERR(EINVAL, -1); mtx_lock(cq->mutex); + if(cq->canceled) { + mtx_unlock(cq->mutex); + thrd_exit(thrd_timedout); + } + dlinkedlist_prepend(cq->list, tsk, free); mtx_unlock(cq->mutex); cnd_signal(cq->conditional); @@ -241,6 +255,11 @@ int cqueue_insert(cqueue * const cq, task *tsk, int index) { RETURNWERR(EINVAL, -1); mtx_lock(cq->mutex); + if(cq->canceled) { + mtx_unlock(cq->mutex); + thrd_exit(thrd_timedout); + } + dlinkedlist_insert(cq->list, tsk, free, index); mtx_unlock(cq->mutex); cnd_signal(cq->conditional); @@ -253,6 +272,11 @@ int cqueue_size(cqueue const * const cq) { RETURNWERR(EINVAL, -1); mtx_lock(cq->mutex); + if(cq->canceled) { + mtx_unlock(cq->mutex); + thrd_exit(thrd_timedout); + } + int retval = dlinkedlist_size(cq->list); mtx_unlock(cq->mutex); @@ -260,7 +284,8 @@ int cqueue_size(cqueue const * const cq) { } int cqueue_isempty(cqueue const * const cq) { - return (cqueue_size(cq) == 0); + int val = cqueue_size(cq); + return (val < 0) ? -1 : (val == 0); } int cqueue_trypop(cqueue * const cq, task **ret) { @@ -270,6 +295,11 @@ int cqueue_trypop(cqueue * const cq, task **ret) { int retval = 0; mtx_lock(cq->mutex); + if(cq->canceled) { + mtx_unlock(cq->mutex); + thrd_exit(thrd_timedout); + } + if(!dlinkedlist_isempty(cq->list)) { *ret = (task*)dlinkedlist_poplast(cq->list); retval = 1; @@ -284,10 +314,98 @@ int cqueue_waitpop(cqueue * const cq, task **ret) { RETURNWERR(EINVAL, -1); mtx_lock(cq->mutex); - while(!dlinkedlist_isempty(cq->list)) + + while(!dlinkedlist_isempty(cq->list) && !cq->canceled) cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done + + if(cq->canceled) { + mtx_unlock(cq->mutex); + thrd_exit(thrd_timedout); + } + *ret = dlinkedlist_poplast(cq->list); + mtx_unlock(cq->mutex); return 0; +} + +int cqueue_cancel(cqueue * const cq) { + if(!cq) + RETURNWERR(EINVAL, -1); + + int retval = 0; + + mtx_lock(cq->mutex); + if(cq->canceled) + retval = -1; + else + cq->canceled++; + + mtx_unlock(cq->mutex); + cnd_broadcast(cq->conditional); + + return retval; +} + + +static void ___ucleanup_cqfree(void *cq) { + if(!cq) + return; + + cqueue_free(cq); + return; +} + +threadpool * threadpool_init(int threads) { + if(threads < 1) + RETURNWERR(EINVAL, NULL); + cleanup_CREATE(10); + + threadpool *tp = VALLOC(1, sizeof(*tp)); + if(!tp) + return NULL; + cleanup_REGISTER(free, tp); + + tp->taskqueue = cqueue_init(mtx_plain); + if(!tp->taskqueue) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); + + tp->threads = VALLOC(threads, sizeof(*tp->threads)); + if(!tp->threads) + cleanup_MARK(); + cleanup_CNDREGISTER(free, tp->threads); + + for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { + tp->threads[i] = VALLOC(1, sizeof(**tp->threads)); + if(!tp->threads[i]) { + cleanup_MARK(); + for(int j = 0; j < i; j++) + free(tp->threads[j]); + } + } + + if(cleanup_ERRORFLAGGED) + cleanup_FIRE(); + else + tp->nthreads = threads; + + return tp; +} + +void threadpool_free(threadpool *tp) { + if(!tp) + return; + + cqueue_free(tp->taskqueue); + for(int i = 0; i < tp->nthreads; i++) { + thrd_detach(*tp->threads[i]); + free(tp->threads[i]); + } + free(tp->threads); + cqueue_free(tp->taskqueue); + free(tp); + + return; } \ No newline at end of file diff --git a/src/threadpool.h b/src/threadpool.h index 8e3ee41..c28f03a 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -3,6 +3,25 @@ #include +typedef int (*task_callback)(void*); +typedef struct task task; +typedef struct cq cqueue; +typedef struct tp threadpool; + +task * task_init(task_callback cb, void *arg); +void task_free(task *ts); + +cqueue * cqueue_init(int mtx_type); +void cqueue_free(cqueue *cq); +int cqueue_append(cqueue * const cq, task *tsk); +int cqueue_prepend(cqueue * const cq, task *tsk); +int cqueue_insert(cqueue * const cq, task *tsk, int index); +int cqueue_size(cqueue const * const cq); +int cqueue_isempty(cqueue const * const cq); +int cqueue_trypop(cqueue * const cq, task **ret); +int cqueue_waitpop(cqueue * const cq, task **ret); +int cqueue_cancel(cqueue * const cq); + typedef struct mtxp mtxpair; mtxpair * mtxpair_init(void * const data, int type); void mtxpair_free(mtxpair *mp); @@ -10,4 +29,6 @@ int mtxpair_setdata(mtxpair * const mp, void * const data); int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd); +int cqueue_cancel(cqueue * const cq); + #endif \ No newline at end of file -- cgit v1.2.3