/** * @file threadpool.c * @author syxhe (https://t.me/syxhe) * @brief An implementation of a threadpool using libc threads * @version 0.1 * @date 2025-06-09 * * @copyright Copyright (c) 2025 * */ #ifndef __VXGG_REWRITE___THREADPOOL_C___193271180830131___ #define __VXGG_REWRITE___THREADPOOL_C___193271180830131___ 1 #include "shared.c" #include #include #include #include #include /** * @brief A generic task - A function, data for that function, and a way to free the data * */ typedef struct task { gcallback callback; //!< A generic callback to be ran when executing the task fcallback freecb; //!< A free()-like callback to deal with the data void *data; //!< Some generic data for the generic callback } task; /** * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node * */ typedef struct tqnode { struct tqnode *next; //!< The next element in the `taskqueue` struct tqnode *prev; //!< The previous element in the `taskqueue` task *task; //!< The current element's `task` } tqnode; /** * @brief A FIFO queue of tasks * */ typedef struct taskqueue { tqnode *start; //!< The first element of the queue tqnode *end; //!< The final element of the queue size_t size; //!< The number of elements in the queue } taskqueue; /** * @brief A `taskqueue` built for concurrent access. Essentially a threadpool * */ typedef struct ctqueue { mtx_t mutex; //!< A mutex for locking sensitive resources cnd_t cond; //!< A conditional for waiting on / sending a signal uint8_t canceled; //!< Whether the threads are currently canceled or not taskqueue *tq; //!< A taskqueue to be accessed concurrently thrd_t *thrdarr; //!< An array of threads to be dispatched as consumers int talen; //!< The length of the thread array // Consider making these another linked list or stack or something } ctqueue; /** * @brief Create a task * * @param callback Callback function the given data should be ran with. Must be non-null * @param freecb Callback function for freeing the given data. May be null * @param data Data to be passed to the callback. May be null * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error */ task * task_new(gcallback callback, fcallback freecb, void *data) { if(callback == NULL) ERRRET(EINVAL, NULL); task *tsk = calloc(1, sizeof(*tsk)); if(!tsk) return NULL; tsk->callback = callback; tsk->freecb = freecb; tsk->data = data; return tsk; } /** * @brief Free a task * * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null */ void task_free(void *tsk) { task *real = tsk; if(!real) return; if(real->freecb) real->freecb(real->data); free(real); return; } /** * @brief Fire a task. Passes the `data` member to the specified `callback` * * @param tsk A task to be fired. Must be non-null * @retval (int) Returns value of the fired callback. Returns -1 and sets errno on error */ int task_fire(task *tsk) { if(!tsk) ERRRET(EINVAL, -1); return tsk->callback(tsk->data); } /** * @brief Fire and destroy a task simultaneously. Calls specified callback and free-callback on associated data * * @param tsk Task to be fired and destroyed. Must be non-null * @retval (int) Returns value of the callback. Returns -1 and sets errno on error */ int task_fired(task *tsk) { if(!tsk) return -1; int retval = task_fire(tsk); task_free(tsk); return retval; } tqnode * tqnode_new(tqnode *next, tqnode *prev, task *tsk) { if(!tsk) ERRRET(EINVAL, NULL); tqnode *node = calloc(1, sizeof(*node)); if(!node) return NULL; node->next = next; node->prev = prev; node->task = tsk; return node; } // Create a tqnode and task at the same time. Returns a valid tqnode with a valid task on success, NULL on error. Does not call task_free on error tqnode * tqnode_newtask(tqnode *next, tqnode *prev, gcallback callback, fcallback freecb, void *data) { task *tsk = task_new(callback, freecb, data); if(!tsk) return NULL; tqnode *node = tqnode_new(next, prev, tsk); if(!node) free(tsk); return node; } void tqnode_free(void *tqn) { tqnode *real = tqn; if(!real) return; task_free(real->task); free(real); return; } /** * @brief Create a FIFO queue of tasks * * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error */ taskqueue * taskqueue_new(void) { taskqueue *tq = calloc(1, sizeof(*tq)); if(!tq) return NULL; tq->start = NULL; tq->end = NULL; tq->size = 0; return tq; } /** * @brief Free a taskqueue * * @param tq A taskqueue to be freed. May be null */ void taskqueue_free(void *tq) { taskqueue *real = tq; if(!real) return; for(tqnode *p = real->start, *n; p != NULL;) { n = p->next; tqnode_free(p); p = n; } free(real); return; } int taskqueue_handlefirst(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); if(tq->size) {return 0;} tqnode *first = tqnode_new(NULL, NULL, tsk); if(!first) return -1; tq->start = first; tq->end = first; tq->size = 1; return 1; } /** * @brief Push a task onto a taskqueue * * @param tq The taskqueue to be modified. Must be non-null * @param tsk The task to push. Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int taskqueue_push(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; tqnode *curstart = tq->start; tqnode *newstart = tqnode_new(curstart, NULL, tsk); if(!newstart) return -1; curstart->prev = newstart; tq->start = newstart; tq->size++; return 0; } /** * @brief Pop a task from a taskqueue * * @param tq A taskqueue to grab a task from. Must be non-null * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error */ task * taskqueue_pop(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *curend = tq->end; task *ret = curend->task; if(tq->size == 1) { tq->end = NULL; tq->start = NULL; } else { tq->end = curend->prev; tq->end->next = NULL; } free(curend); tq->size--; return ret; } /** * @brief Append a task to the front of a taskqueue * * @param tq The taskqueue to be modified. Must be non-null * @param tsk The task to be appended. Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int taskqueue_pushfront(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; tqnode *end = tq->end; tqnode *newend = tqnode_new(NULL, end, tsk); if(!newend) return -1; end->next = newend; tq->end = newend; tq->size++; return 0; } /** * @brief Pop a task from the back (most recently pushed task) of a taskqueue * * @param tq A taskqueue to pop from. Must be non-null * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error */ task * taskqueue_popback(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *curstart = tq->start; task *ret = curstart->task; if(tq->size == 1) { tq->start = NULL; tq->end = NULL; } else { tq->start = curstart->next; tq->start->prev = NULL; } free(curstart); tq->size--; return ret; } int taskqueue_size(taskqueue *tq) { if(!tq) ERRRET(EINVAL, -1); return tq->size; } //! Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written #define __CTQ_INLOCK(ctq, retval, code) do {\ mtx_lock(&(ctq)->mutex); \ if((ctq)->canceled) { \ errno = ECANCELED; \ mtx_unlock(&(ctq)->mutex); \ return (retval); \ } \ code \ mtx_unlock(&(ctq)->mutex); \ } while (0) static void mtxd_helper(mtx_t *mutex) { if(!mutex) return; mtx_destroy(mutex); return; } static void cndd_helper(cnd_t *cond) { if(!cond) return; cnd_destroy(cond); return; } /** * @brief Create a concurrent taskqueue with `size` allocated threads * * @param size Number of threads in the threadpool. Must be greater than zero * @retval (ctqueue*)[NULL, ctqueue*] Returns a new ctqueue, sets errno and returns `null` on error */ ctqueue * ctqueue_init(int nthreads) { if(nthreads <= 0) ERRRET(EINVAL, NULL); ctqueue *ctq = calloc(1, sizeof(*ctq)); if(!ctq) return NULL; ctq->canceled = 0; ctq->talen = nthreads; ctq->tq = taskqueue_new(); if(!ctq->tq) goto ERR_ctqueue_init; if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) goto ERR_ctqueue_init; if(cnd_init(&ctq->cond) != thrd_success) goto ERR_ctqueue_init; ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); if(!ctq->thrdarr) goto ERR_ctqueue_init; return ctq; ERR_ctqueue_init: free(ctq->thrdarr); cndd_helper(&ctq->cond); mtxd_helper(&ctq->mutex); taskqueue_free(ctq->tq); free(ctq); return NULL; } /** * @brief Cancel all tasks being processed in a currently running concurrent taskqueue * * @param ctq The concurrent taskqueue to be canceled. Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int ctqueue_cancel(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); __CTQ_INLOCK(ctq, 1, ctq->canceled = 1; ); cnd_broadcast(&ctq->cond); return 0; } /** * @brief Free a concurrent taskqueue * @attention This cancels all currently running threads via `ctqueue_cancel` * * @param ctq The concurrent taskqueue to free. May be null */ void ctqueue_free(void *ctq) { if(!ctq) return; ctqueue *real = (ctqueue *)ctq; ctqueue_cancel(real); for(int i = 0; i < real->talen; i++) thrd_join(real->thrdarr[i], NULL); // Threads are dead, everything's free game mtx_destroy(&real->mutex); cnd_destroy(&real->cond); taskqueue_free(real->tq); free(real->thrdarr); free(real); return; } /** * @brief Push a task onto a concurrent taskqueue * @attention May block for an indefinite amount of time to push the task * * @param ctq The concurrent taskqueue to modify. Must be non-null * @param tsk The task to push. Must be non-null * @retval (int)[`thrd_error` | `thrd_nomem`, `thrd_success`] */ int ctqueue_waitpush(ctqueue *ctq, task *tsk) { if(!ctq || !tsk) ERRRET(EINVAL, -1); int retval = 0; __CTQ_INLOCK(ctq, -1, retval = taskqueue_push(ctq->tq, tsk); ); if(retval == 0) cnd_signal(&ctq->cond); return retval; } /** * @brief Pop a task from the concurrent taskqueue * @attention May block for an indefinite amount of time to pop the task * * @param ctq The concurrent taskqueue to pop from. Must be non-null * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error */ task * ctqueue_waitpop(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, NULL); task *retval = NULL; __CTQ_INLOCK(ctq, NULL, while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) cnd_wait(&ctq->cond, &ctq->mutex); if(ctq->canceled) { mtx_unlock(&ctq->mutex); ERRRET(ECANCELED, NULL); } retval = taskqueue_pop(ctq->tq); ); return retval; } //! Simple consumer for eating and executing tasks from the ctq static int __CTQ_CONSUMER(void *ctq) { if(!ctq) {errno = EINVAL; thrd_exit(-1);} ctqueue *real = (ctqueue *)ctq; for(task *ctask = NULL;;) { ctask = ctqueue_waitpop(real); if(!ctask) break; task_fire(ctask); task_free(ctask); } thrd_exit(1); } // TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't // be treated as an error /** * @brief Start the threads allocated to a concurrent taskqueue * @attention Threads will not consume pushed tasks until this function is ran * * @param ctq A concurrent taskqueue to start. Must be non-null * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error */ int ctqueue_start(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); ctq->canceled = 0; int retval = 0; for(int i = 0; i < ctq->talen; i++) if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) break; if(retval != thrd_success) ctqueue_cancel(ctq); return (retval == thrd_success) ? 0 : -1; } #endif