From 16528ac295215e788cb226f0cc49f11f82919741 Mon Sep 17 00:00:00 2001 From: "@syxhe" Date: Fri, 6 Jun 2025 13:30:34 -0500 Subject: Get threadpool implementation working --- src/Makefile | 6 +- src/main.c | 13 +- src/scanner.c | 2 +- src/shared.c | 127 ++++--------- src/shared.h | 92 ++++------ src/threadpool.c | 550 ++++++++++++++++++++++++++----------------------------- src/threadpool.h | 90 ++++++--- 7 files changed, 411 insertions(+), 469 deletions(-) (limited to 'src') diff --git a/src/Makefile b/src/Makefile index b045807..0ee5897 100644 --- a/src/Makefile +++ b/src/Makefile @@ -6,10 +6,12 @@ SHELL := /usr/bin/env # RELEASE_CFLAGS := -O3 -fipa-pta -fipa-cp -fuse-linker-plugin -flto=auto # RELEASE_LDFLAGS := -fuse-linker-plugin -flto=auto -CFLAGS = -std=c2x -Wall -Wextra -Wpedantic -pedantic-errors -fanalyzer -Wanalyzer-too-complex -ggdb -g3 -O0 $$(pkg-config --cflags libsodium) +CFLAGS = -std=c2x -Wall -Wextra -Wpedantic -pedantic-errors -fanalyzer -Wanalyzer-too-complex -ggdb -g3 -O0 +DEPFLAGS = -MT $@ -MMD -MP -MF $*.d + +CFLAGS += $$(pkg-config --cflags libsodium) LDLIBS += $$(pkg-config --libs-only-l libsodium) LDFLAGS += $$(pkg-config --libs-only-L libsodium) -DEPFLAGS = -MT $@ -MMD -MP -MF $*.d SOURCES := $(wildcard *.c) OBJECTS := $(patsubst %.c,%.o,$(SOURCES)) diff --git a/src/main.c b/src/main.c index 73169d1..1e4f960 100755 --- a/src/main.c +++ b/src/main.c @@ -20,18 +20,7 @@ int testcb(void *arg) { } int main() { - // error(-1, ENOTSUP, "lol"); - - cqueue *cq = cqueue_init(); - // if(!cq) - // abort(); - - cqueue_registerthreads(cq, 10); - cqueue_addtask(cq, task_init(testcb, "this is some data")); - - sleep(3); - - cqueue_free(cq); + error(-1, ENOTSUP, "lol"); return 0; } \ No newline at end of file diff --git a/src/scanner.c b/src/scanner.c index 1c4d2b1..d20c714 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -1,5 +1,5 @@ -#include "shared.h" #define _GNU_SOURCE +#include "shared.h" #include "ll.h" #include "scanner.h" diff --git a/src/shared.c b/src/shared.c index 02fe9a0..1d5aa7f 100644 --- a/src/shared.c +++ b/src/shared.c @@ -224,108 +224,61 @@ char * xdirname(const char * const path) { } -int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]) { - if(!loc) - RETURNWERR(EINVAL, -1); - if(size < 1) - RETURNWERR(EINVAL, -1); - if(!funcs) - RETURNWERR(EINVAL, -1); - if(!args) - RETURNWERR(EINVAL, -1); - - loc->funcs = funcs; - loc->args = args; - loc->size = size; - loc->used = 0; - - return 0; -} - -int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg) { - if(!loc) - RETURNWERR(EINVAL, -1); - if(loc->used >= loc->size) - RETURNWERR(ENOMEM, -1); - if(!cb) - RETURNWERR(EINVAL, -1); +int cleanup_init(cleanup *loc, fcallback callbacks[], void *arguments[], int size) { + if(!loc || !callbacks || !arguments || size <= 0) {errno = EINVAL; return -1;} - loc->funcs[loc->used] = cb; - loc->args[loc->used] = arg; - loc->used++; + loc->callbacks = callbacks; + loc->arguments = arguments; + loc->size = size; + loc->used = 0; - return 0; + return 0; } -int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg) { - if(flag) - return 0; - - if(!loc) - RETURNWERR(EINVAL, -1); - if(loc->used >= loc->size) - RETURNWERR(ENOMEM, -1); - if(!cb) - RETURNWERR(EINVAL, -1); +// registers if flag is NOT set +int cleanup_register(cleanup *loc, fcallback cb, void *arg) { + if(!loc || !cb) {errno = EINVAL; return -1;} + if(loc->used >= loc->size || loc->used < 0) {errno = ENOMEM; return -1;} - loc->funcs[loc->used] = cb; - loc->args[loc->used] = arg; - loc->used++; + loc->callbacks[loc->used] = cb; + loc->arguments[loc->used] = arg; + loc->used++; - return 0; + return 0; } -int cleanup_clear(cleanup * const loc) { - if(!loc) - RETURNWERR(EINVAL, -1); - - loc->used = 0; - return 0; +int cleanup_cndregister(cleanup *loc, fcallback cb, void *arg, unsigned char flag) { + if(flag) + return 0; + return cleanup_register(loc, cb, arg); } -cleanup_callback cleanup_peekf(cleanup * const loc) { - if(!loc) - RETURNWERR(EINVAL, NULL); - if(loc->used == 0) - RETURNWERR(ENODATA, NULL); - - return loc->funcs[loc->used - 1]; +int cleanup_clear(cleanup *loc) { + if(!loc) {errno = EINVAL; return -1;} + + loc->used = 0; + return 0; } -cleanup_callback cleanup_popf(cleanup * const loc) { - cleanup_callback cb = cleanup_peekf(loc); - if(cb == NULL) - RETURNWERR(EINVAL, NULL); - loc->used--; - - return cb; -} +int cleanup_fire(cleanup *loc) { + if(!loc) {errno = EINVAL; return -1;} -void * cleanup_peeka(cleanup * const loc) { - if(!loc) - RETURNWERR(EINVAL, NULL); - if(loc->used == 0) - RETURNWERR(ENODATA, NULL); + for(int i = (loc->used - 1); i >= 0; i--) { + if(loc->callbacks[i] == NULL) { + error(0, EINVAL, "cleanup_fire: refusing to run null callback..."); + continue; + } - return loc->args[loc->used - 1]; -} -void * cleanup_popa(cleanup * const loc) { - void *mem = cleanup_peeka(loc); - if(!mem) - RETURNWERR(EINVAL, NULL); - - loc->used--; + loc->callbacks[i](loc->arguments[i]); + } + cleanup_clear(loc); - return mem; + return 0; } -int cleanup_fire(cleanup * const loc) { - if(!loc) - RETURNWERR(EINVAL, -1); - - for(int i = (loc->used - 1); i >= 0; i--) - loc->funcs[i](loc->args[i]); - loc->used = 0; - - return 0; +// Fires if flag is set +int cleanup_cndfire(cleanup *loc, unsigned char flag) { + if(flag) + return cleanup_fire(loc); + return 0; } \ No newline at end of file diff --git a/src/shared.h b/src/shared.h index 66d1af3..adaad4a 100644 --- a/src/shared.h +++ b/src/shared.h @@ -1,11 +1,9 @@ #ifndef __VXGG_REWRITE___SHARED_H___3880294315821___ -#define __VXGG_REWRITE___SHARED_H___3880294315821___ +#define __VXGG_REWRITE___SHARED_H___3880294315821___ 1 #include #define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) -#define FALSE 0 -#define TRUE 1 typedef int (*gcallback)(void*); // Generic callback signature typedef void (*fcallback)(void*); // free()-like callback signature @@ -44,82 +42,58 @@ typedef void (*fcallback)(void*); // free()-like callback signature // `malloc()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` void * xmalloc(size_t size); - // `calloc()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` void * xcalloc(size_t nmemb, size_t size); - // `reallocarray()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` void * xreallocarray(void *ptr, size_t nmemb, size_t size); - // Read the entire contents of a file descriptor into a malloc()'ed buffer int rwbuf(char **str, unsigned long int initsize, int fd); - // Write the entire contents of a buffer into a file descriptor int wwbuf(int fd, const unsigned char *buf, int len); - // `dirname()` reimplementation that returns a malloc()'ed string. According to the `x___` naming scheme, exits/aborts on alloc error. char * xdirname(const char * const path); -// Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null -typedef fcallback cleanup_callback; - -// Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments +// A locally defined structure designed for easier function cleanup typedef struct cl { - cleanup_callback *funcs; // Actual type: cleanup_callback funcs[] - void **args; // Actual type: void *args[] - - int size; - int used; + fcallback *callbacks; // Actual Type: fcallback callbacks[] + void * *arguments; // Actual Type: void *arguments[] + int size; + int used; } cleanup; -// Initialize a local cleanup stack. `loc`, `funcs` and `args` need to be locally defined, non allocated arrays, and must be at least `size` elements large -int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]); - -// Register a cleanup callback for a given cleanup object -int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg); - -// Register a cleanup callback, if and only if `flag == 0` -int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg); - -// Clear the contents of a cleanup stack -int cleanup_clear(cleanup * const loc); - -// Get the top callback without removing it from the cleanup stack -cleanup_callback cleanup_peekf(cleanup * const loc); - -// Get and remove the top callback from the cleanup stack. Does not return the argument for the given callback -cleanup_callback cleanup_popf(cleanup * const loc); - -// Get the top argument without removing it from the cleanup stack -void * cleanup_peeka(cleanup * const loc); - -// Get and remove the top argument from the cleanup stack. Does not return the callback it was to be fed into -void * cleanup_popa(cleanup * const loc); - -// Fire all the callbacks in the cleanup stack -int cleanup_fire(cleanup * const loc); +int cleanup_init(cleanup *loc, fcallback callbacks[], void *arguments[], int size); +int cleanup_register(cleanup *loc, fcallback cb, void *arg); +int cleanup_cndregister(cleanup *loc, fcallback cb, void *arg, unsigned char flag); +int cleanup_clear(cleanup *loc); +int cleanup_fire(cleanup *loc); +int cleanup_cndfire(cleanup *loc, unsigned char flag); -/* Cleanup environment creator. Automatically defines the variables `__CLEANUP`, `__CLEANUP_FUNCS`, and `__CLEANUP_ARGS` and initializes -// via `cleanup_init()` using these variables. */ #define cleanup_CREATE(size) \ cleanup __CLEANUP; \ -cleanup_callback __CLEANUP_FUNCS[(size)]; \ +fcallback __CLEANUP_FUNCS[(size)]; \ void *__CLEANUP_ARGS[(size)]; \ unsigned char __FLAG = 0; \ -cleanup_init(&__CLEANUP, (size), __CLEANUP_FUNCS, __CLEANUP_ARGS) - -#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) -#define cleanup_CNDREGISTER(cb, arg) cleanup_cndregister(&__CLEANUP, __FLAG, (cb), (arg)) -#define cleanup_CLEAR() cleanup_clear(&__CLEANUP) -#define cleanup_PEEKF() cleanup_peekf(&__CLEANUP) -#define cleanup_POPF() cleanup_popf(&__CLEANUP) -#define cleanup_PEEKA() cleanup_peeka(&__CLEANUP) -#define cleanup_POPA() cleanup_popa(&__CLEANUP) -#define cleanup_FIRE() cleanup_fire(&__CLEANUP) -#define cleanup_MARK() (__FLAG = 1) -#define cleanup_UNMARK() (__FLAG = 0) -#define cleanup_ERRORFLAGGED (__FLAG != 0) +cleanup_init(&__CLEANUP, __CLEANUP_FUNCS, __CLEANUP_ARGS, (size)) + +#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) +#define cleanup_CNDREGISTER(cb, arg) cleanup_cndregister(&__CLEANUP, (cb), (arg), __FLAG) +#define cleanup_CLEAR() cleanup_clear(&__CLEANUP) +#define cleanup_FIRE() cleanup_fire(&__CLEANUP) +#define cleanup_CNDFIRE() cleanup_cndfire(&__CLEANUP, __FLAG) +#define cleanup_MARK() (__FLAG = 1) +#define cleanup_UNMARK() (__FLAG = 0) +#define cleanup_ERRORFLAGGED (__FLAG != 0) +#define cleanup_CNDEXEC(code) while(!cleanup_ERRORFLAGGED) {code; break;} + + + +// Generic task to be executed by a consumer +typedef struct task task; +// A queue of tasks +typedef struct taskqueue taskqueue; +// A concurrent queue of tasks, basically a threadpool tasks can be dispatched to +typedef struct ctqueue ctqueue; #endif \ No newline at end of file diff --git a/src/threadpool.c b/src/threadpool.c index 6912790..c4d8a5c 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -1,386 +1,362 @@ #include "threadpool.h" -#include "shared.h" -#include "ll.h" - -#include -#include #include #include #include +#include + -// Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) -typedef struct mtxp { - void *data; - mtx_t mtx; -} mtxpair; +task * task_init(gcallback callback, fcallback freecb, void *data) { + if(callback == NULL) {errno = EINVAL; return NULL;} -mtxpair * mtxpair_init(void * const data, int type) { - mtxpair *mtxp = VALLOC(1, sizeof(*mtxp)); - if(!mtxp) + task *tsk = calloc(1, sizeof(*tsk)); + if(!tsk) return NULL; - // Init the mutex - if(mtx_init(&mtxp->mtx, type) == thrd_error) { - free(mtxp); - RETURNWERR(errno, NULL); - } + tsk->callback = callback; + tsk->freecb = freecb; + tsk->data = data; - mtxp->data = data; - return mtxp; + return tsk; } -void mtxpair_free(mtxpair *mp) { - if(!mp) +void task_free(void *tsk) { + task *real = (task *)tsk; + if(!real) return; - - mtx_destroy(&mp->mtx); - free(mp); + + if(real->freecb != NULL) + real->freecb(real->data); + free(real); return; } -int mtxpair_setdata(mtxpair * const mp, void * const data) { - if(!mp) - RETURNWERR(EINVAL, -1); - - mp->data = data; - return 0; +int task_fire(task *tsk) { + if(!tsk) {errno = EINVAL; return -1;} + return tsk->callback(tsk->data); } - -// thrd_create which calls mtx_lock/unlock on `arg` automatically -int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) { - if(!thr) - RETURNWERR(EINVAL, thrd_error); - if(!func) - RETURNWERR(EINVAL, thrd_error); - if(!mtxd) - RETURNWERR(EINVAL, thrd_error); - - if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} - int retval = thrd_create(thr, func, mtxd->data); - if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} - +int task_fired(task *tsk) { + int retval = task_fire(tsk); + if(errno == EINVAL && retval == -1) {return -1;} + task_free(tsk); return retval; } -/* Ok, after doing a little more research, the best way to do this is probaby via a producer/consumer architecture. Spawn a bunch of -// threads waiting on a queue (via semaphore) and when one is notified pop a task of the queue and execute it. In this case, the -// producer would be the filesystem scanner funciton providing new files to encrypt, and the consumers would be threads waiting -// to encrypt them */ - -// Threadpool: - // Array of threads - // Task Queue - // Readiness semaphore / conditional - // Mutex - // Linked List of Tasks - // Task: - // int (*callback)(void*) - // void *arg - -// Consumer: - // Wait for cqueue to pop - // Fire task - // Repeat - -// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 - -typedef struct task { - task_callback cb; - void *arg; -} task; - -task * task_init(task_callback cb, void *arg) { - if(cb == NULL) - RETURNWERR(EINVAL, NULL); - task *task = VALLOC(1, sizeof(*task)); - if(!task) +tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { + if(!tsk) {errno = EINVAL; return NULL;} + tqnode *node = calloc(1, sizeof(*node)); + if(!node) return NULL; - task->cb = cb; - task->arg = arg; + node->next = next; + node->prev = prev; + node->task = tsk; - return task; + return node; } -void task_free(void *ts) { - if(!ts) +void tqnode_free(void *tqn) { + tqnode *real = (tqnode *)tqn; + if(!real) return; - free(ts); // Not making any assumptions about the data in the task + task_free(real->task); + free(real); return; } -int task_fire(task *ts) { - if(!ts) - RETURNWERR(EINVAL, -1); - if(ts->cb == NULL) - RETURNWERR(EINVAL, -1); - - return ts->cb(ts->arg); -} -typedef struct cq { - dlinkedlist *taskqueue; - dlinkedlist *rthreads; +taskqueue * taskqueue_init(void) { + taskqueue *tq = calloc(1, sizeof(*tq)); + if(!tq) + return NULL; - mtx_t mtx; - cnd_t cnd; - - unsigned char canceled; -} cqueue; + tq->start = NULL; + tq->end = NULL; + tq->size = 0; + return tq; +} -static void ___ucleanup_mtxd(void *mtx) { - if(!mtx) +void taskqueue_free(void *tq) { + if(!tq) return; - mtx_destroy((mtx_t*)mtx); + for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { + n = p->next; + tqnode_free(p); + p = n; + } + free(tq); + return; } -static void ___ucleanup_cndd(void *cnd) { - if(!cnd) - return; +int taskqueue_handlefirst(taskqueue *tq, task *tsk) { + if(!tq || !tsk) {errno = EINVAL; return -1;} + if(tq->size) {return 0;} - cnd_destroy((cnd_t *)cnd); - return; -} + tqnode *first = tqnode_init(NULL, NULL, tsk); + if(!first) + return -1; -cqueue * cqueue_init() { - cleanup_CREATE(10); - - // Create base object - cqueue *cq = VALLOC(1, sizeof(*cq)); - if(!cq) - RETURNWERR(errno, NULL); - cleanup_REGISTER(free, cq); - cq->canceled = 0; - - // Initialize the mutex - if(mtx_init(&cq->mtx, mtx_plain) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); - - // Initialize the conditional - if(!cleanup_ERRORFLAGGED) - if(cnd_init(&cq->cnd) != thrd_success) - cleanup_MARK(); - cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); + tq->start = first; + tq->end = first; + tq->size = 1; - // Create the taskqueue - if(!cleanup_ERRORFLAGGED) - if(!(cq->taskqueue = dlinkedlist_init())) - cleanup_MARK(); - cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue); + return 1; +} - // Create the thread list - if(!cleanup_ERRORFLAGGED) - if(!(cq->rthreads = dlinkedlist_init())) - cleanup_MARK(); - cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads); +int taskqueue_push(taskqueue *tq, task *tsk) { + if(!tq || !tsk) {errno = EINVAL; return -1;} - if(cleanup_ERRORFLAGGED) - cleanup_FIRE(); + int hf; + if((hf = taskqueue_handlefirst(tq, tsk))) + return (hf >= 0) ? 0 : -1; - return cq; + tqnode *newstart = tqnode_init(tq->start, NULL, tsk); + if(!newstart) + return -1; + tq->start->prev = newstart; + tq->start = newstart; + tq->size++; - // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line - // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea + return 0; } -void cqueue_cancel(cqueue * const cq) { - if(!cq) - return; +task * taskqueue_pop(taskqueue *tq) { + if(!tq) {errno = EINVAL; return NULL;} + if(tq->size <= 0) {errno = ENODATA; return NULL;} - mtx_lock(&cq->mtx); - - if(cq->canceled) { - mtx_unlock(&cq->mtx); - return; + tqnode *end = tq->end; + task *ret = end->task; + + if(tq->size == 1) { + tq->end = NULL; + tq->start = NULL; + } else { + tq->end = end->prev; + tq->end->next = NULL; } - cq->canceled = 1; - mtx_unlock(&cq->mtx); - cnd_broadcast(&cq->cnd); - - return; + free(end); + tq->size--; + return ret; } -static int ___cqueue_join(void *t) { - if(!t) +int taskqueue_pushfront(taskqueue *tq, task *tsk) { + if(!tq || !tsk) {errno = EINVAL; return -1;} + + int hf; + if((hf = taskqueue_handlefirst(tq, tsk))) + return (hf >= 0) ? 0 : -1; + + tqnode *newend = tqnode_init(NULL, tq->end, tsk); + if(!newend) return -1; + tq->end->next = newend; + tq->end = newend; + tq->size++; - int retval = 0; - thrd_join(*((thrd_t*)t), &retval); - - return retval; + return 0; } -void cqueue_free(void *cq) { - if(!cq) - return; +task * taskqueue_popback(taskqueue *tq) { + if(!tq) {errno = EINVAL; return NULL;} + if(tq->size <= 0) {errno = ENODATA; return NULL;} - cqueue *real = (cqueue *)cq; + tqnode *start = tq->start; + task *ret = start->task; - // Cancel threads and wait for them to exit - cqueue_cancel(real); - dlinkedlist_foreach(real->rthreads, ___cqueue_join); + if(tq->size == 1) { + tq->start = NULL; + tq->end = NULL; + } else { + tq->start = start->next; + tq->start->prev = NULL; + } - // Threads are dead, no need to worry about concurrency anymore - mtx_destroy(&real->mtx); - cnd_destroy(&real->cnd); - dlinkedlist_free(real->rthreads); - dlinkedlist_free(real->taskqueue); + free(start); + tq->size--; + return ret; +} - return; +int taskqueue_size(taskqueue *tq) { + if(!tq) {errno = EINVAL; return -1;} + return tq->size; } -int cqueue_addtask(cqueue * const cq, task * const tsk) { - if(!cq || !tsk) - RETURNWERR(EINVAL, -1); - mtx_lock(&cq->mtx); - - if(cq->canceled) { - mtx_unlock(&cq->mtx); - RETURNWERR(ECANCELED, -1); - } - dlinkedlist_prepend(cq->taskqueue, tsk, task_free); - mtx_unlock(&cq->mtx); - cnd_signal(&cq->cnd); +// Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written +#define __CTQ_INLOCK(ctq, retval, code) do {\ + mtx_lock(&(ctq)->mutex); \ + if((ctq)->canceled) { \ + errno = ECANCELED; \ + mtx_unlock(&(ctq)->mutex); \ + return (retval); \ + } \ + \ + code \ + mtx_unlock(&(ctq)->mutex); \ +} while (0) - return 0; +static void ___ucl_mtxdestroy(void *mtx) { + if(!mtx) return; + mtx_destroy((mtx_t *)mtx); + return; } -task * cqueue_waitpop(cqueue * const cq) { - if(!cq) - RETURNWERR(EINVAL, NULL); +static void ___ucl_cnddestroy(void *cond) { + if(cond) return; + cnd_destroy((cnd_t *)cond); + return; +} - task *tsk = NULL; - int index = -1; +ctqueue * ctqueue_init(int nthreads) { + if(nthreads <= 0) {errno = EINVAL; return NULL;} + cleanup_CREATE(6); - mtx_lock(&cq->mtx); - while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) - cnd_wait(&cq->cnd, &cq->mtx); + ctqueue *ctq = calloc(1, sizeof(*ctq)); + if(!ctq) + return NULL; + cleanup_REGISTER(free, ctq); - if(cq->canceled) { - mtx_unlock(&cq->mtx); - thrd_exit(-1); - } + ctq->canceled = 0; + ctq->talen = nthreads; - tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); - dlinkedlist_remove(cq->taskqueue, index); + cleanup_CNDEXEC( + ctq->tq = taskqueue_init(); + if(!ctq->tq) + cleanup_MARK(); + cleanup_CNDREGISTER(taskqueue_free, ctq->tq); + ); - mtx_unlock(&cq->mtx); + cleanup_CNDEXEC( + if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex); + ); - return tsk; + cleanup_CNDEXEC( + if(cnd_init(&ctq->cond) != thrd_success) + cleanup_MARK(); + cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); + ); + + cleanup_CNDEXEC( + ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); + if(!ctq->thrdarr) + cleanup_MARK(); + cleanup_CNDREGISTER(free, ctq->thrdarr); + ) + + cleanup_CNDFIRE(); + if(cleanup_ERRORFLAGGED) + return NULL; + + return ctq; } -static int consumer(void *cq) { - if(!cq) - thrd_exit(-1); +int ctqueue_cancel(ctqueue *ctq) { + if(!ctq) {errno = EINVAL; return -1;} - cqueue *real = (cqueue *)cq; - for(task *ctask;;) { - ctask = cqueue_waitpop(real); - if(!ctask) - task_fire(ctask); - } + __CTQ_INLOCK(ctq, 1, + ctq->canceled = 1; + ); + cnd_broadcast(&ctq->cond); - thrd_exit(0); + return 0; } -int cqueue_registerthreads(cqueue * const cq, int threads) { - if(!cq || threads <= 0) - RETURNWERR(EINVAL, -1); +void ctqueue_free(void *ctq) { + if(!ctq) + return; - mtx_lock(&cq->mtx); - if(cq->canceled) { - mtx_unlock(&cq->mtx); - RETURNWERR(ECANCELED, -1); - } + ctqueue *real = (ctqueue *)ctq; + ctqueue_cancel(real); - thrd_t *newthreads[threads]; - for(int i = 0; i < threads; i++) { - newthreads[i] = VALLOC(1, sizeof(thrd_t)); - if(!newthreads[i]) { - for(int j = 0; j < i; j++) - free(newthreads[j]); + for(int i = 0; i < real->talen; i++) + thrd_join(real->thrdarr[i], NULL); - return -1; - } + // Threads are dead, everything's free game + mtx_destroy(&real->mutex); + cnd_destroy(&real->cond); + taskqueue_free(real->tq); + free(real->thrdarr); + free(real); - dlinkedlist_prepend(cq->rthreads, newthreads[i], free); - thrd_create(newthreads[i], consumer, cq); - } + // TODO: figure out if it's necessary / a good idea to do error handling on these functions - mtx_unlock(&cq->mtx); + return; +} - return 0; +int ctqueue_waitpush(ctqueue *ctq, task *tsk) { + if(!ctq || !tsk) {errno = EINVAL; return -1;} + int retval = 0; + + __CTQ_INLOCK(ctq, -1, + retval = taskqueue_push(ctq->tq, tsk); + ); + if(retval == 0) + cnd_signal(&ctq->cond); + + return retval; } -int cqueue_registerthread(cqueue * const cq) { - return cqueue_registerthreads(cq, 1); +task * ctqueue_waitpop(ctqueue *ctq) { + if(!ctq) {errno = EINVAL; return NULL;} + task *retval = NULL; + + __CTQ_INLOCK(ctq, NULL, + while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) + cnd_wait(&ctq->cond, &ctq->mutex); + + if(ctq->canceled) { + errno = ECANCELED; + mtx_unlock(&ctq->mutex); + return NULL; + } + + retval = taskqueue_pop(ctq->tq); + ); + + return retval; } -enum __CQUEUE_STAT_OPTIONS { - __CQUEUE_STAT_NOTDEF, - - __CQUEUE_CANCELED, - __CQUEUE_THREADS_NUM, - __CQUEUE_TASKS_NUM, - - __CQUEUE_STAT_TOOBIG, -}; - -int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { - if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) - RETURNWERR(EINVAL, -1); - - int retval = -1; - mtx_lock(&cq->mtx); - - switch(opt) { - case __CQUEUE_CANCELED: - retval = cq->canceled; - mtx_unlock(&cq->mtx); - return retval; - break; - - case __CQUEUE_THREADS_NUM: - retval = dlinkedlist_size(cq->rthreads); - mtx_unlock(&cq->mtx); - return retval; - break; - - case __CQUEUE_TASKS_NUM: - retval = dlinkedlist_size(cq->taskqueue); - mtx_unlock(&cq->mtx); - return retval; - break; - - default: - RETURNWERR(EINVAL, -1); - break; +static int __CTQ_CONSUMER(void *ctq) { + if(!ctq) {errno = EINVAL; thrd_exit(-1);} + ctqueue *real = (ctqueue *)ctq; + + for(task *ctask = NULL;;) { + ctask = ctqueue_waitpop(real); + if(!ctask) + break; + + task_fire(ctask); + task_free(ctask); } - // This should absolutely never run - RETURNWERR(ENOTRECOVERABLE, -1); + thrd_exit(1); // non-zero indicates error, -1 indicates invalid argument } -int cqueue_iscanceled(cqueue * const cq) { - return cqueue_getstat(cq, __CQUEUE_CANCELED); -} -int cqueue_numthreads(cqueue * const cq) { - return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); -} -int cqueue_numtasks(cqueue * const cq) { - return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); -} +int ctqueue_start(ctqueue *ctq) { + if(!ctq) {errno = EINVAL; return -1;} + + ctq->canceled = 0; + + int retval = 0; + for(int i = 0; i < ctq->talen; i++) + if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) + break; + + if(retval != thrd_success) + ctqueue_cancel(ctq); + + return (retval == thrd_success) ? 0 : -1; +} \ No newline at end of file diff --git a/src/threadpool.h b/src/threadpool.h index d7b713c..3048eea 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -1,26 +1,74 @@ -#ifndef __VXGG_REWRITE___THREADPOOL_H___13601325413136___ -#define __VXGG_REWRITE___THREADPOOL_H___13601325413136___ +#ifndef __VXGG_REWRITE___THREADPOOL_H___193271180830131___ +#define __VXGG_REWRITE___THREADPOOL_H___193271180830131___ 1 +#include "shared.h" #include -typedef int (*task_callback)(void*); -typedef struct task task; -typedef struct cq cqueue; -typedef struct tp threadpool; - -task * task_init(task_callback cb, void *arg); -void task_free(void *ts); -int task_fire(task *ts); - -cqueue * cqueue_init(); -void cqueue_cancel(cqueue * const cq); -void cqueue_free(void *cq); -int cqueue_addtask(cqueue * const cq, task * const tsk); -task * cqueue_waitpop(cqueue * const cq); -int cqueue_registerthreads(cqueue * const cq, int threads); -int cqueue_registerthread(cqueue * const cq); -int cqueue_iscanceled(cqueue * const cq); -int cqueue_numthreads(cqueue * const cq); -int cqueue_numtasks(cqueue * const cq); +typedef struct task { + gcallback callback; + fcallback freecb; + void *data; +} task; + +typedef struct tqnode { + struct tqnode *next; + struct tqnode *prev; + task *task; +} tqnode; + +typedef struct taskqueue { + tqnode *start; + tqnode *end; + unsigned int size; +} taskqueue; + +typedef struct ctqueue { + mtx_t mutex; + cnd_t cond; + unsigned char canceled; + + taskqueue *tq; + thrd_t *thrdarr; + int talen; +} ctqueue; + +// Create a new task. Sets `errno` and returns `NULL` on error +task * task_init(gcallback callback, fcallback freecb, void *data); +// Free a task. Passes the `.data` member to specified fcallback as a function parameter. Does not return a value or set `errno` +void task_free(void *tsk); +// Fire a task. Passes the `.data` member to specified gcallback as a function parameter. Returns value of gcallback, or sets `errno` and returns `-1` on error +int task_fire(task *tsk); +// Fire and free a task simultaneously. Calls specified gcallback and fcallback on associated data. Returns value of gcallback, or sets `errno` and returns `-1` on error +int task_fired(task *tsk); + + + +// Create a FIFO queue of task objects. Returns a new taskqueue on success, sets `errno` and returns `NULL` on error +taskqueue * taskqueue_init(void); +// Free a taskqueue. Does not return a value or set `errno` +void taskqueue_free(void *tq); +// Push a task onto the queue. Returns 0 on success, sets `errno` and returns `-1` on error +int taskqueue_push(taskqueue *tq, task *tsk); +// Pop a task from the queue. Returns a task on success, sets `errno` and returns `NULL` on error +task * taskqueue_pop(taskqueue *tq); +// Push a task to the front of the queue (append, task becomes first in line to be popped). Returns 0 on success, sets `errno` and returns `-1` on error +int taskqueue_pushfront(taskqueue *tq, task *tsk); +// Pop a task from the back of the queue (pop the most recently (normally) pushed item). Returns a task on success, sets `errno` and returns `NULL` on error +task * taskqueue_popback(taskqueue *tq); + + + +// Create a concurrent taskqueue with `size` allocated threads +ctqueue * ctqueue_init(int size); +// Cancel a currently running ctq +int ctqueue_cancel(ctqueue *ctq); +// Free a ctq (cancels any remaining operations) +void ctqueue_free(void *ctq); +// Push a new task to the queue, waiting via mutex to do so +int ctqueue_waitpush(ctqueue *ctq, task *tsk); +// Pop a task from the queue, waiting via mutex to do so +task * ctqueue_waitpop(ctqueue *ctq); +// Spawn the allocated threads for a ctq +int ctqueue_start(ctqueue *ctq); #endif \ No newline at end of file -- cgit v1.2.3