diff options
Diffstat (limited to 'src/threadpool.c')
| -rw-r--r-- | src/threadpool.c | 148 |
1 files changed, 126 insertions, 22 deletions
diff --git a/src/threadpool.c b/src/threadpool.c index 02cd945..c266964 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -1,15 +1,18 @@ | |||
| 1 | /** | 1 | /** |
| 2 | * @file threadpool.c | 2 | * @file threadpool.c |
| 3 | * @author syxhe (https://t.me/syxhe) | 3 | * @author syxhe (https://t.me/syxhe) |
| 4 | * @brief *Implementing `threadpool.h`* | 4 | * @brief An implementation of a threadpool using libc threads |
| 5 | * @version 0.1 | 5 | * @version 0.1 |
| 6 | * @date 2025-06-09 | 6 | * @date 2025-06-09 |
| 7 | * | 7 | * |
| 8 | * @copyright Copyright (c) 2025 | 8 | * @copyright Copyright (c) 2025 |
| 9 | * | 9 | * |
| 10 | */ | 10 | */ |
| 11 | 11 | ||
| 12 | #include "threadpool.h" | 12 | #ifndef __VXGG_REWRITE___THREADPOOL_C___193271180830131___ |
| 13 | #define __VXGG_REWRITE___THREADPOOL_C___193271180830131___ 1 | ||
| 14 | |||
| 15 | #include "shared.c" | ||
| 13 | 16 | ||
| 14 | #include <threads.h> | 17 | #include <threads.h> |
| 15 | #include <stdlib.h> | 18 | #include <stdlib.h> |
| @@ -18,7 +21,7 @@ | |||
| 18 | 21 | ||
| 19 | /** | 22 | /** |
| 20 | * @brief A generic task - A function, data for that function, and a way to free the data | 23 | * @brief A generic task - A function, data for that function, and a way to free the data |
| 21 | * | 24 | * |
| 22 | */ | 25 | */ |
| 23 | typedef struct task { | 26 | typedef struct task { |
| 24 | gcallback callback; //!< A generic callback to be ran when executing the task | 27 | gcallback callback; //!< A generic callback to be ran when executing the task |
| @@ -28,7 +31,7 @@ typedef struct task { | |||
| 28 | 31 | ||
| 29 | /** | 32 | /** |
| 30 | * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node | 33 | * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node |
| 31 | * | 34 | * |
| 32 | */ | 35 | */ |
| 33 | typedef struct tqnode { | 36 | typedef struct tqnode { |
| 34 | struct tqnode *next; //!< The next element in the `taskqueue` | 37 | struct tqnode *next; //!< The next element in the `taskqueue` |
| @@ -38,7 +41,7 @@ typedef struct tqnode { | |||
| 38 | 41 | ||
| 39 | /** | 42 | /** |
| 40 | * @brief A FIFO queue of tasks | 43 | * @brief A FIFO queue of tasks |
| 41 | * | 44 | * |
| 42 | */ | 45 | */ |
| 43 | typedef struct taskqueue { | 46 | typedef struct taskqueue { |
| 44 | tqnode *start; //!< The first element of the queue | 47 | tqnode *start; //!< The first element of the queue |
| @@ -48,7 +51,7 @@ typedef struct taskqueue { | |||
| 48 | 51 | ||
| 49 | /** | 52 | /** |
| 50 | * @brief A `taskqueue` built for concurrent access. Essentially a threadpool | 53 | * @brief A `taskqueue` built for concurrent access. Essentially a threadpool |
| 51 | * | 54 | * |
| 52 | */ | 55 | */ |
| 53 | typedef struct ctqueue { | 56 | typedef struct ctqueue { |
| 54 | mtx_t mutex; //!< A mutex for locking sensitive resources | 57 | mtx_t mutex; //!< A mutex for locking sensitive resources |
| @@ -60,7 +63,14 @@ typedef struct ctqueue { | |||
| 60 | int talen; //!< The length of the thread array | 63 | int talen; //!< The length of the thread array |
| 61 | } ctqueue; | 64 | } ctqueue; |
| 62 | 65 | ||
| 63 | 66 | /** | |
| 67 | * @brief Create a task | ||
| 68 | * | ||
| 69 | * @param callback Callback function the given data should be ran with. Must be non-null | ||
| 70 | * @param freecb Callback function for freeing the given data. May be null | ||
| 71 | * @param data Data to be passed to the callback. May be null | ||
| 72 | * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error | ||
| 73 | */ | ||
| 64 | task * task_init(gcallback callback, fcallback freecb, void *data) { | 74 | task * task_init(gcallback callback, fcallback freecb, void *data) { |
| 65 | if(callback == NULL) ERRRET(EINVAL, NULL); | 75 | if(callback == NULL) ERRRET(EINVAL, NULL); |
| 66 | 76 | ||
| @@ -75,11 +85,16 @@ task * task_init(gcallback callback, fcallback freecb, void *data) { | |||
| 75 | return tsk; | 85 | return tsk; |
| 76 | } | 86 | } |
| 77 | 87 | ||
| 88 | /** | ||
| 89 | * @brief Free a task | ||
| 90 | * | ||
| 91 | * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null | ||
| 92 | */ | ||
| 78 | void task_free(void *tsk) { | 93 | void task_free(void *tsk) { |
| 79 | task *real = (task *)tsk; | 94 | task *real = (task *)tsk; |
| 80 | if(!real) | 95 | if(!real) |
| 81 | return; | 96 | return; |
| 82 | 97 | ||
| 83 | if(real->freecb != NULL) | 98 | if(real->freecb != NULL) |
| 84 | real->freecb(real->data); | 99 | real->freecb(real->data); |
| 85 | free(real); | 100 | free(real); |
| @@ -87,11 +102,23 @@ void task_free(void *tsk) { | |||
| 87 | return; | 102 | return; |
| 88 | } | 103 | } |
| 89 | 104 | ||
| 105 | /** | ||
| 106 | * @brief Fire a task. Passes the `data` member to the specified `callback` | ||
| 107 | * | ||
| 108 | * @param tsk A task to be fired. Must be non-null | ||
| 109 | * @retval (int) Returns value of the fired callback. Returns -1 and sets errno on error | ||
| 110 | */ | ||
| 90 | int task_fire(task *tsk) { | 111 | int task_fire(task *tsk) { |
| 91 | if(!tsk) ERRRET(EINVAL, -1); | 112 | if(!tsk) ERRRET(EINVAL, -1); |
| 92 | return tsk->callback(tsk->data); | 113 | return tsk->callback(tsk->data); |
| 93 | } | 114 | } |
| 94 | 115 | ||
| 116 | /** | ||
| 117 | * @brief Fire and destroy a task simultaneously. Calls specified callback and free-callback on associated data | ||
| 118 | * | ||
| 119 | * @param tsk Task to be fired and destroyed. Must be non-null | ||
| 120 | * @retval (int) Returns value of the callback. Returns -1 and sets errno on error | ||
| 121 | */ | ||
| 95 | int task_fired(task *tsk) { | 122 | int task_fired(task *tsk) { |
| 96 | int retval = task_fire(tsk); | 123 | int retval = task_fire(tsk); |
| 97 | if(errno == EINVAL && retval == -1) {return -1;} | 124 | if(errno == EINVAL && retval == -1) {return -1;} |
| @@ -125,7 +152,11 @@ void tqnode_free(void *tqn) { | |||
| 125 | 152 | ||
| 126 | 153 | ||
| 127 | 154 | ||
| 128 | 155 | /** | |
| 156 | * @brief Create a FIFO queue of tasks | ||
| 157 | * | ||
| 158 | * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error | ||
| 159 | */ | ||
| 129 | taskqueue * taskqueue_init(void) { | 160 | taskqueue * taskqueue_init(void) { |
| 130 | taskqueue *tq = calloc(1, sizeof(*tq)); | 161 | taskqueue *tq = calloc(1, sizeof(*tq)); |
| 131 | if(!tq) | 162 | if(!tq) |
| @@ -138,6 +169,11 @@ taskqueue * taskqueue_init(void) { | |||
| 138 | return tq; | 169 | return tq; |
| 139 | } | 170 | } |
| 140 | 171 | ||
| 172 | /** | ||
| 173 | * @brief Free a taskqueue | ||
| 174 | * | ||
| 175 | * @param tq A taskqueue to be freed. May be null | ||
| 176 | */ | ||
| 141 | void taskqueue_free(void *tq) { | 177 | void taskqueue_free(void *tq) { |
| 142 | if(!tq) | 178 | if(!tq) |
| 143 | return; | 179 | return; |
| @@ -148,7 +184,7 @@ void taskqueue_free(void *tq) { | |||
| 148 | p = n; | 184 | p = n; |
| 149 | } | 185 | } |
| 150 | free(tq); | 186 | free(tq); |
| 151 | 187 | ||
| 152 | return; | 188 | return; |
| 153 | } | 189 | } |
| 154 | 190 | ||
| @@ -167,6 +203,13 @@ int taskqueue_handlefirst(taskqueue *tq, task *tsk) { | |||
| 167 | return 1; | 203 | return 1; |
| 168 | } | 204 | } |
| 169 | 205 | ||
| 206 | /** | ||
| 207 | * @brief Push a task onto a taskqueue | ||
| 208 | * | ||
| 209 | * @param tq The taskqueue to be modified. Must be non-null | ||
| 210 | * @param tsk The task to push. Must be non-null | ||
| 211 | * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error | ||
| 212 | */ | ||
| 170 | int taskqueue_push(taskqueue *tq, task *tsk) { | 213 | int taskqueue_push(taskqueue *tq, task *tsk) { |
| 171 | if(!tq || !tsk) ERRRET(EINVAL, -1); | 214 | if(!tq || !tsk) ERRRET(EINVAL, -1); |
| 172 | 215 | ||
| @@ -184,12 +227,18 @@ int taskqueue_push(taskqueue *tq, task *tsk) { | |||
| 184 | return 0; | 227 | return 0; |
| 185 | } | 228 | } |
| 186 | 229 | ||
| 230 | /** | ||
| 231 | * @brief Pop a task from a taskqueue | ||
| 232 | * | ||
| 233 | * @param tq A taskqueue to grab a task from. Must be non-null | ||
| 234 | * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error | ||
| 235 | */ | ||
| 187 | task * taskqueue_pop(taskqueue *tq) { | 236 | task * taskqueue_pop(taskqueue *tq) { |
| 188 | if(!tq) ERRRET(EINVAL, NULL); | 237 | if(!tq) ERRRET(EINVAL, NULL); |
| 189 | if(tq->size <= 0) ERRRET(ENODATA, NULL); | 238 | if(tq->size <= 0) ERRRET(ENODATA, NULL); |
| 190 | 239 | ||
| 191 | tqnode *end = tq->end; | 240 | tqnode *end = tq->end; |
| 192 | task *ret = end->task; | 241 | task *ret = end->task; |
| 193 | 242 | ||
| 194 | if(tq->size == 1) { | 243 | if(tq->size == 1) { |
| 195 | tq->end = NULL; | 244 | tq->end = NULL; |
| @@ -204,6 +253,13 @@ task * taskqueue_pop(taskqueue *tq) { | |||
| 204 | return ret; | 253 | return ret; |
| 205 | } | 254 | } |
| 206 | 255 | ||
| 256 | /** | ||
| 257 | * @brief Append a task to the front of a taskqueue | ||
| 258 | * | ||
| 259 | * @param tq The taskqueue to be modified. Must be non-null | ||
| 260 | * @param tsk The task to be appended. Must be non-null | ||
| 261 | * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error | ||
| 262 | */ | ||
| 207 | int taskqueue_pushfront(taskqueue *tq, task *tsk) { | 263 | int taskqueue_pushfront(taskqueue *tq, task *tsk) { |
| 208 | if(!tq || !tsk) ERRRET(EINVAL, -1); | 264 | if(!tq || !tsk) ERRRET(EINVAL, -1); |
| 209 | 265 | ||
| @@ -221,6 +277,12 @@ int taskqueue_pushfront(taskqueue *tq, task *tsk) { | |||
| 221 | return 0; | 277 | return 0; |
| 222 | } | 278 | } |
| 223 | 279 | ||
| 280 | /** | ||
| 281 | * @brief Pop a task from the back (most recently pushed task) of a taskqueue | ||
| 282 | * | ||
| 283 | * @param tq A taskqueue to pop from. Must be non-null | ||
| 284 | * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error | ||
| 285 | */ | ||
| 224 | task * taskqueue_popback(taskqueue *tq) { | 286 | task * taskqueue_popback(taskqueue *tq) { |
| 225 | if(!tq) ERRRET(EINVAL, NULL); | 287 | if(!tq) ERRRET(EINVAL, NULL); |
| 226 | if(tq->size <= 0) ERRRET(ENODATA, NULL); | 288 | if(tq->size <= 0) ERRRET(ENODATA, NULL); |
| @@ -273,6 +335,12 @@ static void ___ucl_cnddestroy(void *cond) { | |||
| 273 | return; | 335 | return; |
| 274 | } | 336 | } |
| 275 | 337 | ||
| 338 | /** | ||
| 339 | * @brief Create a concurrent taskqueue with `size` allocated threads | ||
| 340 | * | ||
| 341 | * @param size Number of threads in the threadpool. Must be greater than zero | ||
| 342 | * @retval (ctqueue*)[NULL, ctqueue*] Returns a new ctqueue, sets errno and returns `null` on error | ||
| 343 | */ | ||
| 276 | ctqueue * ctqueue_init(int nthreads) { | 344 | ctqueue * ctqueue_init(int nthreads) { |
| 277 | if(nthreads <= 0) ERRRET(EINVAL, NULL); | 345 | if(nthreads <= 0) ERRRET(EINVAL, NULL); |
| 278 | cleanup_CREATE(6); | 346 | cleanup_CREATE(6); |
| @@ -291,7 +359,7 @@ ctqueue * ctqueue_init(int nthreads) { | |||
| 291 | cleanup_MARK(); | 359 | cleanup_MARK(); |
| 292 | cleanup_CNDREGISTER(taskqueue_free, ctq->tq); | 360 | cleanup_CNDREGISTER(taskqueue_free, ctq->tq); |
| 293 | ); | 361 | ); |
| 294 | 362 | ||
| 295 | cleanup_CNDEXEC( | 363 | cleanup_CNDEXEC( |
| 296 | if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) | 364 | if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) |
| 297 | cleanup_MARK(); | 365 | cleanup_MARK(); |
| @@ -303,7 +371,7 @@ ctqueue * ctqueue_init(int nthreads) { | |||
| 303 | cleanup_MARK(); | 371 | cleanup_MARK(); |
| 304 | cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); | 372 | cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); |
| 305 | ); | 373 | ); |
| 306 | 374 | ||
| 307 | cleanup_CNDEXEC( | 375 | cleanup_CNDEXEC( |
| 308 | ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); | 376 | ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); |
| 309 | if(!ctq->thrdarr) | 377 | if(!ctq->thrdarr) |
| @@ -318,10 +386,16 @@ ctqueue * ctqueue_init(int nthreads) { | |||
| 318 | return ctq; | 386 | return ctq; |
| 319 | } | 387 | } |
| 320 | 388 | ||
| 389 | /** | ||
| 390 | * @brief Cancel all tasks being processed in a currently running concurrent taskqueue | ||
| 391 | * | ||
| 392 | * @param ctq The concurrent taskqueue to be canceled. Must be non-null | ||
| 393 | * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error | ||
| 394 | */ | ||
| 321 | int ctqueue_cancel(ctqueue *ctq) { | 395 | int ctqueue_cancel(ctqueue *ctq) { |
| 322 | if(!ctq) ERRRET(EINVAL, -1); | 396 | if(!ctq) ERRRET(EINVAL, -1); |
| 323 | 397 | ||
| 324 | __CTQ_INLOCK(ctq, 1, | 398 | __CTQ_INLOCK(ctq, 1, |
| 325 | ctq->canceled = 1; | 399 | ctq->canceled = 1; |
| 326 | ); | 400 | ); |
| 327 | cnd_broadcast(&ctq->cond); | 401 | cnd_broadcast(&ctq->cond); |
| @@ -329,6 +403,12 @@ int ctqueue_cancel(ctqueue *ctq) { | |||
| 329 | return 0; | 403 | return 0; |
| 330 | } | 404 | } |
| 331 | 405 | ||
| 406 | /** | ||
| 407 | * @brief Free a concurrent taskqueue | ||
| 408 | * @attention This cancels all currently running threads via `ctqueue_cancel` | ||
| 409 | * | ||
| 410 | * @param ctq The concurrent taskqueue to free. May be null | ||
| 411 | */ | ||
| 332 | void ctqueue_free(void *ctq) { | 412 | void ctqueue_free(void *ctq) { |
| 333 | if(!ctq) | 413 | if(!ctq) |
| 334 | return; | 414 | return; |
| @@ -351,11 +431,19 @@ void ctqueue_free(void *ctq) { | |||
| 351 | return; | 431 | return; |
| 352 | } | 432 | } |
| 353 | 433 | ||
| 434 | /** | ||
| 435 | * @brief Push a task onto a concurrent taskqueue | ||
| 436 | * @attention May block for an indefinite amount of time to push the task | ||
| 437 | * | ||
| 438 | * @param ctq The concurrent taskqueue to modify. Must be non-null | ||
| 439 | * @param tsk The task to push. Must be non-null | ||
| 440 | * @retval (int) Returns `thrd_success` on success, returns `thrd_error` or `thrd_nomem` on error | ||
| 441 | */ | ||
| 354 | int ctqueue_waitpush(ctqueue *ctq, task *tsk) { | 442 | int ctqueue_waitpush(ctqueue *ctq, task *tsk) { |
| 355 | if(!ctq || !tsk) ERRRET(EINVAL, -1); | 443 | if(!ctq || !tsk) ERRRET(EINVAL, -1); |
| 356 | int retval = 0; | 444 | int retval = 0; |
| 357 | 445 | ||
| 358 | __CTQ_INLOCK(ctq, -1, | 446 | __CTQ_INLOCK(ctq, -1, |
| 359 | retval = taskqueue_push(ctq->tq, tsk); | 447 | retval = taskqueue_push(ctq->tq, tsk); |
| 360 | ); | 448 | ); |
| 361 | if(retval == 0) | 449 | if(retval == 0) |
| @@ -364,11 +452,18 @@ int ctqueue_waitpush(ctqueue *ctq, task *tsk) { | |||
| 364 | return retval; | 452 | return retval; |
| 365 | } | 453 | } |
| 366 | 454 | ||
| 455 | /** | ||
| 456 | * @brief Pop a task from the concurrent taskqueue | ||
| 457 | * @attention May block for an indefinite amount of time to pop the task | ||
| 458 | * | ||
| 459 | * @param ctq The concurrent taskqueue to pop from. Must be non-null | ||
| 460 | * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error | ||
| 461 | */ | ||
| 367 | task * ctqueue_waitpop(ctqueue *ctq) { | 462 | task * ctqueue_waitpop(ctqueue *ctq) { |
| 368 | if(!ctq) ERRRET(EINVAL, NULL); | 463 | if(!ctq) ERRRET(EINVAL, NULL); |
| 369 | task *retval = NULL; | 464 | task *retval = NULL; |
| 370 | 465 | ||
| 371 | __CTQ_INLOCK(ctq, NULL, | 466 | __CTQ_INLOCK(ctq, NULL, |
| 372 | while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) | 467 | while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) |
| 373 | cnd_wait(&ctq->cond, &ctq->mutex); | 468 | cnd_wait(&ctq->cond, &ctq->mutex); |
| 374 | 469 | ||
| @@ -386,7 +481,7 @@ task * ctqueue_waitpop(ctqueue *ctq) { | |||
| 386 | //! Simple consumer for eating and executing tasks from the ctq | 481 | //! Simple consumer for eating and executing tasks from the ctq |
| 387 | static int __CTQ_CONSUMER(void *ctq) { | 482 | static int __CTQ_CONSUMER(void *ctq) { |
| 388 | if(!ctq) {errno = EINVAL; thrd_exit(-1);} | 483 | if(!ctq) {errno = EINVAL; thrd_exit(-1);} |
| 389 | ctqueue *real = (ctqueue *)ctq; | 484 | ctqueue *real = (ctqueue *)ctq; |
| 390 | 485 | ||
| 391 | for(task *ctask = NULL;;) { | 486 | for(task *ctask = NULL;;) { |
| 392 | ctask = ctqueue_waitpop(real); | 487 | ctask = ctqueue_waitpop(real); |
| @@ -399,14 +494,21 @@ static int __CTQ_CONSUMER(void *ctq) { | |||
| 399 | 494 | ||
| 400 | thrd_exit(1); | 495 | thrd_exit(1); |
| 401 | } | 496 | } |
| 402 | // TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't | 497 | // TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't |
| 403 | // be treated as an error | 498 | // be treated as an error |
| 404 | 499 | ||
| 500 | /** | ||
| 501 | * @brief Start the threads allocated to a concurrent taskqueue | ||
| 502 | * @attention Threads will not consume pushed tasks until this function is ran | ||
| 503 | * | ||
| 504 | * @param ctq A concurrent taskqueue to start. Must be non-null | ||
| 505 | * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error | ||
| 506 | */ | ||
| 405 | int ctqueue_start(ctqueue *ctq) { | 507 | int ctqueue_start(ctqueue *ctq) { |
| 406 | if(!ctq) ERRRET(EINVAL, -1); | 508 | if(!ctq) ERRRET(EINVAL, -1); |
| 407 | 509 | ||
| 408 | ctq->canceled = 0; | 510 | ctq->canceled = 0; |
| 409 | 511 | ||
| 410 | int retval = 0; | 512 | int retval = 0; |
| 411 | for(int i = 0; i < ctq->talen; i++) | 513 | for(int i = 0; i < ctq->talen; i++) |
| 412 | if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) | 514 | if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) |
| @@ -416,4 +518,6 @@ int ctqueue_start(ctqueue *ctq) { | |||
| 416 | ctqueue_cancel(ctq); | 518 | ctqueue_cancel(ctq); |
| 417 | 519 | ||
| 418 | return (retval == thrd_success) ? 0 : -1; | 520 | return (retval == thrd_success) ? 0 : -1; |
| 419 | } \ No newline at end of file | 521 | } |
| 522 | |||
| 523 | #endif \ No newline at end of file | ||
