From ee3621075106ac6cff84a94432c12c89c9e45979 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Thu, 24 Apr 2025 23:08:28 -0500 Subject: Yeah this shit brokey --- src/main.c | 32 ++++-- src/shared.h | 2 + src/threadpool.c | 314 +++++++++++++------------------------------------------ src/threadpool.h | 24 +---- 4 files changed, 107 insertions(+), 265 deletions(-) (limited to 'src') diff --git a/src/main.c b/src/main.c index 6af87ab..95b55ff 100644 --- a/src/main.c +++ b/src/main.c @@ -7,8 +7,9 @@ #include #include +#include +#include -#include int testcb(void *data) { if(!data) return -1; @@ -17,14 +18,31 @@ int testcb(void *data) { return 0; } +int consumer(void *cq) { + if(!cq) + return -1; + + cqueue *rcq = (cqueue*)cq; + for(task *tsk = NULL;;) { + tsk = cqueue_waitpop(rcq); + if(!tsk) + thrd_exit(-1); + + task_fire(tsk); + } + + return 0; +} + int main() { // error(1, ENOTSUP, "No main file lol"); - - threadpool *tp = threadpool_init(2); - task *tsk = task_init(testcb, "This is some data"); - threadpool_addtask(tp, tsk); - threadpool_join(tp); - threadpool_free(tp); + + thrd_t thread; + cqueue *cq = cqueue_init(mtx_plain); + thrd_create(&thread, consumer, cq); + cqueue_addtask(cq, task_init(testcb, (void*)"This is some data")); + sleep(10); + cqueue_free(cq); return 0; } \ No newline at end of file diff --git a/src/shared.h b/src/shared.h index 9e7eaa8..825814e 100644 --- a/src/shared.h +++ b/src/shared.h @@ -4,6 +4,8 @@ #include #define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) +#define FALSE 0 +#define TRUE 1 #define RETURNWERR(errval, retval) do {\ errno = (errval);\ diff --git a/src/threadpool.c b/src/threadpool.c index ab0733d..9d00030 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -7,12 +7,6 @@ #include #include -/* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must -// always be followed by an unlock - -// Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a -// function that waits should never send */ - // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) typedef struct mtxp { void *data; @@ -86,12 +80,18 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) // Threadpool: // Array of threads // Task Queue - // Readiness semaphore + // Readiness semaphore / conditional + // Mutex // Linked List of Tasks // Task: // int (*callback)(void*) // void *arg +// Consumer: + // Wait for cqueue to pop + // Fire task + // Repeat + // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 typedef struct task { @@ -135,14 +135,28 @@ void task_free(task *ts) { return; } +int task_fire(task *ts) { + if(!ts) + RETURNWERR(EINVAL, -1); -static void ___ucleanup_mtxd(void *mtx) { - if(!mtx) + return ts->cb(ts->arg); +} + + +/* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must +// always be followed by an unlock + +// Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a +// function that waits should never send */ + +static void ___ucleanup_dfree(void *dll) { + if(!dll) return; - mtx_destroy((mtx_t *)mtx); + dlinkedlist_free((dlinkedlist *)dll); return; } + static void ___ucleanup_cndd(void *cnd) { if(!cnd) return; @@ -150,15 +164,15 @@ static void ___ucleanup_cndd(void *cnd) { cnd_destroy((cnd_t *)cnd); return; } -static void ___ucleanup_dll(void *dll) { - if(!dll) + +static void ___ucleanup_mtxd(void *mtx) { + if(!mtx) return; - dlinkedlist_free((dlinkedlist *)dll); + mtx_destroy((mtx_t*)mtx); return; } - cqueue * cqueue_init(int mtx_type) { cleanup_CREATE(10); @@ -167,284 +181,106 @@ cqueue * cqueue_init(int mtx_type) { return NULL; cleanup_REGISTER(free, cq); - cq->mtx = VALLOC(1, sizeof(*(cq->mtx))); - if(!(cq->mtx)) + cq->canceled = FALSE; + cq->list = dlinkedlist_init(); + if(!cq->list) cleanup_MARK(); - cleanup_CNDREGISTER(free, cq->mtx); - - if(!cleanup_ERRORFLAGGED) - if(mtx_init(cq->mtx, mtx_type) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); + cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); if(!cleanup_ERRORFLAGGED) - if(!(cq->cnd = VALLOC(1, sizeof(*(cq->cnd))))) + if(!(cq->cnd = VALLOC(1, sizeof(*cq->cnd)))) cleanup_MARK(); cleanup_CNDREGISTER(free, cq->cnd); - + if(!cleanup_ERRORFLAGGED) - if(cnd_init(cq->cnd) != thrd_success) + if(cnd_init(cq->cnd) == thrd_error) cleanup_MARK(); cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); - + if(!cleanup_ERRORFLAGGED) - if(!(cq->list = dlinkedlist_init())) + if(!(cq->mtx = VALLOC(1, sizeof(*cq->mtx)))) cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_dll, cq->list); - + cleanup_CNDREGISTER(free, cq->mtx); + if(!cleanup_ERRORFLAGGED) + if(mtx_init(cq->mtx, mtx_type) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); + if(cleanup_ERRORFLAGGED) - cleanup_fire(&__CLEANUP); - - // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions - // The implementation was not better lmao + cleanup_FIRE(); - cq->canceled = 0; return cq; } -void cqueue_free(cqueue *cq) { +void cqueue_cancel(cqueue *cq) { if(!cq) return; - // Cancel any outstanding threads before freeing everything - cqueue_cancel(cq); - - dlinkedlist_free(cq->list); - cnd_destroy(cq->cnd); - mtx_destroy(cq->mtx); - free(cq->cnd); - free(cq->mtx); - free(cq); - - return; -} - -int cqueue_append(cqueue * const cq, task *tsk) { - if(!cq || !tsk) - RETURNWERR(EINVAL, -1); - mtx_lock(cq->mtx); if(cq->canceled) { mtx_unlock(cq->mtx); - thrd_exit(thrd_timedout); + thrd_exit(-1); } - dlinkedlist_append(cq->list, tsk, free); + cq->canceled++; mtx_unlock(cq->mtx); - cnd_signal(cq->cnd); + cnd_broadcast(cq->cnd); - return 0; + return; } -int cqueue_prepend(cqueue * const cq, task *tsk) { - if(!cq || !tsk) - RETURNWERR(EINVAL, -1); - - mtx_lock(cq->mtx); - if(cq->canceled) { - mtx_unlock(cq->mtx); - thrd_exit(thrd_timedout); - } +void cqueue_free(cqueue *cq) { + if(!cq) + return; - dlinkedlist_prepend(cq->list, tsk, free); - mtx_unlock(cq->mtx); - cnd_signal(cq->cnd); + cqueue_cancel(cq); + mtx_destroy(cq->mtx); + cnd_destroy(cq->cnd); + free(cq->mtx); + free(cq->cnd); + dlinkedlist_free(cq->list); - return 0; + return; } -int cqueue_insert(cqueue * const cq, task *tsk, int index) { - if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first +int cqueue_addtask(cqueue * const cq, task * const tsk) { + if(!cq || !tsk) RETURNWERR(EINVAL, -1); mtx_lock(cq->mtx); + + // TODO: Think about creating an "exception" via signal handling if(cq->canceled) { mtx_unlock(cq->mtx); - thrd_exit(thrd_timedout); + thrd_exit(-1); } - - dlinkedlist_insert(cq->list, tsk, free, index); + + dlinkedlist_prepend(cq->list, tsk, free); mtx_unlock(cq->mtx); cnd_signal(cq->cnd); return 0; } -int cqueue_size(cqueue const * const cq) { +task * cqueue_waitpop(cqueue * const cq) { if(!cq) - RETURNWERR(EINVAL, -1); - - mtx_lock(cq->mtx); - if(cq->canceled) { - mtx_unlock(cq->mtx); - thrd_exit(thrd_timedout); - } - - int retval = dlinkedlist_size(cq->list); - mtx_unlock(cq->mtx); - - return retval; -} - -int cqueue_isempty(cqueue const * const cq) { - int val = cqueue_size(cq); - return (val < 0) ? -1 : (val == 0); -} + RETURNWERR(EINVAL, NULL); -int cqueue_trypop(cqueue * const cq, task **ret) { - if(!cq || !ret || !*ret) - RETURNWERR(EINVAL, -1); + task *retval = NULL; - int retval = 0; - mtx_lock(cq->mtx); - if(cq->canceled) { - mtx_unlock(cq->mtx); - thrd_exit(thrd_timedout); - } - - if(!dlinkedlist_isempty(cq->list)) { - *ret = (task*)dlinkedlist_poplast(cq->list); - retval = 1; - } - mtx_unlock(cq->mtx); - - return retval; -} - -int cqueue_waitpop(cqueue * const cq, task **ret) { - if(!cq || !ret) - RETURNWERR(EINVAL, -1); + while(dlinkedlist_isempty(cq->list) && !cq->canceled) + cnd_wait(cq->cnd, cq->mtx); - mtx_lock(cq->mtx); - - while(!dlinkedlist_isempty(cq->list) && !cq->canceled) - cnd_wait(cq->cnd, cq->mtx); // Unlocks mutex while waiting, acquires lock once waiting is done - if(cq->canceled) { mtx_unlock(cq->mtx); - thrd_exit(thrd_timedout); + thrd_exit(-1); } - - *ret = dlinkedlist_poplast(cq->list); - - mtx_unlock(cq->mtx); - - return 0; -} -int cqueue_cancel(cqueue * const cq) { - if(!cq) - RETURNWERR(EINVAL, -1); - - int retval = 0; - - mtx_lock(cq->mtx); - if(cq->canceled) - retval = -1; - else - cq->canceled++; - + retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); + dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); mtx_unlock(cq->mtx); - cnd_broadcast(cq->cnd); - - return retval; -} - -int cqueue_consumer(void *passed) { - if(!passed) - thrd_exit(thrd_error); - // Not setting errno because then I'd have to make a mutex for it - - cqueue *cq = (cqueue *)passed; - - for(task *current_task;;) { - cqueue_waitpop(cq, ¤t_task); - if(!current_task) - thrd_exit(thrd_error); - - current_task->cb(current_task->arg); - } - - thrd_exit(thrd_success); -} - -static void ___ucleanup_cqfree(void *cq) { - if(!cq) - return; - - cqueue_free(cq); - return; -} - -threadpool * threadpool_init(int threads) { - if(threads < 1) - RETURNWERR(EINVAL, NULL); - cleanup_CREATE(10); - - threadpool *tp = VALLOC(1, sizeof(*tp)); - if(!tp) - return NULL; - cleanup_REGISTER(free, tp); - - tp->taskqueue = cqueue_init(mtx_plain); - if(!tp->taskqueue) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); - - if(!cleanup_ERRORFLAGGED) - if(!(tp->threads = VALLOC(threads, sizeof(*tp->threads)))) - cleanup_MARK(); - cleanup_CNDREGISTER(free, tp->threads); - - for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { - tp->threads[i] = VALLOC(1, sizeof(**tp->threads)); - if(!tp->threads[i]) { - cleanup_MARK(); - for(int j = 0; j < i; j++) - free(tp->threads[j]); - } - - if(!cleanup_ERRORFLAGGED) - thrd_create(tp->threads[i], cqueue_consumer, tp->taskqueue); - // TODO: Error Checking ^ - } - - if(cleanup_ERRORFLAGGED) - cleanup_FIRE(); - else - tp->nthreads = threads; - - return tp; -} - -void threadpool_free(threadpool *tp) { - if(!tp) - return; - cqueue_free(tp->taskqueue); - for(int i = 0; i < tp->nthreads; i++) - free(tp->threads[i]); - free(tp->threads); - free(tp); - - return; -} - -int threadpool_addtask(threadpool * const tp, task * const task) { - if(!tp || !task) - RETURNWERR(EINVAL, -1); - - return cqueue_append(tp->taskqueue, task); -} - -int threadpool_join(const threadpool * const tp) { - if(!tp) - RETURNWERR(EINVAL, -1); - - for(int i = 0; i < tp->nthreads; i++) - thrd_join(*(tp->threads[i]), NULL); - - return 0; + return retval; } \ No newline at end of file diff --git a/src/threadpool.h b/src/threadpool.h index bc3ce06..bd1b787 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -10,27 +10,13 @@ typedef struct tp threadpool; task * task_init(task_callback cb, void *arg); void task_free(task *ts); +int task_fire(task *ts); cqueue * cqueue_init(int mtx_type); -void cqueue_free(cqueue *cq); -int cqueue_append(cqueue * const cq, task *tsk); -int cqueue_prepend(cqueue * const cq, task *tsk); -int cqueue_insert(cqueue * const cq, task *tsk, int index); -int cqueue_size(cqueue const * const cq); -int cqueue_isempty(cqueue const * const cq); -int cqueue_trypop(cqueue * const cq, task **ret); -int cqueue_waitpop(cqueue * const cq, task **ret); -int cqueue_cancel(cqueue * const cq); - -threadpool * threadpool_init(int threads); -void threadpool_free(threadpool *tp); -int threadpool_addtask(threadpool * const tp, task * const task); -int threadpool_join(const threadpool * const tp); -typedef struct mtxp mtxpair; -mtxpair * mtxpair_init(void * const data, int type); -void mtxpair_free(mtxpair *mp); -int mtxpair_setdata(mtxpair * const mp, void * const data); -int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd); +void cqueue_cancel(cqueue *cq); +void cqueue_free(cqueue *cq); +int cqueue_addtask(cqueue * const cq, task * const tsk); +task * cqueue_waitpop(cqueue * const cq); #endif \ No newline at end of file -- cgit v1.2.3