diff options
| author | @syxhe <https://t.me/syxhe> | 2025-05-16 19:09:02 -0500 |
|---|---|---|
| committer | @syxhe <https://t.me/syxhe> | 2025-05-16 19:09:02 -0500 |
| commit | 60715c7785e7fbe4759df64258ad58a8e2a0769d (patch) | |
| tree | 7471bc9816853518bdbc8baa02f817d5cf73594b | |
| parent | 5a5ff094101d63935f7fc65c124b3e56d553522d (diff) | |
Do some more work on the concurrent queue
| -rw-r--r-- | src/shared.h | 9 | ||||
| -rw-r--r-- | src/threadpool.c | 171 | ||||
| -rw-r--r-- | src/threadpool.h | 10 |
3 files changed, 147 insertions, 43 deletions
diff --git a/src/shared.h b/src/shared.h index 825814e..66d1af3 100644 --- a/src/shared.h +++ b/src/shared.h | |||
| @@ -7,6 +7,9 @@ | |||
| 7 | #define FALSE 0 | 7 | #define FALSE 0 |
| 8 | #define TRUE 1 | 8 | #define TRUE 1 |
| 9 | 9 | ||
| 10 | typedef int (*gcallback)(void*); // Generic callback signature | ||
| 11 | typedef void (*fcallback)(void*); // free()-like callback signature | ||
| 12 | |||
| 10 | #define RETURNWERR(errval, retval) do {\ | 13 | #define RETURNWERR(errval, retval) do {\ |
| 11 | errno = (errval);\ | 14 | errno = (errval);\ |
| 12 | return (retval);\ | 15 | return (retval);\ |
| @@ -60,12 +63,12 @@ char * xdirname(const char * const path); | |||
| 60 | 63 | ||
| 61 | 64 | ||
| 62 | // Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null | 65 | // Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null |
| 63 | typedef void (*cleanup_callback)(void*); | 66 | typedef fcallback cleanup_callback; |
| 64 | 67 | ||
| 65 | // Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments | 68 | // Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments |
| 66 | typedef struct cl { | 69 | typedef struct cl { |
| 67 | cleanup_callback *funcs; | 70 | cleanup_callback *funcs; // Actual type: cleanup_callback funcs[] |
| 68 | void **args; | 71 | void **args; // Actual type: void *args[] |
| 69 | 72 | ||
| 70 | int size; | 73 | int size; |
| 71 | int used; | 74 | int used; |
diff --git a/src/threadpool.c b/src/threadpool.c index 0baa024..6912790 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -3,6 +3,8 @@ | |||
| 3 | 3 | ||
| 4 | #include "ll.h" | 4 | #include "ll.h" |
| 5 | 5 | ||
| 6 | #include <asm-generic/errno-base.h> | ||
| 7 | #include <asm-generic/errno.h> | ||
| 6 | #include <threads.h> | 8 | #include <threads.h> |
| 7 | #include <stdlib.h> | 9 | #include <stdlib.h> |
| 8 | #include <errno.h> | 10 | #include <errno.h> |
| @@ -104,7 +106,7 @@ task * task_init(task_callback cb, void *arg) { | |||
| 104 | return task; | 106 | return task; |
| 105 | } | 107 | } |
| 106 | 108 | ||
| 107 | void task_free(task *ts) { | 109 | void task_free(void *ts) { |
| 108 | if(!ts) | 110 | if(!ts) |
| 109 | return; | 111 | return; |
| 110 | 112 | ||
| @@ -215,8 +217,7 @@ static int ___cqueue_join(void *t) { | |||
| 215 | return -1; | 217 | return -1; |
| 216 | 218 | ||
| 217 | int retval = 0; | 219 | int retval = 0; |
| 218 | thrd_t thread = *((thrd_t*)t); | 220 | thrd_join(*((thrd_t*)t), &retval); |
| 219 | thrd_join(thread, &retval); | ||
| 220 | 221 | ||
| 221 | return retval; | 222 | return retval; |
| 222 | } | 223 | } |
| @@ -240,52 +241,146 @@ void cqueue_free(void *cq) { | |||
| 240 | return; | 241 | return; |
| 241 | } | 242 | } |
| 242 | 243 | ||
| 243 | // int cqueue_addtask(cqueue * const cq, task * const tsk) { | 244 | int cqueue_addtask(cqueue * const cq, task * const tsk) { |
| 244 | // if(!cq || !tsk) | 245 | if(!cq || !tsk) |
| 245 | // RETURNWERR(EINVAL, -1); | 246 | RETURNWERR(EINVAL, -1); |
| 246 | 247 | ||
| 247 | // mtx_lock(cq->mtx); | 248 | mtx_lock(&cq->mtx); |
| 248 | 249 | ||
| 249 | // // TODO: Think about creating an "exception" via signal handling | 250 | if(cq->canceled) { |
| 250 | // if(cq->canceled) { | 251 | mtx_unlock(&cq->mtx); |
| 251 | // mtx_unlock(cq->mtx); | 252 | RETURNWERR(ECANCELED, -1); |
| 252 | // thrd_exit(-1); | 253 | } |
| 253 | // } | ||
| 254 | 254 | ||
| 255 | // dlinkedlist_prepend(cq->list, tsk, free); | 255 | dlinkedlist_prepend(cq->taskqueue, tsk, task_free); |
| 256 | // mtx_unlock(cq->mtx); | 256 | mtx_unlock(&cq->mtx); |
| 257 | // cnd_signal(cq->cnd); | 257 | cnd_signal(&cq->cnd); |
| 258 | 258 | ||
| 259 | // return 0; | 259 | return 0; |
| 260 | // } | 260 | } |
| 261 | 261 | ||
| 262 | // task * cqueue_waitpop(cqueue * const cq) { | 262 | task * cqueue_waitpop(cqueue * const cq) { |
| 263 | // if(!cq) | 263 | if(!cq) |
| 264 | // RETURNWERR(EINVAL, NULL); | 264 | RETURNWERR(EINVAL, NULL); |
| 265 | 265 | ||
| 266 | // task *retval = NULL; | 266 | task *tsk = NULL; |
| 267 | int index = -1; | ||
| 267 | 268 | ||
| 268 | // mtx_lock(cq->mtx); | 269 | mtx_lock(&cq->mtx); |
| 269 | // while(dlinkedlist_isempty(cq->list) && !cq->canceled) | 270 | while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) |
| 270 | // cnd_wait(cq->cnd, cq->mtx); | 271 | cnd_wait(&cq->cnd, &cq->mtx); |
| 271 | 272 | ||
| 272 | // if(cq->canceled) { | 273 | if(cq->canceled) { |
| 273 | // mtx_unlock(cq->mtx); | 274 | mtx_unlock(&cq->mtx); |
| 274 | // thrd_exit(-1); | 275 | thrd_exit(-1); |
| 275 | // } | 276 | } |
| 276 | 277 | ||
| 277 | // retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); | 278 | tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); |
| 278 | // dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); | 279 | dlinkedlist_remove(cq->taskqueue, index); |
| 279 | // mtx_unlock(cq->mtx); | ||
| 280 | 280 | ||
| 281 | // return retval; | 281 | mtx_unlock(&cq->mtx); |
| 282 | // } | ||
| 283 | 282 | ||
| 283 | return tsk; | ||
| 284 | } | ||
| 285 | |||
| 286 | static int consumer(void *cq) { | ||
| 287 | if(!cq) | ||
| 288 | thrd_exit(-1); | ||
| 289 | |||
| 290 | cqueue *real = (cqueue *)cq; | ||
| 291 | for(task *ctask;;) { | ||
| 292 | ctask = cqueue_waitpop(real); | ||
| 293 | if(!ctask) | ||
| 294 | task_fire(ctask); | ||
| 295 | } | ||
| 296 | |||
| 297 | thrd_exit(0); | ||
| 298 | } | ||
| 299 | |||
| 300 | int cqueue_registerthreads(cqueue * const cq, int threads) { | ||
| 301 | if(!cq || threads <= 0) | ||
| 302 | RETURNWERR(EINVAL, -1); | ||
| 303 | |||
| 304 | mtx_lock(&cq->mtx); | ||
| 305 | if(cq->canceled) { | ||
| 306 | mtx_unlock(&cq->mtx); | ||
| 307 | RETURNWERR(ECANCELED, -1); | ||
| 308 | } | ||
| 284 | 309 | ||
| 310 | thrd_t *newthreads[threads]; | ||
| 311 | for(int i = 0; i < threads; i++) { | ||
| 312 | newthreads[i] = VALLOC(1, sizeof(thrd_t)); | ||
| 313 | if(!newthreads[i]) { | ||
| 314 | for(int j = 0; j < i; j++) | ||
| 315 | free(newthreads[j]); | ||
| 285 | 316 | ||
| 286 | typedef struct tp { | 317 | return -1; |
| 287 | thrd_t **threads; // thrd_t *threads[] | 318 | } |
| 288 | int nthreads; | ||
| 289 | 319 | ||
| 290 | cqueue *taskqueue; | 320 | dlinkedlist_prepend(cq->rthreads, newthreads[i], free); |
| 291 | } threadpool; | 321 | thrd_create(newthreads[i], consumer, cq); |
| 322 | } | ||
| 323 | |||
| 324 | mtx_unlock(&cq->mtx); | ||
| 325 | |||
| 326 | return 0; | ||
| 327 | } | ||
| 328 | |||
| 329 | int cqueue_registerthread(cqueue * const cq) { | ||
| 330 | return cqueue_registerthreads(cq, 1); | ||
| 331 | } | ||
| 332 | |||
| 333 | enum __CQUEUE_STAT_OPTIONS { | ||
| 334 | __CQUEUE_STAT_NOTDEF, | ||
| 335 | |||
| 336 | __CQUEUE_CANCELED, | ||
| 337 | __CQUEUE_THREADS_NUM, | ||
| 338 | __CQUEUE_TASKS_NUM, | ||
| 339 | |||
| 340 | __CQUEUE_STAT_TOOBIG, | ||
| 341 | }; | ||
| 342 | |||
| 343 | int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { | ||
| 344 | if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) | ||
| 345 | RETURNWERR(EINVAL, -1); | ||
| 346 | |||
| 347 | int retval = -1; | ||
| 348 | mtx_lock(&cq->mtx); | ||
| 349 | |||
| 350 | switch(opt) { | ||
| 351 | case __CQUEUE_CANCELED: | ||
| 352 | retval = cq->canceled; | ||
| 353 | mtx_unlock(&cq->mtx); | ||
| 354 | return retval; | ||
| 355 | break; | ||
| 356 | |||
| 357 | case __CQUEUE_THREADS_NUM: | ||
| 358 | retval = dlinkedlist_size(cq->rthreads); | ||
| 359 | mtx_unlock(&cq->mtx); | ||
| 360 | return retval; | ||
| 361 | break; | ||
| 362 | |||
| 363 | case __CQUEUE_TASKS_NUM: | ||
| 364 | retval = dlinkedlist_size(cq->taskqueue); | ||
| 365 | mtx_unlock(&cq->mtx); | ||
| 366 | return retval; | ||
| 367 | break; | ||
| 368 | |||
| 369 | default: | ||
| 370 | RETURNWERR(EINVAL, -1); | ||
| 371 | break; | ||
| 372 | } | ||
| 373 | |||
| 374 | // This should absolutely never run | ||
| 375 | RETURNWERR(ENOTRECOVERABLE, -1); | ||
| 376 | } | ||
| 377 | |||
| 378 | int cqueue_iscanceled(cqueue * const cq) { | ||
| 379 | return cqueue_getstat(cq, __CQUEUE_CANCELED); | ||
| 380 | } | ||
| 381 | int cqueue_numthreads(cqueue * const cq) { | ||
| 382 | return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); | ||
| 383 | } | ||
| 384 | int cqueue_numtasks(cqueue * const cq) { | ||
| 385 | return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); | ||
| 386 | } | ||
diff --git a/src/threadpool.h b/src/threadpool.h index db6fa2e..d7b713c 100644 --- a/src/threadpool.h +++ b/src/threadpool.h | |||
| @@ -9,12 +9,18 @@ typedef struct cq cqueue; | |||
| 9 | typedef struct tp threadpool; | 9 | typedef struct tp threadpool; |
| 10 | 10 | ||
| 11 | task * task_init(task_callback cb, void *arg); | 11 | task * task_init(task_callback cb, void *arg); |
| 12 | void task_free(task *ts); | 12 | void task_free(void *ts); |
| 13 | int task_fire(task *ts); | 13 | int task_fire(task *ts); |
| 14 | 14 | ||
| 15 | cqueue * cqueue_init(); | 15 | cqueue * cqueue_init(); |
| 16 | void cqueue_cancel(cqueue * const cq); | 16 | void cqueue_cancel(cqueue * const cq); |
| 17 | void cqueue_free(void *cq); | 17 | void cqueue_free(void *cq); |
| 18 | 18 | int cqueue_addtask(cqueue * const cq, task * const tsk); | |
| 19 | task * cqueue_waitpop(cqueue * const cq); | ||
| 20 | int cqueue_registerthreads(cqueue * const cq, int threads); | ||
| 21 | int cqueue_registerthread(cqueue * const cq); | ||
| 22 | int cqueue_iscanceled(cqueue * const cq); | ||
| 23 | int cqueue_numthreads(cqueue * const cq); | ||
| 24 | int cqueue_numtasks(cqueue * const cq); | ||
| 19 | 25 | ||
| 20 | #endif \ No newline at end of file | 26 | #endif \ No newline at end of file |
