From 33647dd2c4f4af7c7cb7d99c1a6dac7f1d059a67 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Mon, 20 Oct 2025 15:51:49 -0500 Subject: Switch to unity build --- src/threadpool.c | 148 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 126 insertions(+), 22 deletions(-) (limited to 'src/threadpool.c') diff --git a/src/threadpool.c b/src/threadpool.c index 02cd945..c266964 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -1,15 +1,18 @@ /** * @file threadpool.c * @author syxhe (https://t.me/syxhe) - * @brief *Implementing `threadpool.h`* + * @brief An implementation of a threadpool using libc threads * @version 0.1 * @date 2025-06-09 - * + * * @copyright Copyright (c) 2025 - * + * */ -#include "threadpool.h" +#ifndef __VXGG_REWRITE___THREADPOOL_C___193271180830131___ +#define __VXGG_REWRITE___THREADPOOL_C___193271180830131___ 1 + +#include "shared.c" #include #include @@ -18,7 +21,7 @@ /** * @brief A generic task - A function, data for that function, and a way to free the data - * + * */ typedef struct task { gcallback callback; //!< A generic callback to be ran when executing the task @@ -28,7 +31,7 @@ typedef struct task { /** * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node - * + * */ typedef struct tqnode { struct tqnode *next; //!< The next element in the `taskqueue` @@ -38,7 +41,7 @@ typedef struct tqnode { /** * @brief A FIFO queue of tasks - * + * */ typedef struct taskqueue { tqnode *start; //!< The first element of the queue @@ -48,7 +51,7 @@ typedef struct taskqueue { /** * @brief A `taskqueue` built for concurrent access. Essentially a threadpool - * + * */ typedef struct ctqueue { mtx_t mutex; //!< A mutex for locking sensitive resources @@ -60,7 +63,14 @@ typedef struct ctqueue { int talen; //!< The length of the thread array } ctqueue; - +/** + * @brief Create a task + * + * @param callback Callback function the given data should be ran with. Must be non-null + * @param freecb Callback function for freeing the given data. May be null + * @param data Data to be passed to the callback. May be null + * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error + */ task * task_init(gcallback callback, fcallback freecb, void *data) { if(callback == NULL) ERRRET(EINVAL, NULL); @@ -75,11 +85,16 @@ task * task_init(gcallback callback, fcallback freecb, void *data) { return tsk; } +/** + * @brief Free a task + * + * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null + */ void task_free(void *tsk) { task *real = (task *)tsk; if(!real) return; - + if(real->freecb != NULL) real->freecb(real->data); free(real); @@ -87,11 +102,23 @@ void task_free(void *tsk) { return; } +/** + * @brief Fire a task. Passes the `data` member to the specified `callback` + * + * @param tsk A task to be fired. Must be non-null + * @retval (int) Returns value of the fired callback. Returns -1 and sets errno on error + */ int task_fire(task *tsk) { if(!tsk) ERRRET(EINVAL, -1); return tsk->callback(tsk->data); } +/** + * @brief Fire and destroy a task simultaneously. Calls specified callback and free-callback on associated data + * + * @param tsk Task to be fired and destroyed. Must be non-null + * @retval (int) Returns value of the callback. Returns -1 and sets errno on error + */ int task_fired(task *tsk) { int retval = task_fire(tsk); if(errno == EINVAL && retval == -1) {return -1;} @@ -125,7 +152,11 @@ void tqnode_free(void *tqn) { - +/** + * @brief Create a FIFO queue of tasks + * + * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error + */ taskqueue * taskqueue_init(void) { taskqueue *tq = calloc(1, sizeof(*tq)); if(!tq) @@ -138,6 +169,11 @@ taskqueue * taskqueue_init(void) { return tq; } +/** + * @brief Free a taskqueue + * + * @param tq A taskqueue to be freed. May be null + */ void taskqueue_free(void *tq) { if(!tq) return; @@ -148,7 +184,7 @@ void taskqueue_free(void *tq) { p = n; } free(tq); - + return; } @@ -167,6 +203,13 @@ int taskqueue_handlefirst(taskqueue *tq, task *tsk) { return 1; } +/** + * @brief Push a task onto a taskqueue + * + * @param tq The taskqueue to be modified. Must be non-null + * @param tsk The task to push. Must be non-null + * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error + */ int taskqueue_push(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); @@ -184,12 +227,18 @@ int taskqueue_push(taskqueue *tq, task *tsk) { return 0; } +/** + * @brief Pop a task from a taskqueue + * + * @param tq A taskqueue to grab a task from. Must be non-null + * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error + */ task * taskqueue_pop(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *end = tq->end; - task *ret = end->task; + task *ret = end->task; if(tq->size == 1) { tq->end = NULL; @@ -204,6 +253,13 @@ task * taskqueue_pop(taskqueue *tq) { return ret; } +/** + * @brief Append a task to the front of a taskqueue + * + * @param tq The taskqueue to be modified. Must be non-null + * @param tsk The task to be appended. Must be non-null + * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error + */ int taskqueue_pushfront(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); @@ -221,6 +277,12 @@ int taskqueue_pushfront(taskqueue *tq, task *tsk) { return 0; } +/** + * @brief Pop a task from the back (most recently pushed task) of a taskqueue + * + * @param tq A taskqueue to pop from. Must be non-null + * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error + */ task * taskqueue_popback(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); @@ -273,6 +335,12 @@ static void ___ucl_cnddestroy(void *cond) { return; } +/** + * @brief Create a concurrent taskqueue with `size` allocated threads + * + * @param size Number of threads in the threadpool. Must be greater than zero + * @retval (ctqueue*)[NULL, ctqueue*] Returns a new ctqueue, sets errno and returns `null` on error + */ ctqueue * ctqueue_init(int nthreads) { if(nthreads <= 0) ERRRET(EINVAL, NULL); cleanup_CREATE(6); @@ -291,7 +359,7 @@ ctqueue * ctqueue_init(int nthreads) { cleanup_MARK(); cleanup_CNDREGISTER(taskqueue_free, ctq->tq); ); - + cleanup_CNDEXEC( if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) cleanup_MARK(); @@ -303,7 +371,7 @@ ctqueue * ctqueue_init(int nthreads) { cleanup_MARK(); cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); ); - + cleanup_CNDEXEC( ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); if(!ctq->thrdarr) @@ -318,10 +386,16 @@ ctqueue * ctqueue_init(int nthreads) { return ctq; } +/** + * @brief Cancel all tasks being processed in a currently running concurrent taskqueue + * + * @param ctq The concurrent taskqueue to be canceled. Must be non-null + * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error + */ int ctqueue_cancel(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); - __CTQ_INLOCK(ctq, 1, + __CTQ_INLOCK(ctq, 1, ctq->canceled = 1; ); cnd_broadcast(&ctq->cond); @@ -329,6 +403,12 @@ int ctqueue_cancel(ctqueue *ctq) { return 0; } +/** + * @brief Free a concurrent taskqueue + * @attention This cancels all currently running threads via `ctqueue_cancel` + * + * @param ctq The concurrent taskqueue to free. May be null + */ void ctqueue_free(void *ctq) { if(!ctq) return; @@ -351,11 +431,19 @@ void ctqueue_free(void *ctq) { return; } +/** + * @brief Push a task onto a concurrent taskqueue + * @attention May block for an indefinite amount of time to push the task + * + * @param ctq The concurrent taskqueue to modify. Must be non-null + * @param tsk The task to push. Must be non-null + * @retval (int) Returns `thrd_success` on success, returns `thrd_error` or `thrd_nomem` on error + */ int ctqueue_waitpush(ctqueue *ctq, task *tsk) { if(!ctq || !tsk) ERRRET(EINVAL, -1); int retval = 0; - __CTQ_INLOCK(ctq, -1, + __CTQ_INLOCK(ctq, -1, retval = taskqueue_push(ctq->tq, tsk); ); if(retval == 0) @@ -364,11 +452,18 @@ int ctqueue_waitpush(ctqueue *ctq, task *tsk) { return retval; } +/** + * @brief Pop a task from the concurrent taskqueue + * @attention May block for an indefinite amount of time to pop the task + * + * @param ctq The concurrent taskqueue to pop from. Must be non-null + * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error + */ task * ctqueue_waitpop(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, NULL); task *retval = NULL; - __CTQ_INLOCK(ctq, NULL, + __CTQ_INLOCK(ctq, NULL, while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) cnd_wait(&ctq->cond, &ctq->mutex); @@ -386,7 +481,7 @@ task * ctqueue_waitpop(ctqueue *ctq) { //! Simple consumer for eating and executing tasks from the ctq static int __CTQ_CONSUMER(void *ctq) { if(!ctq) {errno = EINVAL; thrd_exit(-1);} - ctqueue *real = (ctqueue *)ctq; + ctqueue *real = (ctqueue *)ctq; for(task *ctask = NULL;;) { ctask = ctqueue_waitpop(real); @@ -399,14 +494,21 @@ static int __CTQ_CONSUMER(void *ctq) { thrd_exit(1); } -// TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't +// TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't // be treated as an error +/** + * @brief Start the threads allocated to a concurrent taskqueue + * @attention Threads will not consume pushed tasks until this function is ran + * + * @param ctq A concurrent taskqueue to start. Must be non-null + * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error + */ int ctqueue_start(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); ctq->canceled = 0; - + int retval = 0; for(int i = 0; i < ctq->talen; i++) if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) @@ -416,4 +518,6 @@ int ctqueue_start(ctqueue *ctq) { ctqueue_cancel(ctq); return (retval == thrd_success) ? 0 : -1; -} \ No newline at end of file +} + +#endif \ No newline at end of file -- cgit v1.2.3