From 521e9a2fc8e0ef1dffe4e590d0d4f65292f0d1df Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Mon, 21 Apr 2025 21:26:23 -0500 Subject: Implement a Concurrent Queue --- src/ll.c | 12 +++++++ src/ll.h | 3 ++ src/shared.h | 5 ++- src/threadpool.c | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/src/ll.c b/src/ll.c index 9bb1441..285203d 100644 --- a/src/ll.c +++ b/src/ll.c @@ -196,4 +196,16 @@ int dlinkedlist_foreach(dlinkedlist *ll, int (*callback)(void*)) { callback(p->data); return 0; +} + +void * dlinkedlist_poplast(dlinkedlist *ll) { + if(!ll) + RETURNWERR(EINVAL, NULL); + if(dlinkedlist_isempty(ll)) + RETURNWERR(ENODATA, NULL); + + void *data = dlinkedlist_get(ll, ll->size - 1); + dlinkedlist_remove(ll, ll->size - 1); + + return data; } \ No newline at end of file diff --git a/src/ll.h b/src/ll.h index 3636e77..4b5a2df 100644 --- a/src/ll.h +++ b/src/ll.h @@ -25,6 +25,7 @@ typedef void (*dll_freecb)(void*); typedef struct dlinked dlinkedlist; #ifndef __VXGG_REWRITE___LL_INTERNAL___ + dlinkedlist * dlinkedlist_init(void); void dlinkedlist_free(dlinkedlist *ll); int dlinkedlist_append(dlinkedlist * const ll, void *data, dll_freecb fcb); @@ -32,11 +33,13 @@ int dlinkedlist_prepend(dlinkedlist * const ll, void *data, dll_freecb fcb); int dlinkedlist_insert(dlinkedlist * const ll, void *data, dll_freecb fcb, int index); void* dlinkedlist_get(const dlinkedlist * const ll, int index); int dlinkedlist_remove(dlinkedlist * const ll, int index); +void * dlinkedlist_poplast(dlinkedlist *ll); int dlinkedlist_size(const dlinkedlist * const ll); #define dlinkedlist_isempty(ll) (dlinkedlist_size((ll)) == 0) int dlinkedlist_foreach(dlinkedlist *ll, int (*callback)(void*)); + #endif #endif \ No newline at end of file diff --git a/src/shared.h b/src/shared.h index 1a629e6..3b0a5f7 100644 --- a/src/shared.h +++ b/src/shared.h @@ -76,10 +76,13 @@ int cleanup_fire(cleanup * const loc); /* Cleanup environment creator. Automatically defines the variables `__CLEANUP`, `__CLEANUP_FUNCS`, and `__CLEANUP_ARGS` and initializes // via `cleanup_init()` using these variables. */ -#define cleanup_create(size) \ +#define cleanup_CREATE(size) \ cleanup __CLEANUP; \ cleanup_callback __CLEANUP_FUNCS[(size)]; \ void *__CLEANUP_ARGS[(size)]; \ cleanup_init(&__CLEANUP, (size), __CLEANUP_FUNCS, __CLEANUP_ARGS) +#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) +#define cleanup_CNDREGISTER(flag, cb, arg) cleanup_cndregister(&__CLEANUP, (flag), (cb), (arg)) + #endif \ No newline at end of file diff --git a/src/threadpool.c b/src/threadpool.c index e230ebb..808dd5d 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -4,6 +4,7 @@ #include "ll.h" +#include #include #include #include @@ -161,34 +162,34 @@ static void ___ucleanup_dll(void *dll) { cqueue * cqueue_init(int mtx_type) { unsigned char flag = 0; - cleanup_create(10); + cleanup_CREATE(10); cqueue *cq = VALLOC(1, sizeof(*cq)); if(!cq) return NULL; - cleanup_register(&__CLEANUP, free, cq); + cleanup_REGISTER(free, cq); cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); if(!(cq->mutex)) flag++; - cleanup_cndregister(&__CLEANUP, flag, free, cq->mutex); + cleanup_CNDREGISTER(flag, free, cq->mutex); if(!flag && mtx_init(cq->mutex, mtx_type) != thrd_success) flag++; - cleanup_cndregister(&__CLEANUP, flag, ___ucleanup_mtxd, cq->mutex); + cleanup_CNDREGISTER(flag, ___ucleanup_mtxd, cq->mutex); if(!flag && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) flag++; - cleanup_cndregister(&__CLEANUP, flag, free, cq->conditional); + cleanup_CNDREGISTER(flag, free, cq->conditional); if(!flag && cnd_init(cq->conditional) != thrd_success) flag++; - cleanup_cndregister(&__CLEANUP, flag, ___ucleanup_cndd, cq->conditional); + cleanup_CNDREGISTER(flag, ___ucleanup_cndd, cq->conditional); cq->list = dlinkedlist_init(); if(!flag && !cq->list) flag++; - cleanup_cndregister(&__CLEANUP, flag, ___ucleanup_dll, cq->list); + cleanup_CNDREGISTER(flag, ___ucleanup_dll, cq->list); if(flag) cleanup_fire(&__CLEANUP); @@ -210,4 +211,84 @@ void cqueue_free(cqueue *cq) { free(cq); return; +} + +int cqueue_append(cqueue * const cq, task *tsk) { + if(!cq || !tsk) + RETURNWERR(EINVAL, -1); + + mtx_lock(cq->mutex); + dlinkedlist_append(cq->list, tsk, free); + mtx_unlock(cq->mutex); + cnd_signal(cq->conditional); + + return 0; +} + +int cqueue_prepend(cqueue * const cq, task *tsk) { + if(!cq || !tsk) + RETURNWERR(EINVAL, -1); + + mtx_lock(cq->mutex); + dlinkedlist_prepend(cq->list, tsk, free); + mtx_unlock(cq->mutex); + cnd_signal(cq->conditional); + + return 0; +} + +int cqueue_insert(cqueue * const cq, task *tsk, int index) { + if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first + RETURNWERR(EINVAL, -1); + + mtx_lock(cq->mutex); + dlinkedlist_insert(cq->list, tsk, free, index); + mtx_unlock(cq->mutex); + cnd_signal(cq->conditional); + + return 0; +} + +int cqueue_size(cqueue const * const cq) { + if(!cq) + RETURNWERR(EINVAL, -1); + + mtx_lock(cq->mutex); + int retval = dlinkedlist_size(cq->list); + mtx_unlock(cq->mutex); + + return retval; +} + +int cqueue_isempty(cqueue const * const cq) { + return (cqueue_size(cq) == 0); +} + +int cqueue_trypop(cqueue * const cq, task **ret) { + if(!cq || !ret || !*ret) + RETURNWERR(EINVAL, -1); + + int retval = 0; + + mtx_lock(cq->mutex); + if(!dlinkedlist_isempty(cq->list)) { + *ret = (task*)dlinkedlist_poplast(cq->list); + retval = 1; + } + mtx_unlock(cq->mutex); + + return retval; +} + +int cqueue_waitpop(cqueue * const cq, task **ret) { + if(!cq || !ret || !*ret) + RETURNWERR(EINVAL, -1); + + mtx_lock(cq->mutex); + while(!dlinkedlist_isempty(cq->list)) + cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done + *ret = dlinkedlist_poplast(cq->list); + mtx_unlock(cq->mutex); + + return 0; } \ No newline at end of file -- cgit v1.2.3