From 9cf4667331b97f1123f4156273a46558e27c2d2d Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Mon, 21 Apr 2025 17:31:37 -0500 Subject: Start work on cqueue implementation, create cleanup set of functions --- src/threadpool.c | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 107 insertions(+), 6 deletions(-) (limited to 'src/threadpool.c') diff --git a/src/threadpool.c b/src/threadpool.c index d058d52..e230ebb 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -1,7 +1,7 @@ #include "threadpool.h" +#include "arena.h" #include "shared.h" -#include "ll-internal.h" #include "ll.h" #include @@ -95,14 +95,18 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 -typedef struct cq { - void*placehoilder; -} cqueue; - +typedef int (*task_callback)(void*); typedef struct task { - int (*callback)(void*); + task_callback cb; void *arg; } task; + +typedef struct cq { + dlinkedlist *list; + mtx_t *mutex; + cnd_t *conditional; +} cqueue; + typedef struct tp { thrd_t **threads; int nthreads; @@ -110,3 +114,100 @@ typedef struct tp { cqueue *taskqueue; } threadpool; +task * task_init(task_callback cb, void *arg) { + if(cb == NULL) + RETURNWERR(EINVAL, NULL); + task *task = VALLOC(1, sizeof(*task)); + if(!task) + return NULL; + + task->cb = cb; + task->arg = arg; + + return task; +} + +void task_free(task *ts) { + if(!ts) + return; + + free(ts); // Not making any assumptions about the data in the task + return; +} + + +static void ___ucleanup_mtxd(void *mtx) { + if(!mtx) + return; + + mtx_destroy((mtx_t *)mtx); + return; +} +static void ___ucleanup_cndd(void *cnd) { + if(!cnd) + return; + + cnd_destroy((cnd_t *)cnd); + return; +} +static void ___ucleanup_dll(void *dll) { + if(!dll) + return; + + dlinkedlist_free((dlinkedlist *)dll); + return; +} + + +cqueue * cqueue_init(int mtx_type) { + unsigned char flag = 0; + cleanup_create(10); + + cqueue *cq = VALLOC(1, sizeof(*cq)); + if(!cq) + return NULL; + cleanup_register(&__CLEANUP, free, cq); + + cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); + if(!(cq->mutex)) + flag++; + cleanup_cndregister(&__CLEANUP, flag, free, cq->mutex); + + if(!flag && mtx_init(cq->mutex, mtx_type) != thrd_success) + flag++; + cleanup_cndregister(&__CLEANUP, flag, ___ucleanup_mtxd, cq->mutex); + + if(!flag && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) + flag++; + cleanup_cndregister(&__CLEANUP, flag, free, cq->conditional); + + if(!flag && cnd_init(cq->conditional) != thrd_success) + flag++; + cleanup_cndregister(&__CLEANUP, flag, ___ucleanup_cndd, cq->conditional); + + cq->list = dlinkedlist_init(); + if(!flag && !cq->list) + flag++; + cleanup_cndregister(&__CLEANUP, flag, ___ucleanup_dll, cq->list); + + if(flag) + cleanup_fire(&__CLEANUP); + + // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions + + return cq; +} + +void cqueue_free(cqueue *cq) { + if(!cq) + return; + + dlinkedlist_free(cq->list); + cnd_destroy(cq->conditional); + mtx_destroy(cq->mutex); + free(cq->conditional); + free(cq->mutex); + free(cq); + + return; +} \ No newline at end of file -- cgit v1.2.3