diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile | 6 | ||||
| -rwxr-xr-x | src/main.c | 13 | ||||
| -rw-r--r-- | src/scanner.c | 2 | ||||
| -rw-r--r-- | src/shared.c | 127 | ||||
| -rw-r--r-- | src/shared.h | 92 | ||||
| -rw-r--r-- | src/threadpool.c | 550 | ||||
| -rw-r--r-- | src/threadpool.h | 90 |
7 files changed, 411 insertions, 469 deletions
diff --git a/src/Makefile b/src/Makefile index b045807..0ee5897 100644 --- a/src/Makefile +++ b/src/Makefile | |||
| @@ -6,10 +6,12 @@ SHELL := /usr/bin/env | |||
| 6 | # RELEASE_CFLAGS := -O3 -fipa-pta -fipa-cp -fuse-linker-plugin -flto=auto | 6 | # RELEASE_CFLAGS := -O3 -fipa-pta -fipa-cp -fuse-linker-plugin -flto=auto |
| 7 | # RELEASE_LDFLAGS := -fuse-linker-plugin -flto=auto | 7 | # RELEASE_LDFLAGS := -fuse-linker-plugin -flto=auto |
| 8 | 8 | ||
| 9 | CFLAGS = -std=c2x -Wall -Wextra -Wpedantic -pedantic-errors -fanalyzer -Wanalyzer-too-complex -ggdb -g3 -O0 $$(pkg-config --cflags libsodium) | 9 | CFLAGS = -std=c2x -Wall -Wextra -Wpedantic -pedantic-errors -fanalyzer -Wanalyzer-too-complex -ggdb -g3 -O0 |
| 10 | DEPFLAGS = -MT $@ -MMD -MP -MF $*.d | ||
| 11 | |||
| 12 | CFLAGS += $$(pkg-config --cflags libsodium) | ||
| 10 | LDLIBS += $$(pkg-config --libs-only-l libsodium) | 13 | LDLIBS += $$(pkg-config --libs-only-l libsodium) |
| 11 | LDFLAGS += $$(pkg-config --libs-only-L libsodium) | 14 | LDFLAGS += $$(pkg-config --libs-only-L libsodium) |
| 12 | DEPFLAGS = -MT $@ -MMD -MP -MF $*.d | ||
| 13 | 15 | ||
| 14 | SOURCES := $(wildcard *.c) | 16 | SOURCES := $(wildcard *.c) |
| 15 | OBJECTS := $(patsubst %.c,%.o,$(SOURCES)) | 17 | OBJECTS := $(patsubst %.c,%.o,$(SOURCES)) |
| @@ -20,18 +20,7 @@ int testcb(void *arg) { | |||
| 20 | } | 20 | } |
| 21 | 21 | ||
| 22 | int main() { | 22 | int main() { |
| 23 | // error(-1, ENOTSUP, "lol"); | 23 | error(-1, ENOTSUP, "lol"); |
| 24 | |||
| 25 | cqueue *cq = cqueue_init(); | ||
| 26 | // if(!cq) | ||
| 27 | // abort(); | ||
| 28 | |||
| 29 | cqueue_registerthreads(cq, 10); | ||
| 30 | cqueue_addtask(cq, task_init(testcb, "this is some data")); | ||
| 31 | |||
| 32 | sleep(3); | ||
| 33 | |||
| 34 | cqueue_free(cq); | ||
| 35 | 24 | ||
| 36 | return 0; | 25 | return 0; |
| 37 | } \ No newline at end of file | 26 | } \ No newline at end of file |
diff --git a/src/scanner.c b/src/scanner.c index 1c4d2b1..d20c714 100644 --- a/src/scanner.c +++ b/src/scanner.c | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | #include "shared.h" | ||
| 2 | #define _GNU_SOURCE | 1 | #define _GNU_SOURCE |
| 2 | #include "shared.h" | ||
| 3 | 3 | ||
| 4 | #include "ll.h" | 4 | #include "ll.h" |
| 5 | #include "scanner.h" | 5 | #include "scanner.h" |
diff --git a/src/shared.c b/src/shared.c index 02fe9a0..1d5aa7f 100644 --- a/src/shared.c +++ b/src/shared.c | |||
| @@ -224,108 +224,61 @@ char * xdirname(const char * const path) { | |||
| 224 | } | 224 | } |
| 225 | 225 | ||
| 226 | 226 | ||
| 227 | int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]) { | 227 | int cleanup_init(cleanup *loc, fcallback callbacks[], void *arguments[], int size) { |
| 228 | if(!loc) | 228 | if(!loc || !callbacks || !arguments || size <= 0) {errno = EINVAL; return -1;} |
| 229 | RETURNWERR(EINVAL, -1); | ||
| 230 | if(size < 1) | ||
| 231 | RETURNWERR(EINVAL, -1); | ||
| 232 | if(!funcs) | ||
| 233 | RETURNWERR(EINVAL, -1); | ||
| 234 | if(!args) | ||
| 235 | RETURNWERR(EINVAL, -1); | ||
| 236 | |||
| 237 | loc->funcs = funcs; | ||
| 238 | loc->args = args; | ||
| 239 | loc->size = size; | ||
| 240 | loc->used = 0; | ||
| 241 | |||
| 242 | return 0; | ||
| 243 | } | ||
| 244 | |||
| 245 | int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg) { | ||
| 246 | if(!loc) | ||
| 247 | RETURNWERR(EINVAL, -1); | ||
| 248 | if(loc->used >= loc->size) | ||
| 249 | RETURNWERR(ENOMEM, -1); | ||
| 250 | if(!cb) | ||
| 251 | RETURNWERR(EINVAL, -1); | ||
| 252 | 229 | ||
| 253 | loc->funcs[loc->used] = cb; | 230 | loc->callbacks = callbacks; |
| 254 | loc->args[loc->used] = arg; | 231 | loc->arguments = arguments; |
| 255 | loc->used++; | 232 | loc->size = size; |
| 233 | loc->used = 0; | ||
| 256 | 234 | ||
| 257 | return 0; | 235 | return 0; |
| 258 | } | 236 | } |
| 259 | 237 | ||
| 260 | int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg) { | 238 | // registers if flag is NOT set |
| 261 | if(flag) | 239 | int cleanup_register(cleanup *loc, fcallback cb, void *arg) { |
| 262 | return 0; | 240 | if(!loc || !cb) {errno = EINVAL; return -1;} |
| 263 | 241 | if(loc->used >= loc->size || loc->used < 0) {errno = ENOMEM; return -1;} | |
| 264 | if(!loc) | ||
| 265 | RETURNWERR(EINVAL, -1); | ||
| 266 | if(loc->used >= loc->size) | ||
| 267 | RETURNWERR(ENOMEM, -1); | ||
| 268 | if(!cb) | ||
| 269 | RETURNWERR(EINVAL, -1); | ||
| 270 | 242 | ||
| 271 | loc->funcs[loc->used] = cb; | 243 | loc->callbacks[loc->used] = cb; |
| 272 | loc->args[loc->used] = arg; | 244 | loc->arguments[loc->used] = arg; |
| 273 | loc->used++; | 245 | loc->used++; |
| 274 | 246 | ||
| 275 | return 0; | 247 | return 0; |
| 276 | } | 248 | } |
| 277 | 249 | ||
| 278 | int cleanup_clear(cleanup * const loc) { | 250 | int cleanup_cndregister(cleanup *loc, fcallback cb, void *arg, unsigned char flag) { |
| 279 | if(!loc) | 251 | if(flag) |
| 280 | RETURNWERR(EINVAL, -1); | 252 | return 0; |
| 281 | 253 | return cleanup_register(loc, cb, arg); | |
| 282 | loc->used = 0; | ||
| 283 | return 0; | ||
| 284 | } | 254 | } |
| 285 | 255 | ||
| 286 | cleanup_callback cleanup_peekf(cleanup * const loc) { | 256 | int cleanup_clear(cleanup *loc) { |
| 287 | if(!loc) | 257 | if(!loc) {errno = EINVAL; return -1;} |
| 288 | RETURNWERR(EINVAL, NULL); | 258 | |
| 289 | if(loc->used == 0) | 259 | loc->used = 0; |
| 290 | RETURNWERR(ENODATA, NULL); | 260 | return 0; |
| 291 | |||
| 292 | return loc->funcs[loc->used - 1]; | ||
| 293 | } | 261 | } |
| 294 | cleanup_callback cleanup_popf(cleanup * const loc) { | ||
| 295 | cleanup_callback cb = cleanup_peekf(loc); | ||
| 296 | if(cb == NULL) | ||
| 297 | RETURNWERR(EINVAL, NULL); | ||
| 298 | 262 | ||
| 299 | loc->used--; | 263 | int cleanup_fire(cleanup *loc) { |
| 300 | 264 | if(!loc) {errno = EINVAL; return -1;} | |
| 301 | return cb; | ||
| 302 | } | ||
| 303 | 265 | ||
| 304 | void * cleanup_peeka(cleanup * const loc) { | 266 | for(int i = (loc->used - 1); i >= 0; i--) { |
| 305 | if(!loc) | 267 | if(loc->callbacks[i] == NULL) { |
| 306 | RETURNWERR(EINVAL, NULL); | 268 | error(0, EINVAL, "cleanup_fire: refusing to run null callback..."); |
| 307 | if(loc->used == 0) | 269 | continue; |
| 308 | RETURNWERR(ENODATA, NULL); | 270 | } |
| 309 | 271 | ||
| 310 | return loc->args[loc->used - 1]; | 272 | loc->callbacks[i](loc->arguments[i]); |
| 311 | } | 273 | } |
| 312 | void * cleanup_popa(cleanup * const loc) { | 274 | cleanup_clear(loc); |
| 313 | void *mem = cleanup_peeka(loc); | ||
| 314 | if(!mem) | ||
| 315 | RETURNWERR(EINVAL, NULL); | ||
| 316 | |||
| 317 | loc->used--; | ||
| 318 | 275 | ||
| 319 | return mem; | 276 | return 0; |
| 320 | } | 277 | } |
| 321 | 278 | ||
| 322 | int cleanup_fire(cleanup * const loc) { | 279 | // Fires if flag is set |
| 323 | if(!loc) | 280 | int cleanup_cndfire(cleanup *loc, unsigned char flag) { |
| 324 | RETURNWERR(EINVAL, -1); | 281 | if(flag) |
| 325 | 282 | return cleanup_fire(loc); | |
| 326 | for(int i = (loc->used - 1); i >= 0; i--) | 283 | return 0; |
| 327 | loc->funcs[i](loc->args[i]); | ||
| 328 | loc->used = 0; | ||
| 329 | |||
| 330 | return 0; | ||
| 331 | } \ No newline at end of file | 284 | } \ No newline at end of file |
diff --git a/src/shared.h b/src/shared.h index 66d1af3..adaad4a 100644 --- a/src/shared.h +++ b/src/shared.h | |||
| @@ -1,11 +1,9 @@ | |||
| 1 | #ifndef __VXGG_REWRITE___SHARED_H___3880294315821___ | 1 | #ifndef __VXGG_REWRITE___SHARED_H___3880294315821___ |
| 2 | #define __VXGG_REWRITE___SHARED_H___3880294315821___ | 2 | #define __VXGG_REWRITE___SHARED_H___3880294315821___ 1 |
| 3 | 3 | ||
| 4 | #include <stddef.h> | 4 | #include <stddef.h> |
| 5 | 5 | ||
| 6 | #define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) | 6 | #define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) |
| 7 | #define FALSE 0 | ||
| 8 | #define TRUE 1 | ||
| 9 | 7 | ||
| 10 | typedef int (*gcallback)(void*); // Generic callback signature | 8 | typedef int (*gcallback)(void*); // Generic callback signature |
| 11 | typedef void (*fcallback)(void*); // free()-like callback signature | 9 | typedef void (*fcallback)(void*); // free()-like callback signature |
| @@ -44,82 +42,58 @@ typedef void (*fcallback)(void*); // free()-like callback signature | |||
| 44 | 42 | ||
| 45 | // `malloc()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` | 43 | // `malloc()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` |
| 46 | void * xmalloc(size_t size); | 44 | void * xmalloc(size_t size); |
| 47 | |||
| 48 | // `calloc()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` | 45 | // `calloc()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` |
| 49 | void * xcalloc(size_t nmemb, size_t size); | 46 | void * xcalloc(size_t nmemb, size_t size); |
| 50 | |||
| 51 | // `reallocarray()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` | 47 | // `reallocarray()` with error checking. Calls `exit()` or `abort()` on error, depending on the value of `___VXGG___XALLOC_EXIT_ON_ERROR___` |
| 52 | void * xreallocarray(void *ptr, size_t nmemb, size_t size); | 48 | void * xreallocarray(void *ptr, size_t nmemb, size_t size); |
| 53 | |||
| 54 | // Read the entire contents of a file descriptor into a malloc()'ed buffer | 49 | // Read the entire contents of a file descriptor into a malloc()'ed buffer |
| 55 | int rwbuf(char **str, unsigned long int initsize, int fd); | 50 | int rwbuf(char **str, unsigned long int initsize, int fd); |
| 56 | |||
| 57 | // Write the entire contents of a buffer into a file descriptor | 51 | // Write the entire contents of a buffer into a file descriptor |
| 58 | int wwbuf(int fd, const unsigned char *buf, int len); | 52 | int wwbuf(int fd, const unsigned char *buf, int len); |
| 59 | |||
| 60 | // `dirname()` reimplementation that returns a malloc()'ed string. According to the `x___` naming scheme, exits/aborts on alloc error. | 53 | // `dirname()` reimplementation that returns a malloc()'ed string. According to the `x___` naming scheme, exits/aborts on alloc error. |
| 61 | char * xdirname(const char * const path); | 54 | char * xdirname(const char * const path); |
| 62 | 55 | ||
| 63 | 56 | ||
| 64 | 57 | ||
| 65 | // Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null | 58 | // A locally defined structure designed for easier function cleanup |
| 66 | typedef fcallback cleanup_callback; | ||
| 67 | |||
| 68 | // Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments | ||
| 69 | typedef struct cl { | 59 | typedef struct cl { |
| 70 | cleanup_callback *funcs; // Actual type: cleanup_callback funcs[] | 60 | fcallback *callbacks; // Actual Type: fcallback callbacks[] |
| 71 | void **args; // Actual type: void *args[] | 61 | void * *arguments; // Actual Type: void *arguments[] |
| 72 | 62 | int size; | |
| 73 | int size; | 63 | int used; |
| 74 | int used; | ||
| 75 | } cleanup; | 64 | } cleanup; |
| 76 | 65 | ||
| 77 | // Initialize a local cleanup stack. `loc`, `funcs` and `args` need to be locally defined, non allocated arrays, and must be at least `size` elements large | 66 | int cleanup_init(cleanup *loc, fcallback callbacks[], void *arguments[], int size); |
| 78 | int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]); | 67 | int cleanup_register(cleanup *loc, fcallback cb, void *arg); |
| 79 | 68 | int cleanup_cndregister(cleanup *loc, fcallback cb, void *arg, unsigned char flag); | |
| 80 | // Register a cleanup callback for a given cleanup object | 69 | int cleanup_clear(cleanup *loc); |
| 81 | int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg); | 70 | int cleanup_fire(cleanup *loc); |
| 82 | 71 | int cleanup_cndfire(cleanup *loc, unsigned char flag); | |
| 83 | // Register a cleanup callback, if and only if `flag == 0` | ||
| 84 | int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg); | ||
| 85 | |||
| 86 | // Clear the contents of a cleanup stack | ||
| 87 | int cleanup_clear(cleanup * const loc); | ||
| 88 | |||
| 89 | // Get the top callback without removing it from the cleanup stack | ||
| 90 | cleanup_callback cleanup_peekf(cleanup * const loc); | ||
| 91 | |||
| 92 | // Get and remove the top callback from the cleanup stack. Does not return the argument for the given callback | ||
| 93 | cleanup_callback cleanup_popf(cleanup * const loc); | ||
| 94 | |||
| 95 | // Get the top argument without removing it from the cleanup stack | ||
| 96 | void * cleanup_peeka(cleanup * const loc); | ||
| 97 | |||
| 98 | // Get and remove the top argument from the cleanup stack. Does not return the callback it was to be fed into | ||
| 99 | void * cleanup_popa(cleanup * const loc); | ||
| 100 | |||
| 101 | // Fire all the callbacks in the cleanup stack | ||
| 102 | int cleanup_fire(cleanup * const loc); | ||
| 103 | 72 | ||
| 104 | /* Cleanup environment creator. Automatically defines the variables `__CLEANUP`, `__CLEANUP_FUNCS`, and `__CLEANUP_ARGS` and initializes | ||
| 105 | // via `cleanup_init()` using these variables. */ | ||
| 106 | #define cleanup_CREATE(size) \ | 73 | #define cleanup_CREATE(size) \ |
| 107 | cleanup __CLEANUP; \ | 74 | cleanup __CLEANUP; \ |
| 108 | cleanup_callback __CLEANUP_FUNCS[(size)]; \ | 75 | fcallback __CLEANUP_FUNCS[(size)]; \ |
| 109 | void *__CLEANUP_ARGS[(size)]; \ | 76 | void *__CLEANUP_ARGS[(size)]; \ |
| 110 | unsigned char __FLAG = 0; \ | 77 | unsigned char __FLAG = 0; \ |
| 111 | cleanup_init(&__CLEANUP, (size), __CLEANUP_FUNCS, __CLEANUP_ARGS) | 78 | cleanup_init(&__CLEANUP, __CLEANUP_FUNCS, __CLEANUP_ARGS, (size)) |
| 112 | 79 | ||
| 113 | #define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) | 80 | #define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) |
| 114 | #define cleanup_CNDREGISTER(cb, arg) cleanup_cndregister(&__CLEANUP, __FLAG, (cb), (arg)) | 81 | #define cleanup_CNDREGISTER(cb, arg) cleanup_cndregister(&__CLEANUP, (cb), (arg), __FLAG) |
| 115 | #define cleanup_CLEAR() cleanup_clear(&__CLEANUP) | 82 | #define cleanup_CLEAR() cleanup_clear(&__CLEANUP) |
| 116 | #define cleanup_PEEKF() cleanup_peekf(&__CLEANUP) | 83 | #define cleanup_FIRE() cleanup_fire(&__CLEANUP) |
| 117 | #define cleanup_POPF() cleanup_popf(&__CLEANUP) | 84 | #define cleanup_CNDFIRE() cleanup_cndfire(&__CLEANUP, __FLAG) |
| 118 | #define cleanup_PEEKA() cleanup_peeka(&__CLEANUP) | 85 | #define cleanup_MARK() (__FLAG = 1) |
| 119 | #define cleanup_POPA() cleanup_popa(&__CLEANUP) | 86 | #define cleanup_UNMARK() (__FLAG = 0) |
| 120 | #define cleanup_FIRE() cleanup_fire(&__CLEANUP) | 87 | #define cleanup_ERRORFLAGGED (__FLAG != 0) |
| 121 | #define cleanup_MARK() (__FLAG = 1) | 88 | #define cleanup_CNDEXEC(code) while(!cleanup_ERRORFLAGGED) {code; break;} |
| 122 | #define cleanup_UNMARK() (__FLAG = 0) | 89 | |
| 123 | #define cleanup_ERRORFLAGGED (__FLAG != 0) | 90 | |
| 91 | |||
| 92 | // Generic task to be executed by a consumer | ||
| 93 | typedef struct task task; | ||
| 94 | // A queue of tasks | ||
| 95 | typedef struct taskqueue taskqueue; | ||
| 96 | // A concurrent queue of tasks, basically a threadpool tasks can be dispatched to | ||
| 97 | typedef struct ctqueue ctqueue; | ||
| 124 | 98 | ||
| 125 | #endif \ No newline at end of file | 99 | #endif \ No newline at end of file |
diff --git a/src/threadpool.c b/src/threadpool.c index 6912790..c4d8a5c 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -1,386 +1,362 @@ | |||
| 1 | #include "threadpool.h" | 1 | #include "threadpool.h" |
| 2 | #include "shared.h" | ||
| 3 | 2 | ||
| 4 | #include "ll.h" | ||
| 5 | |||
| 6 | #include <asm-generic/errno-base.h> | ||
| 7 | #include <asm-generic/errno.h> | ||
| 8 | #include <threads.h> | 3 | #include <threads.h> |
| 9 | #include <stdlib.h> | 4 | #include <stdlib.h> |
| 10 | #include <errno.h> | 5 | #include <errno.h> |
| 6 | #include <error.h> | ||
| 7 | |||
| 11 | 8 | ||
| 12 | // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) | 9 | task * task_init(gcallback callback, fcallback freecb, void *data) { |
| 13 | typedef struct mtxp { | 10 | if(callback == NULL) {errno = EINVAL; return NULL;} |
| 14 | void *data; | ||
| 15 | mtx_t mtx; | ||
| 16 | } mtxpair; | ||
| 17 | 11 | ||
| 18 | mtxpair * mtxpair_init(void * const data, int type) { | 12 | task *tsk = calloc(1, sizeof(*tsk)); |
| 19 | mtxpair *mtxp = VALLOC(1, sizeof(*mtxp)); | 13 | if(!tsk) |
| 20 | if(!mtxp) | ||
| 21 | return NULL; | 14 | return NULL; |
| 22 | 15 | ||
| 23 | // Init the mutex | 16 | tsk->callback = callback; |
| 24 | if(mtx_init(&mtxp->mtx, type) == thrd_error) { | 17 | tsk->freecb = freecb; |
| 25 | free(mtxp); | 18 | tsk->data = data; |
| 26 | RETURNWERR(errno, NULL); | ||
| 27 | } | ||
| 28 | 19 | ||
| 29 | mtxp->data = data; | 20 | return tsk; |
| 30 | return mtxp; | ||
| 31 | } | 21 | } |
| 32 | 22 | ||
| 33 | void mtxpair_free(mtxpair *mp) { | 23 | void task_free(void *tsk) { |
| 34 | if(!mp) | 24 | task *real = (task *)tsk; |
| 25 | if(!real) | ||
| 35 | return; | 26 | return; |
| 36 | 27 | ||
| 37 | mtx_destroy(&mp->mtx); | 28 | if(real->freecb != NULL) |
| 38 | free(mp); | 29 | real->freecb(real->data); |
| 30 | free(real); | ||
| 39 | 31 | ||
| 40 | return; | 32 | return; |
| 41 | } | 33 | } |
| 42 | 34 | ||
| 43 | int mtxpair_setdata(mtxpair * const mp, void * const data) { | 35 | int task_fire(task *tsk) { |
| 44 | if(!mp) | 36 | if(!tsk) {errno = EINVAL; return -1;} |
| 45 | RETURNWERR(EINVAL, -1); | 37 | return tsk->callback(tsk->data); |
| 46 | |||
| 47 | mp->data = data; | ||
| 48 | return 0; | ||
| 49 | } | 38 | } |
| 50 | 39 | ||
| 51 | 40 | int task_fired(task *tsk) { | |
| 52 | // thrd_create which calls mtx_lock/unlock on `arg` automatically | 41 | int retval = task_fire(tsk); |
| 53 | int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) { | 42 | if(errno == EINVAL && retval == -1) {return -1;} |
| 54 | if(!thr) | 43 | task_free(tsk); |
| 55 | RETURNWERR(EINVAL, thrd_error); | ||
| 56 | if(!func) | ||
| 57 | RETURNWERR(EINVAL, thrd_error); | ||
| 58 | if(!mtxd) | ||
| 59 | RETURNWERR(EINVAL, thrd_error); | ||
| 60 | |||
| 61 | if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} | ||
| 62 | int retval = thrd_create(thr, func, mtxd->data); | ||
| 63 | if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} | ||
| 64 | |||
| 65 | return retval; | 44 | return retval; |
| 66 | } | 45 | } |
| 67 | 46 | ||
| 68 | 47 | ||
| 69 | /* Ok, after doing a little more research, the best way to do this is probaby via a producer/consumer architecture. Spawn a bunch of | 48 | tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { |
| 70 | // threads waiting on a queue (via semaphore) and when one is notified pop a task of the queue and execute it. In this case, the | 49 | if(!tsk) {errno = EINVAL; return NULL;} |
| 71 | // producer would be the filesystem scanner funciton providing new files to encrypt, and the consumers would be threads waiting | 50 | tqnode *node = calloc(1, sizeof(*node)); |
| 72 | // to encrypt them */ | 51 | if(!node) |
| 73 | |||
| 74 | // Threadpool: | ||
| 75 | // Array of threads | ||
| 76 | // Task Queue | ||
| 77 | // Readiness semaphore / conditional | ||
| 78 | // Mutex | ||
| 79 | // Linked List of Tasks | ||
| 80 | // Task: | ||
| 81 | // int (*callback)(void*) | ||
| 82 | // void *arg | ||
| 83 | |||
| 84 | // Consumer: | ||
| 85 | // Wait for cqueue to pop | ||
| 86 | // Fire task | ||
| 87 | // Repeat | ||
| 88 | |||
| 89 | // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 | ||
| 90 | |||
| 91 | typedef struct task { | ||
| 92 | task_callback cb; | ||
| 93 | void *arg; | ||
| 94 | } task; | ||
| 95 | |||
| 96 | task * task_init(task_callback cb, void *arg) { | ||
| 97 | if(cb == NULL) | ||
| 98 | RETURNWERR(EINVAL, NULL); | ||
| 99 | task *task = VALLOC(1, sizeof(*task)); | ||
| 100 | if(!task) | ||
| 101 | return NULL; | 52 | return NULL; |
| 102 | 53 | ||
| 103 | task->cb = cb; | 54 | node->next = next; |
| 104 | task->arg = arg; | 55 | node->prev = prev; |
| 56 | node->task = tsk; | ||
| 105 | 57 | ||
| 106 | return task; | 58 | return node; |
| 107 | } | 59 | } |
| 108 | 60 | ||
| 109 | void task_free(void *ts) { | 61 | void tqnode_free(void *tqn) { |
| 110 | if(!ts) | 62 | tqnode *real = (tqnode *)tqn; |
| 63 | if(!real) | ||
| 111 | return; | 64 | return; |
| 112 | 65 | ||
| 113 | free(ts); // Not making any assumptions about the data in the task | 66 | task_free(real->task); |
| 67 | free(real); | ||
| 114 | return; | 68 | return; |
| 115 | } | 69 | } |
| 116 | 70 | ||
| 117 | int task_fire(task *ts) { | ||
| 118 | if(!ts) | ||
| 119 | RETURNWERR(EINVAL, -1); | ||
| 120 | if(ts->cb == NULL) | ||
| 121 | RETURNWERR(EINVAL, -1); | ||
| 122 | |||
| 123 | return ts->cb(ts->arg); | ||
| 124 | } | ||
| 125 | 71 | ||
| 126 | 72 | ||
| 127 | 73 | ||
| 128 | typedef struct cq { | 74 | taskqueue * taskqueue_init(void) { |
| 129 | dlinkedlist *taskqueue; | 75 | taskqueue *tq = calloc(1, sizeof(*tq)); |
| 130 | dlinkedlist *rthreads; | 76 | if(!tq) |
| 77 | return NULL; | ||
| 131 | 78 | ||
| 132 | mtx_t mtx; | 79 | tq->start = NULL; |
| 133 | cnd_t cnd; | 80 | tq->end = NULL; |
| 134 | 81 | tq->size = 0; | |
| 135 | unsigned char canceled; | ||
| 136 | } cqueue; | ||
| 137 | 82 | ||
| 83 | return tq; | ||
| 84 | } | ||
| 138 | 85 | ||
| 139 | static void ___ucleanup_mtxd(void *mtx) { | 86 | void taskqueue_free(void *tq) { |
| 140 | if(!mtx) | 87 | if(!tq) |
| 141 | return; | 88 | return; |
| 142 | 89 | ||
| 143 | mtx_destroy((mtx_t*)mtx); | 90 | for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { |
| 91 | n = p->next; | ||
| 92 | tqnode_free(p); | ||
| 93 | p = n; | ||
| 94 | } | ||
| 95 | free(tq); | ||
| 96 | |||
| 144 | return; | 97 | return; |
| 145 | } | 98 | } |
| 146 | 99 | ||
| 147 | static void ___ucleanup_cndd(void *cnd) { | 100 | int taskqueue_handlefirst(taskqueue *tq, task *tsk) { |
| 148 | if(!cnd) | 101 | if(!tq || !tsk) {errno = EINVAL; return -1;} |
| 149 | return; | 102 | if(tq->size) {return 0;} |
| 150 | 103 | ||
| 151 | cnd_destroy((cnd_t *)cnd); | 104 | tqnode *first = tqnode_init(NULL, NULL, tsk); |
| 152 | return; | 105 | if(!first) |
| 153 | } | 106 | return -1; |
| 154 | 107 | ||
| 155 | cqueue * cqueue_init() { | 108 | tq->start = first; |
| 156 | cleanup_CREATE(10); | 109 | tq->end = first; |
| 157 | 110 | tq->size = 1; | |
| 158 | // Create base object | ||
| 159 | cqueue *cq = VALLOC(1, sizeof(*cq)); | ||
| 160 | if(!cq) | ||
| 161 | RETURNWERR(errno, NULL); | ||
| 162 | cleanup_REGISTER(free, cq); | ||
| 163 | cq->canceled = 0; | ||
| 164 | |||
| 165 | // Initialize the mutex | ||
| 166 | if(mtx_init(&cq->mtx, mtx_plain) != thrd_success) | ||
| 167 | cleanup_MARK(); | ||
| 168 | cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); | ||
| 169 | |||
| 170 | // Initialize the conditional | ||
| 171 | if(!cleanup_ERRORFLAGGED) | ||
| 172 | if(cnd_init(&cq->cnd) != thrd_success) | ||
| 173 | cleanup_MARK(); | ||
| 174 | cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); | ||
| 175 | 111 | ||
| 176 | // Create the taskqueue | 112 | return 1; |
| 177 | if(!cleanup_ERRORFLAGGED) | 113 | } |
| 178 | if(!(cq->taskqueue = dlinkedlist_init())) | ||
| 179 | cleanup_MARK(); | ||
| 180 | cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue); | ||
| 181 | 114 | ||
| 182 | // Create the thread list | 115 | int taskqueue_push(taskqueue *tq, task *tsk) { |
| 183 | if(!cleanup_ERRORFLAGGED) | 116 | if(!tq || !tsk) {errno = EINVAL; return -1;} |
| 184 | if(!(cq->rthreads = dlinkedlist_init())) | ||
| 185 | cleanup_MARK(); | ||
| 186 | cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads); | ||
| 187 | 117 | ||
| 188 | if(cleanup_ERRORFLAGGED) | 118 | int hf; |
| 189 | cleanup_FIRE(); | 119 | if((hf = taskqueue_handlefirst(tq, tsk))) |
| 120 | return (hf >= 0) ? 0 : -1; | ||
| 190 | 121 | ||
| 191 | return cq; | 122 | tqnode *newstart = tqnode_init(tq->start, NULL, tsk); |
| 123 | if(!newstart) | ||
| 124 | return -1; | ||
| 125 | tq->start->prev = newstart; | ||
| 126 | tq->start = newstart; | ||
| 127 | tq->size++; | ||
| 192 | 128 | ||
| 193 | // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line | 129 | return 0; |
| 194 | // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea | ||
| 195 | } | 130 | } |
| 196 | 131 | ||
| 197 | void cqueue_cancel(cqueue * const cq) { | 132 | task * taskqueue_pop(taskqueue *tq) { |
| 198 | if(!cq) | 133 | if(!tq) {errno = EINVAL; return NULL;} |
| 199 | return; | 134 | if(tq->size <= 0) {errno = ENODATA; return NULL;} |
| 200 | 135 | ||
| 201 | mtx_lock(&cq->mtx); | 136 | tqnode *end = tq->end; |
| 202 | 137 | task *ret = end->task; | |
| 203 | if(cq->canceled) { | 138 | |
| 204 | mtx_unlock(&cq->mtx); | 139 | if(tq->size == 1) { |
| 205 | return; | 140 | tq->end = NULL; |
| 141 | tq->start = NULL; | ||
| 142 | } else { | ||
| 143 | tq->end = end->prev; | ||
| 144 | tq->end->next = NULL; | ||
| 206 | } | 145 | } |
| 207 | cq->canceled = 1; | ||
| 208 | 146 | ||
| 209 | mtx_unlock(&cq->mtx); | 147 | free(end); |
| 210 | cnd_broadcast(&cq->cnd); | 148 | tq->size--; |
| 211 | 149 | return ret; | |
| 212 | return; | ||
| 213 | } | 150 | } |
| 214 | 151 | ||
| 215 | static int ___cqueue_join(void *t) { | 152 | int taskqueue_pushfront(taskqueue *tq, task *tsk) { |
| 216 | if(!t) | 153 | if(!tq || !tsk) {errno = EINVAL; return -1;} |
| 154 | |||
| 155 | int hf; | ||
| 156 | if((hf = taskqueue_handlefirst(tq, tsk))) | ||
| 157 | return (hf >= 0) ? 0 : -1; | ||
| 158 | |||
| 159 | tqnode *newend = tqnode_init(NULL, tq->end, tsk); | ||
| 160 | if(!newend) | ||
| 217 | return -1; | 161 | return -1; |
| 162 | tq->end->next = newend; | ||
| 163 | tq->end = newend; | ||
| 164 | tq->size++; | ||
| 218 | 165 | ||
| 219 | int retval = 0; | 166 | return 0; |
| 220 | thrd_join(*((thrd_t*)t), &retval); | ||
| 221 | |||
| 222 | return retval; | ||
| 223 | } | 167 | } |
| 224 | 168 | ||
| 225 | void cqueue_free(void *cq) { | 169 | task * taskqueue_popback(taskqueue *tq) { |
| 226 | if(!cq) | 170 | if(!tq) {errno = EINVAL; return NULL;} |
| 227 | return; | 171 | if(tq->size <= 0) {errno = ENODATA; return NULL;} |
| 228 | 172 | ||
| 229 | cqueue *real = (cqueue *)cq; | 173 | tqnode *start = tq->start; |
| 174 | task *ret = start->task; | ||
| 230 | 175 | ||
| 231 | // Cancel threads and wait for them to exit | 176 | if(tq->size == 1) { |
| 232 | cqueue_cancel(real); | 177 | tq->start = NULL; |
| 233 | dlinkedlist_foreach(real->rthreads, ___cqueue_join); | 178 | tq->end = NULL; |
| 179 | } else { | ||
| 180 | tq->start = start->next; | ||
| 181 | tq->start->prev = NULL; | ||
| 182 | } | ||
| 234 | 183 | ||
| 235 | // Threads are dead, no need to worry about concurrency anymore | 184 | free(start); |
| 236 | mtx_destroy(&real->mtx); | 185 | tq->size--; |
| 237 | cnd_destroy(&real->cnd); | 186 | return ret; |
| 238 | dlinkedlist_free(real->rthreads); | 187 | } |
| 239 | dlinkedlist_free(real->taskqueue); | ||
| 240 | 188 | ||
| 241 | return; | 189 | int taskqueue_size(taskqueue *tq) { |
| 190 | if(!tq) {errno = EINVAL; return -1;} | ||
| 191 | return tq->size; | ||
| 242 | } | 192 | } |
| 243 | 193 | ||
| 244 | int cqueue_addtask(cqueue * const cq, task * const tsk) { | ||
| 245 | if(!cq || !tsk) | ||
| 246 | RETURNWERR(EINVAL, -1); | ||
| 247 | 194 | ||
| 248 | mtx_lock(&cq->mtx); | ||
| 249 | |||
| 250 | if(cq->canceled) { | ||
| 251 | mtx_unlock(&cq->mtx); | ||
| 252 | RETURNWERR(ECANCELED, -1); | ||
| 253 | } | ||
| 254 | 195 | ||
| 255 | dlinkedlist_prepend(cq->taskqueue, tsk, task_free); | 196 | // Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written |
| 256 | mtx_unlock(&cq->mtx); | 197 | #define __CTQ_INLOCK(ctq, retval, code) do {\ |
| 257 | cnd_signal(&cq->cnd); | 198 | mtx_lock(&(ctq)->mutex); \ |
| 199 | if((ctq)->canceled) { \ | ||
| 200 | errno = ECANCELED; \ | ||
| 201 | mtx_unlock(&(ctq)->mutex); \ | ||
| 202 | return (retval); \ | ||
| 203 | } \ | ||
| 204 | \ | ||
| 205 | code \ | ||
| 206 | mtx_unlock(&(ctq)->mutex); \ | ||
| 207 | } while (0) | ||
| 258 | 208 | ||
| 259 | return 0; | 209 | static void ___ucl_mtxdestroy(void *mtx) { |
| 210 | if(!mtx) return; | ||
| 211 | mtx_destroy((mtx_t *)mtx); | ||
| 212 | return; | ||
| 260 | } | 213 | } |
| 261 | 214 | ||
| 262 | task * cqueue_waitpop(cqueue * const cq) { | 215 | static void ___ucl_cnddestroy(void *cond) { |
| 263 | if(!cq) | 216 | if(cond) return; |
| 264 | RETURNWERR(EINVAL, NULL); | 217 | cnd_destroy((cnd_t *)cond); |
| 218 | return; | ||
| 219 | } | ||
| 265 | 220 | ||
| 266 | task *tsk = NULL; | 221 | ctqueue * ctqueue_init(int nthreads) { |
| 267 | int index = -1; | 222 | if(nthreads <= 0) {errno = EINVAL; return NULL;} |
| 223 | cleanup_CREATE(6); | ||
| 268 | 224 | ||
| 269 | mtx_lock(&cq->mtx); | 225 | ctqueue *ctq = calloc(1, sizeof(*ctq)); |
| 270 | while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) | 226 | if(!ctq) |
| 271 | cnd_wait(&cq->cnd, &cq->mtx); | 227 | return NULL; |
| 228 | cleanup_REGISTER(free, ctq); | ||
| 272 | 229 | ||
| 273 | if(cq->canceled) { | 230 | ctq->canceled = 0; |
| 274 | mtx_unlock(&cq->mtx); | 231 | ctq->talen = nthreads; |
| 275 | thrd_exit(-1); | ||
| 276 | } | ||
| 277 | 232 | ||
| 278 | tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); | 233 | cleanup_CNDEXEC( |
| 279 | dlinkedlist_remove(cq->taskqueue, index); | 234 | ctq->tq = taskqueue_init(); |
| 235 | if(!ctq->tq) | ||
| 236 | cleanup_MARK(); | ||
| 237 | cleanup_CNDREGISTER(taskqueue_free, ctq->tq); | ||
| 238 | ); | ||
| 280 | 239 | ||
| 281 | mtx_unlock(&cq->mtx); | 240 | cleanup_CNDEXEC( |
| 241 | if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) | ||
| 242 | cleanup_MARK(); | ||
| 243 | cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex); | ||
| 244 | ); | ||
| 282 | 245 | ||
| 283 | return tsk; | 246 | cleanup_CNDEXEC( |
| 247 | if(cnd_init(&ctq->cond) != thrd_success) | ||
| 248 | cleanup_MARK(); | ||
| 249 | cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); | ||
| 250 | ); | ||
| 251 | |||
| 252 | cleanup_CNDEXEC( | ||
| 253 | ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); | ||
| 254 | if(!ctq->thrdarr) | ||
| 255 | cleanup_MARK(); | ||
| 256 | cleanup_CNDREGISTER(free, ctq->thrdarr); | ||
| 257 | ) | ||
| 258 | |||
| 259 | cleanup_CNDFIRE(); | ||
| 260 | if(cleanup_ERRORFLAGGED) | ||
| 261 | return NULL; | ||
| 262 | |||
| 263 | return ctq; | ||
| 284 | } | 264 | } |
| 285 | 265 | ||
| 286 | static int consumer(void *cq) { | 266 | int ctqueue_cancel(ctqueue *ctq) { |
| 287 | if(!cq) | 267 | if(!ctq) {errno = EINVAL; return -1;} |
| 288 | thrd_exit(-1); | ||
| 289 | 268 | ||
| 290 | cqueue *real = (cqueue *)cq; | 269 | __CTQ_INLOCK(ctq, 1, |
| 291 | for(task *ctask;;) { | 270 | ctq->canceled = 1; |
| 292 | ctask = cqueue_waitpop(real); | 271 | ); |
| 293 | if(!ctask) | 272 | cnd_broadcast(&ctq->cond); |
| 294 | task_fire(ctask); | ||
| 295 | } | ||
| 296 | 273 | ||
| 297 | thrd_exit(0); | 274 | return 0; |
| 298 | } | 275 | } |
| 299 | 276 | ||
| 300 | int cqueue_registerthreads(cqueue * const cq, int threads) { | 277 | void ctqueue_free(void *ctq) { |
| 301 | if(!cq || threads <= 0) | 278 | if(!ctq) |
| 302 | RETURNWERR(EINVAL, -1); | 279 | return; |
| 303 | 280 | ||
| 304 | mtx_lock(&cq->mtx); | 281 | ctqueue *real = (ctqueue *)ctq; |
| 305 | if(cq->canceled) { | 282 | ctqueue_cancel(real); |
| 306 | mtx_unlock(&cq->mtx); | ||
| 307 | RETURNWERR(ECANCELED, -1); | ||
| 308 | } | ||
| 309 | 283 | ||
| 310 | thrd_t *newthreads[threads]; | 284 | for(int i = 0; i < real->talen; i++) |
| 311 | for(int i = 0; i < threads; i++) { | 285 | thrd_join(real->thrdarr[i], NULL); |
| 312 | newthreads[i] = VALLOC(1, sizeof(thrd_t)); | ||
| 313 | if(!newthreads[i]) { | ||
| 314 | for(int j = 0; j < i; j++) | ||
| 315 | free(newthreads[j]); | ||
| 316 | 286 | ||
| 317 | return -1; | 287 | // Threads are dead, everything's free game |
| 318 | } | 288 | mtx_destroy(&real->mutex); |
| 289 | cnd_destroy(&real->cond); | ||
| 290 | taskqueue_free(real->tq); | ||
| 291 | free(real->thrdarr); | ||
| 292 | free(real); | ||
| 319 | 293 | ||
| 320 | dlinkedlist_prepend(cq->rthreads, newthreads[i], free); | 294 | // TODO: figure out if it's necessary / a good idea to do error handling on these functions |
| 321 | thrd_create(newthreads[i], consumer, cq); | ||
| 322 | } | ||
| 323 | 295 | ||
| 324 | mtx_unlock(&cq->mtx); | 296 | return; |
| 297 | } | ||
| 325 | 298 | ||
| 326 | return 0; | 299 | int ctqueue_waitpush(ctqueue *ctq, task *tsk) { |
| 300 | if(!ctq || !tsk) {errno = EINVAL; return -1;} | ||
| 301 | int retval = 0; | ||
| 302 | |||
| 303 | __CTQ_INLOCK(ctq, -1, | ||
| 304 | retval = taskqueue_push(ctq->tq, tsk); | ||
| 305 | ); | ||
| 306 | if(retval == 0) | ||
| 307 | cnd_signal(&ctq->cond); | ||
| 308 | |||
| 309 | return retval; | ||
| 327 | } | 310 | } |
| 328 | 311 | ||
| 329 | int cqueue_registerthread(cqueue * const cq) { | 312 | task * ctqueue_waitpop(ctqueue *ctq) { |
| 330 | return cqueue_registerthreads(cq, 1); | 313 | if(!ctq) {errno = EINVAL; return NULL;} |
| 314 | task *retval = NULL; | ||
| 315 | |||
| 316 | __CTQ_INLOCK(ctq, NULL, | ||
| 317 | while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) | ||
| 318 | cnd_wait(&ctq->cond, &ctq->mutex); | ||
| 319 | |||
| 320 | if(ctq->canceled) { | ||
| 321 | errno = ECANCELED; | ||
| 322 | mtx_unlock(&ctq->mutex); | ||
| 323 | return NULL; | ||
| 324 | } | ||
| 325 | |||
| 326 | retval = taskqueue_pop(ctq->tq); | ||
| 327 | ); | ||
| 328 | |||
| 329 | return retval; | ||
| 331 | } | 330 | } |
| 332 | 331 | ||
| 333 | enum __CQUEUE_STAT_OPTIONS { | 332 | static int __CTQ_CONSUMER(void *ctq) { |
| 334 | __CQUEUE_STAT_NOTDEF, | 333 | if(!ctq) {errno = EINVAL; thrd_exit(-1);} |
| 335 | 334 | ctqueue *real = (ctqueue *)ctq; | |
| 336 | __CQUEUE_CANCELED, | 335 | |
| 337 | __CQUEUE_THREADS_NUM, | 336 | for(task *ctask = NULL;;) { |
| 338 | __CQUEUE_TASKS_NUM, | 337 | ctask = ctqueue_waitpop(real); |
| 339 | 338 | if(!ctask) | |
| 340 | __CQUEUE_STAT_TOOBIG, | 339 | break; |
| 341 | }; | 340 | |
| 342 | 341 | task_fire(ctask); | |
| 343 | int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { | 342 | task_free(ctask); |
| 344 | if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) | ||
| 345 | RETURNWERR(EINVAL, -1); | ||
| 346 | |||
| 347 | int retval = -1; | ||
| 348 | mtx_lock(&cq->mtx); | ||
| 349 | |||
| 350 | switch(opt) { | ||
| 351 | case __CQUEUE_CANCELED: | ||
| 352 | retval = cq->canceled; | ||
| 353 | mtx_unlock(&cq->mtx); | ||
| 354 | return retval; | ||
| 355 | break; | ||
| 356 | |||
| 357 | case __CQUEUE_THREADS_NUM: | ||
| 358 | retval = dlinkedlist_size(cq->rthreads); | ||
| 359 | mtx_unlock(&cq->mtx); | ||
| 360 | return retval; | ||
| 361 | break; | ||
| 362 | |||
| 363 | case __CQUEUE_TASKS_NUM: | ||
| 364 | retval = dlinkedlist_size(cq->taskqueue); | ||
| 365 | mtx_unlock(&cq->mtx); | ||
| 366 | return retval; | ||
| 367 | break; | ||
| 368 | |||
| 369 | default: | ||
| 370 | RETURNWERR(EINVAL, -1); | ||
| 371 | break; | ||
| 372 | } | 343 | } |
| 373 | 344 | ||
| 374 | // This should absolutely never run | 345 | thrd_exit(1); // non-zero indicates error, -1 indicates invalid argument |
| 375 | RETURNWERR(ENOTRECOVERABLE, -1); | ||
| 376 | } | 346 | } |
| 377 | 347 | ||
| 378 | int cqueue_iscanceled(cqueue * const cq) { | 348 | int ctqueue_start(ctqueue *ctq) { |
| 379 | return cqueue_getstat(cq, __CQUEUE_CANCELED); | 349 | if(!ctq) {errno = EINVAL; return -1;} |
| 380 | } | 350 | |
| 381 | int cqueue_numthreads(cqueue * const cq) { | 351 | ctq->canceled = 0; |
| 382 | return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); | 352 | |
| 383 | } | 353 | int retval = 0; |
| 384 | int cqueue_numtasks(cqueue * const cq) { | 354 | for(int i = 0; i < ctq->talen; i++) |
| 385 | return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); | 355 | if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) |
| 386 | } | 356 | break; |
| 357 | |||
| 358 | if(retval != thrd_success) | ||
| 359 | ctqueue_cancel(ctq); | ||
| 360 | |||
| 361 | return (retval == thrd_success) ? 0 : -1; | ||
| 362 | } \ No newline at end of file | ||
diff --git a/src/threadpool.h b/src/threadpool.h index d7b713c..3048eea 100644 --- a/src/threadpool.h +++ b/src/threadpool.h | |||
| @@ -1,26 +1,74 @@ | |||
| 1 | #ifndef __VXGG_REWRITE___THREADPOOL_H___13601325413136___ | 1 | #ifndef __VXGG_REWRITE___THREADPOOL_H___193271180830131___ |
| 2 | #define __VXGG_REWRITE___THREADPOOL_H___13601325413136___ | 2 | #define __VXGG_REWRITE___THREADPOOL_H___193271180830131___ 1 |
| 3 | 3 | ||
| 4 | #include "shared.h" | ||
| 4 | #include <threads.h> | 5 | #include <threads.h> |
| 5 | 6 | ||
| 6 | typedef int (*task_callback)(void*); | 7 | typedef struct task { |
| 7 | typedef struct task task; | 8 | gcallback callback; |
| 8 | typedef struct cq cqueue; | 9 | fcallback freecb; |
| 9 | typedef struct tp threadpool; | 10 | void *data; |
| 10 | 11 | } task; | |
| 11 | task * task_init(task_callback cb, void *arg); | 12 | |
| 12 | void task_free(void *ts); | 13 | typedef struct tqnode { |
| 13 | int task_fire(task *ts); | 14 | struct tqnode *next; |
| 14 | 15 | struct tqnode *prev; | |
| 15 | cqueue * cqueue_init(); | 16 | task *task; |
| 16 | void cqueue_cancel(cqueue * const cq); | 17 | } tqnode; |
| 17 | void cqueue_free(void *cq); | 18 | |
| 18 | int cqueue_addtask(cqueue * const cq, task * const tsk); | 19 | typedef struct taskqueue { |
| 19 | task * cqueue_waitpop(cqueue * const cq); | 20 | tqnode *start; |
| 20 | int cqueue_registerthreads(cqueue * const cq, int threads); | 21 | tqnode *end; |
| 21 | int cqueue_registerthread(cqueue * const cq); | 22 | unsigned int size; |
| 22 | int cqueue_iscanceled(cqueue * const cq); | 23 | } taskqueue; |
| 23 | int cqueue_numthreads(cqueue * const cq); | 24 | |
| 24 | int cqueue_numtasks(cqueue * const cq); | 25 | typedef struct ctqueue { |
| 26 | mtx_t mutex; | ||
| 27 | cnd_t cond; | ||
| 28 | unsigned char canceled; | ||
| 29 | |||
| 30 | taskqueue *tq; | ||
| 31 | thrd_t *thrdarr; | ||
| 32 | int talen; | ||
| 33 | } ctqueue; | ||
| 34 | |||
| 35 | // Create a new task. Sets `errno` and returns `NULL` on error | ||
| 36 | task * task_init(gcallback callback, fcallback freecb, void *data); | ||
| 37 | // Free a task. Passes the `.data` member to specified fcallback as a function parameter. Does not return a value or set `errno` | ||
| 38 | void task_free(void *tsk); | ||
| 39 | // Fire a task. Passes the `.data` member to specified gcallback as a function parameter. Returns value of gcallback, or sets `errno` and returns `-1` on error | ||
| 40 | int task_fire(task *tsk); | ||
| 41 | // Fire and free a task simultaneously. Calls specified gcallback and fcallback on associated data. Returns value of gcallback, or sets `errno` and returns `-1` on error | ||
| 42 | int task_fired(task *tsk); | ||
| 43 | |||
| 44 | |||
| 45 | |||
| 46 | // Create a FIFO queue of task objects. Returns a new taskqueue on success, sets `errno` and returns `NULL` on error | ||
| 47 | taskqueue * taskqueue_init(void); | ||
| 48 | // Free a taskqueue. Does not return a value or set `errno` | ||
| 49 | void taskqueue_free(void *tq); | ||
| 50 | // Push a task onto the queue. Returns 0 on success, sets `errno` and returns `-1` on error | ||
| 51 | int taskqueue_push(taskqueue *tq, task *tsk); | ||
| 52 | // Pop a task from the queue. Returns a task on success, sets `errno` and returns `NULL` on error | ||
| 53 | task * taskqueue_pop(taskqueue *tq); | ||
| 54 | // Push a task to the front of the queue (append, task becomes first in line to be popped). Returns 0 on success, sets `errno` and returns `-1` on error | ||
| 55 | int taskqueue_pushfront(taskqueue *tq, task *tsk); | ||
| 56 | // Pop a task from the back of the queue (pop the most recently (normally) pushed item). Returns a task on success, sets `errno` and returns `NULL` on error | ||
| 57 | task * taskqueue_popback(taskqueue *tq); | ||
| 58 | |||
| 59 | |||
| 60 | |||
| 61 | // Create a concurrent taskqueue with `size` allocated threads | ||
| 62 | ctqueue * ctqueue_init(int size); | ||
| 63 | // Cancel a currently running ctq | ||
| 64 | int ctqueue_cancel(ctqueue *ctq); | ||
| 65 | // Free a ctq (cancels any remaining operations) | ||
| 66 | void ctqueue_free(void *ctq); | ||
| 67 | // Push a new task to the queue, waiting via mutex to do so | ||
| 68 | int ctqueue_waitpush(ctqueue *ctq, task *tsk); | ||
| 69 | // Pop a task from the queue, waiting via mutex to do so | ||
| 70 | task * ctqueue_waitpop(ctqueue *ctq); | ||
| 71 | // Spawn the allocated threads for a ctq | ||
| 72 | int ctqueue_start(ctqueue *ctq); | ||
| 25 | 73 | ||
| 26 | #endif \ No newline at end of file | 74 | #endif \ No newline at end of file |
