/** * @file threadpool.c * @author syxhe (https://t.me/syxhe) * @brief An implementation of a threadpool using libc threads * @version 0.1 * @date 2025-06-09 * * @copyright Copyright (c) 2025 * */ #ifndef __VXGG_REWRITE___THREADPOOL_C___193271180830131___ #define __VXGG_REWRITE___THREADPOOL_C___193271180830131___ 1 #include "shared.c" #include #include #include #include /** * @brief A generic task - A function, data for that function, and a way to free the data * */ typedef struct task { gcallback callback; //!< A generic callback to be ran when executing the task fcallback freecb; //!< A free()-like callback to deal with the data void *data; //!< Some generic data for the generic callback } task; /** * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node * */ typedef struct tqnode { struct tqnode *next; //!< The next element in the `taskqueue` struct tqnode *prev; //!< The previous element in the `taskqueue` task *task; //!< The current element's `task` } tqnode; /** * @brief A FIFO queue of tasks * */ typedef struct taskqueue { tqnode *start; //!< The first element of the queue tqnode *end; //!< The final element of the queue unsigned int size; //!< The number of elements in the queue } taskqueue; /** * @brief A `taskqueue` built for concurrent access. Essentially a threadpool * */ typedef struct ctqueue { mtx_t mutex; //!< A mutex for locking sensitive resources cnd_t cond; //!< A conditional for waiting on / sending a signal unsigned char canceled; //!< Whether the threads are currently canceled or not taskqueue *tq; //!< A taskqueue to be accessed concurrently thrd_t *thrdarr; //!< An array of threads to be dispatched as consumers int talen; //!< The length of the thread array } ctqueue; /** * @brief Create a task * * @param callback Callback function the given data should be ran with. Must be non-null * @param freecb Callback function for freeing the given data. 
May be null * @param data Data to be passed to the callback. May be null * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error */ task * task_init(gcallback callback, fcallback freecb, void *data) { if(callback == NULL) ERRRET(EINVAL, NULL); task *tsk = calloc(1, sizeof(*tsk)); if(!tsk) return NULL; tsk->callback = callback; tsk->freecb = freecb; tsk->data = data; return tsk; } /** * @brief Free a task * * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null */ void task_free(void *tsk) { task *real = (task *)tsk; if(!real) return; if(real->freecb != NULL) real->freecb(real->data); free(real); return; } /** * @brief Fire a task. Passes the `data` member to the specified `callback` * * @param tsk A task to be fired. Must be non-null * @retval (int) Returns value of the fired callback. Returns -1 and sets errno on error */ int task_fire(task *tsk) { if(!tsk) ERRRET(EINVAL, -1); return tsk->callback(tsk->data); } /** * @brief Fire and destroy a task simultaneously. Calls specified callback and free-callback on associated data * * @param tsk Task to be fired and destroyed. Must be non-null * @retval (int) Returns value of the callback. Returns -1 and sets errno on error */ int task_fired(task *tsk) { int retval = task_fire(tsk); if(errno == EINVAL && retval == -1) {return -1;} task_free(tsk); return retval; } tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { if(!tsk) ERRRET(EINVAL, NULL); tqnode *node = calloc(1, sizeof(*node)); if(!node) return NULL; node->next = next; node->prev = prev; node->task = tsk; return node; } void tqnode_free(void *tqn) { tqnode *real = (tqnode *)tqn; if(!real) return; task_free(real->task); free(real); return; } /** * @brief Create a FIFO queue of tasks * * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. 
Returns `null` and sets errno on error */ taskqueue * taskqueue_init(void) { taskqueue *tq = calloc(1, sizeof(*tq)); if(!tq) return NULL; tq->start = NULL; tq->end = NULL; tq->size = 0; return tq; } /** * @brief Free a taskqueue * * @param tq A taskqueue to be freed. May be null */ void taskqueue_free(void *tq) { if(!tq) return; for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { n = p->next; tqnode_free(p); p = n; } free(tq); return; } int taskqueue_handlefirst(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); if(tq->size) {return 0;} tqnode *first = tqnode_init(NULL, NULL, tsk); if(!first) return -1; tq->start = first; tq->end = first; tq->size = 1; return 1; } /** * @brief Push a task onto a taskqueue * * @param tq The taskqueue to be modified. Must be non-null * @param tsk The task to push. Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int taskqueue_push(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; tqnode *newstart = tqnode_init(tq->start, NULL, tsk); if(!newstart) return -1; tq->start->prev = newstart; tq->start = newstart; tq->size++; return 0; } /** * @brief Pop a task from a taskqueue * * @param tq A taskqueue to grab a task from. Must be non-null * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error */ task * taskqueue_pop(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *end = tq->end; task *ret = end->task; if(tq->size == 1) { tq->end = NULL; tq->start = NULL; } else { tq->end = end->prev; tq->end->next = NULL; } free(end); tq->size--; return ret; } /** * @brief Append a task to the front of a taskqueue * * @param tq The taskqueue to be modified. Must be non-null * @param tsk The task to be appended. 
Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int taskqueue_pushfront(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; tqnode *newend = tqnode_init(NULL, tq->end, tsk); if(!newend) return -1; tq->end->next = newend; tq->end = newend; tq->size++; return 0; } /** * @brief Pop a task from the back (most recently pushed task) of a taskqueue * * @param tq A taskqueue to pop from. Must be non-null * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error */ task * taskqueue_popback(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *start = tq->start; task *ret = start->task; if(tq->size == 1) { tq->start = NULL; tq->end = NULL; } else { tq->start = start->next; tq->start->prev = NULL; } free(start); tq->size--; return ret; } int taskqueue_size(taskqueue *tq) { if(!tq) ERRRET(EINVAL, -1); return tq->size; } //! Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written #define __CTQ_INLOCK(ctq, retval, code) do {\ mtx_lock(&(ctq)->mutex); \ if((ctq)->canceled) { \ errno = ECANCELED; \ mtx_unlock(&(ctq)->mutex); \ return (retval); \ } \ \ code \ mtx_unlock(&(ctq)->mutex); \ } while (0) static void ___ucl_mtxdestroy(void *mtx) { if(!mtx) return; mtx_destroy((mtx_t *)mtx); return; } static void ___ucl_cnddestroy(void *cond) { if(cond) return; cnd_destroy((cnd_t *)cond); return; } /** * @brief Create a concurrent taskqueue with `size` allocated threads * * @param size Number of threads in the threadpool. 
Must be greater than zero
 * @retval (ctqueue*)[NULL, ctqueue*] Returns a new ctqueue, sets errno and returns `null` on error
 */
ctqueue * ctqueue_init(int nthreads) {
    if(nthreads <= 0) ERRRET(EINVAL, NULL);
    // NOTE(review): the cleanup_* macros come from shared.c and are not visible here. Presumably they collect
    // (callback, argument) pairs and run them only if a step was marked as failed - confirm against shared.c
    cleanup_CREATE(6);

    ctqueue *ctq = calloc(1, sizeof(*ctq));
    if(!ctq)
        return NULL;
    cleanup_REGISTER(free, ctq);

    ctq->canceled = 0;
    ctq->talen = nthreads;

    // Step 1: the underlying (non-concurrent) task queue
    cleanup_CNDEXEC(
        ctq->tq = taskqueue_init();
        if(!ctq->tq)
            cleanup_MARK();
        cleanup_CNDREGISTER(taskqueue_free, ctq->tq);
    );

    // Step 2: the mutex guarding the queue and the `canceled` flag
    cleanup_CNDEXEC(
        if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success)
            cleanup_MARK();
        cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex);
    );

    // Step 3: the condition variable consumers wait on
    cleanup_CNDEXEC(
        if(cnd_init(&ctq->cond) != thrd_success)
            cleanup_MARK();
        cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond);
    );

    // Step 4: storage for the thread handles. The threads themselves are only created by ctqueue_start
    // NOTE(review): unlike the invocations above, this one has no trailing semicolon. Whether that matters depends
    // on how cleanup_CNDEXEC expands - confirm against shared.c
    cleanup_CNDEXEC(
        ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t));
        if(!ctq->thrdarr)
            cleanup_MARK();
        cleanup_CNDREGISTER(free, ctq->thrdarr);
    )

    cleanup_CNDFIRE();
    if(cleanup_ERRORFLAGGED)
        return NULL;

    return ctq;
}

/**
 * @brief Cancel all tasks being processed in a currently running concurrent taskqueue
 *
 * Sets the `canceled` flag under the mutex, then broadcasts on the condition variable so that every thread waiting
 * in `ctqueue_waitpop` wakes up and sees the flag
 *
 * @param ctq The concurrent taskqueue to be canceled. Must be non-null
 * @retval (int)[-1, 0, 1] Returns 0 on success, sets errno and returns -1 on error. If the queue was already
 * canceled, sets errno to ECANCELED and returns 1 without broadcasting again
 */
int ctqueue_cancel(ctqueue *ctq) {
    if(!ctq) ERRRET(EINVAL, -1);

    // The macro itself returns 1 from this function when the queue is already canceled
    __CTQ_INLOCK(ctq, 1,
        ctq->canceled = 1;
    );
    cnd_broadcast(&ctq->cond);

    return 0;
}

/**
 * @brief Free a concurrent taskqueue
 * @attention This cancels all currently running threads via `ctqueue_cancel`
 *
 * @param ctq The concurrent taskqueue to free.
May be null */ void ctqueue_free(void *ctq) { if(!ctq) return; ctqueue *real = (ctqueue *)ctq; ctqueue_cancel(real); for(int i = 0; i < real->talen; i++) thrd_join(real->thrdarr[i], NULL); // Threads are dead, everything's free game mtx_destroy(&real->mutex); cnd_destroy(&real->cond); taskqueue_free(real->tq); free(real->thrdarr); free(real); // TODO: figure out if it's necessary / a good idea to do error handling on these functions return; } /** * @brief Push a task onto a concurrent taskqueue * @attention May block for an indefinite amount of time to push the task * * @param ctq The concurrent taskqueue to modify. Must be non-null * @param tsk The task to push. Must be non-null * @retval (int) Returns `thrd_success` on success, returns `thrd_error` or `thrd_nomem` on error */ int ctqueue_waitpush(ctqueue *ctq, task *tsk) { if(!ctq || !tsk) ERRRET(EINVAL, -1); int retval = 0; __CTQ_INLOCK(ctq, -1, retval = taskqueue_push(ctq->tq, tsk); ); if(retval == 0) cnd_signal(&ctq->cond); return retval; } /** * @brief Pop a task from the concurrent taskqueue * @attention May block for an indefinite amount of time to pop the task * * @param ctq The concurrent taskqueue to pop from. Must be non-null * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error */ task * ctqueue_waitpop(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, NULL); task *retval = NULL; __CTQ_INLOCK(ctq, NULL, while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) cnd_wait(&ctq->cond, &ctq->mutex); if(ctq->canceled) { mtx_unlock(&ctq->mutex); ERRRET(ECANCELED, NULL); } retval = taskqueue_pop(ctq->tq); ); return retval; } //! 
Simple consumer for eating and executing tasks from the ctq static int __CTQ_CONSUMER(void *ctq) { if(!ctq) {errno = EINVAL; thrd_exit(-1);} ctqueue *real = (ctqueue *)ctq; for(task *ctask = NULL;;) { ctask = ctqueue_waitpop(real); if(!ctask) break; task_fire(ctask); task_free(ctask); } thrd_exit(1); } // TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't // be treated as an error /** * @brief Start the threads allocated to a concurrent taskqueue * @attention Threads will not consume pushed tasks until this function is ran * * @param ctq A concurrent taskqueue to start. Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int ctqueue_start(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); ctq->canceled = 0; int retval = 0; for(int i = 0; i < ctq->talen; i++) if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) break; if(retval != thrd_success) ctqueue_cancel(ctq); return (retval == thrd_success) ? 0 : -1; } #endif