summaryrefslogtreecommitdiff
path: root/src/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/threadpool.c')
-rw-r--r--src/threadpool.c148
1 files changed, 126 insertions, 22 deletions
diff --git a/src/threadpool.c b/src/threadpool.c
index 02cd945..c266964 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -1,15 +1,18 @@
1/** 1/**
2 * @file threadpool.c 2 * @file threadpool.c
3 * @author syxhe (https://t.me/syxhe) 3 * @author syxhe (https://t.me/syxhe)
4 * @brief *Implementing `threadpool.h`* 4 * @brief An implementation of a threadpool using libc threads
5 * @version 0.1 5 * @version 0.1
6 * @date 2025-06-09 6 * @date 2025-06-09
7 * 7 *
8 * @copyright Copyright (c) 2025 8 * @copyright Copyright (c) 2025
9 * 9 *
10 */ 10 */
11 11
12#include "threadpool.h" 12#ifndef __VXGG_REWRITE___THREADPOOL_C___193271180830131___
13#define __VXGG_REWRITE___THREADPOOL_C___193271180830131___ 1
14
15#include "shared.c"
13 16
14#include <threads.h> 17#include <threads.h>
15#include <stdlib.h> 18#include <stdlib.h>
@@ -18,7 +21,7 @@
18 21
19/** 22/**
20 * @brief A generic task - A function, data for that function, and a way to free the data 23 * @brief A generic task - A function, data for that function, and a way to free the data
21 * 24 *
22 */ 25 */
23typedef struct task { 26typedef struct task {
24 gcallback callback; //!< A generic callback to be ran when executing the task 27 gcallback callback; //!< A generic callback to be ran when executing the task
@@ -28,7 +31,7 @@ typedef struct task {
28 31
29/** 32/**
30 * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node 33 * @brief An internal structure used for the `taskqueue`. Analogous to a doubly-linked list's internal node
31 * 34 *
32 */ 35 */
33typedef struct tqnode { 36typedef struct tqnode {
34 struct tqnode *next; //!< The next element in the `taskqueue` 37 struct tqnode *next; //!< The next element in the `taskqueue`
@@ -38,7 +41,7 @@ typedef struct tqnode {
38 41
39/** 42/**
40 * @brief A FIFO queue of tasks 43 * @brief A FIFO queue of tasks
41 * 44 *
42 */ 45 */
43typedef struct taskqueue { 46typedef struct taskqueue {
44 tqnode *start; //!< The first element of the queue 47 tqnode *start; //!< The first element of the queue
@@ -48,7 +51,7 @@ typedef struct taskqueue {
48 51
49/** 52/**
50 * @brief A `taskqueue` built for concurrent access. Essentially a threadpool 53 * @brief A `taskqueue` built for concurrent access. Essentially a threadpool
51 * 54 *
52 */ 55 */
53typedef struct ctqueue { 56typedef struct ctqueue {
54 mtx_t mutex; //!< A mutex for locking sensitive resources 57 mtx_t mutex; //!< A mutex for locking sensitive resources
@@ -60,7 +63,14 @@ typedef struct ctqueue {
60 int talen; //!< The length of the thread array 63 int talen; //!< The length of the thread array
61} ctqueue; 64} ctqueue;
62 65
63 66/**
67 * @brief Create a task
68 *
69 * @param callback Callback function the given data should be ran with. Must be non-null
70 * @param freecb Callback function for freeing the given data. May be null
71 * @param data Data to be passed to the callback. May be null
72 * @retval (task*)[NULL, task*] Returns a task object with set parameters. Returns `null` and sets errno on error
73 */
64task * task_init(gcallback callback, fcallback freecb, void *data) { 74task * task_init(gcallback callback, fcallback freecb, void *data) {
65 if(callback == NULL) ERRRET(EINVAL, NULL); 75 if(callback == NULL) ERRRET(EINVAL, NULL);
66 76
@@ -75,11 +85,16 @@ task * task_init(gcallback callback, fcallback freecb, void *data) {
75 return tsk; 85 return tsk;
76} 86}
77 87
88/**
89 * @brief Free a task
90 *
91 * @param tsk A task object to free. Frees data associated with task via `freecb` value specified in its creation. May be null
92 */
78void task_free(void *tsk) { 93void task_free(void *tsk) {
79 task *real = (task *)tsk; 94 task *real = (task *)tsk;
80 if(!real) 95 if(!real)
81 return; 96 return;
82 97
83 if(real->freecb != NULL) 98 if(real->freecb != NULL)
84 real->freecb(real->data); 99 real->freecb(real->data);
85 free(real); 100 free(real);
@@ -87,11 +102,23 @@ void task_free(void *tsk) {
87 return; 102 return;
88} 103}
89 104
105/**
106 * @brief Fire a task. Passes the `data` member to the specified `callback`
107 *
108 * @param tsk A task to be fired. Must be non-null
109 * @retval (int) Returns value of the fired callback. Returns -1 and sets errno on error
110 */
90int task_fire(task *tsk) { 111int task_fire(task *tsk) {
91 if(!tsk) ERRRET(EINVAL, -1); 112 if(!tsk) ERRRET(EINVAL, -1);
92 return tsk->callback(tsk->data); 113 return tsk->callback(tsk->data);
93} 114}
94 115
116/**
117 * @brief Fire and destroy a task simultaneously. Calls specified callback and free-callback on associated data
118 *
119 * @param tsk Task to be fired and destroyed. Must be non-null
120 * @retval (int) Returns value of the callback. Returns -1 and sets errno on error
121 */
95int task_fired(task *tsk) { 122int task_fired(task *tsk) {
96 int retval = task_fire(tsk); 123 int retval = task_fire(tsk);
97 if(errno == EINVAL && retval == -1) {return -1;} 124 if(errno == EINVAL && retval == -1) {return -1;}
@@ -125,7 +152,11 @@ void tqnode_free(void *tqn) {
125 152
126 153
127 154
128 155/**
156 * @brief Create a FIFO queue of tasks
157 *
158 * @retval (taskqueue*)[NULL, taskqueue*] Returns a new taskqueue object. Returns `null` and sets errno on error
159 */
129taskqueue * taskqueue_init(void) { 160taskqueue * taskqueue_init(void) {
130 taskqueue *tq = calloc(1, sizeof(*tq)); 161 taskqueue *tq = calloc(1, sizeof(*tq));
131 if(!tq) 162 if(!tq)
@@ -138,6 +169,11 @@ taskqueue * taskqueue_init(void) {
138 return tq; 169 return tq;
139} 170}
140 171
172/**
173 * @brief Free a taskqueue
174 *
175 * @param tq A taskqueue to be freed. May be null
176 */
141void taskqueue_free(void *tq) { 177void taskqueue_free(void *tq) {
142 if(!tq) 178 if(!tq)
143 return; 179 return;
@@ -148,7 +184,7 @@ void taskqueue_free(void *tq) {
148 p = n; 184 p = n;
149 } 185 }
150 free(tq); 186 free(tq);
151 187
152 return; 188 return;
153} 189}
154 190
@@ -167,6 +203,13 @@ int taskqueue_handlefirst(taskqueue *tq, task *tsk) {
167 return 1; 203 return 1;
168} 204}
169 205
206/**
207 * @brief Push a task onto a taskqueue
208 *
209 * @param tq The taskqueue to be modified. Must be non-null
210 * @param tsk The task to push. Must be non-null
211 * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error
212 */
170int taskqueue_push(taskqueue *tq, task *tsk) { 213int taskqueue_push(taskqueue *tq, task *tsk) {
171 if(!tq || !tsk) ERRRET(EINVAL, -1); 214 if(!tq || !tsk) ERRRET(EINVAL, -1);
172 215
@@ -184,12 +227,18 @@ int taskqueue_push(taskqueue *tq, task *tsk) {
184 return 0; 227 return 0;
185} 228}
186 229
230/**
231 * @brief Pop a task from a taskqueue
232 *
233 * @param tq A taskqueue to grab a task from. Must be non-null
234 * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error
235 */
187task * taskqueue_pop(taskqueue *tq) { 236task * taskqueue_pop(taskqueue *tq) {
188 if(!tq) ERRRET(EINVAL, NULL); 237 if(!tq) ERRRET(EINVAL, NULL);
189 if(tq->size <= 0) ERRRET(ENODATA, NULL); 238 if(tq->size <= 0) ERRRET(ENODATA, NULL);
190 239
191 tqnode *end = tq->end; 240 tqnode *end = tq->end;
192 task *ret = end->task; 241 task *ret = end->task;
193 242
194 if(tq->size == 1) { 243 if(tq->size == 1) {
195 tq->end = NULL; 244 tq->end = NULL;
@@ -204,6 +253,13 @@ task * taskqueue_pop(taskqueue *tq) {
204 return ret; 253 return ret;
205} 254}
206 255
256/**
257 * @brief Append a task to the front of a taskqueue
258 *
259 * @param tq The taskqueue to be modified. Must be non-null
260 * @param tsk The task to be appended. Must be non-null
261 * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error
262 */
207int taskqueue_pushfront(taskqueue *tq, task *tsk) { 263int taskqueue_pushfront(taskqueue *tq, task *tsk) {
208 if(!tq || !tsk) ERRRET(EINVAL, -1); 264 if(!tq || !tsk) ERRRET(EINVAL, -1);
209 265
@@ -221,6 +277,12 @@ int taskqueue_pushfront(taskqueue *tq, task *tsk) {
221 return 0; 277 return 0;
222} 278}
223 279
280/**
281 * @brief Pop a task from the back (most recently pushed task) of a taskqueue
282 *
283 * @param tq A taskqueue to pop from. Must be non-null
284 * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error
285 */
224task * taskqueue_popback(taskqueue *tq) { 286task * taskqueue_popback(taskqueue *tq) {
225 if(!tq) ERRRET(EINVAL, NULL); 287 if(!tq) ERRRET(EINVAL, NULL);
226 if(tq->size <= 0) ERRRET(ENODATA, NULL); 288 if(tq->size <= 0) ERRRET(ENODATA, NULL);
@@ -273,6 +335,12 @@ static void ___ucl_cnddestroy(void *cond) {
273 return; 335 return;
274} 336}
275 337
338/**
339 * @brief Create a concurrent taskqueue with `size` allocated threads
340 *
341 * @param size Number of threads in the threadpool. Must be greater than zero
342 * @retval (ctqueue*)[NULL, ctqueue*] Returns a new ctqueue, sets errno and returns `null` on error
343 */
276ctqueue * ctqueue_init(int nthreads) { 344ctqueue * ctqueue_init(int nthreads) {
277 if(nthreads <= 0) ERRRET(EINVAL, NULL); 345 if(nthreads <= 0) ERRRET(EINVAL, NULL);
278 cleanup_CREATE(6); 346 cleanup_CREATE(6);
@@ -291,7 +359,7 @@ ctqueue * ctqueue_init(int nthreads) {
291 cleanup_MARK(); 359 cleanup_MARK();
292 cleanup_CNDREGISTER(taskqueue_free, ctq->tq); 360 cleanup_CNDREGISTER(taskqueue_free, ctq->tq);
293 ); 361 );
294 362
295 cleanup_CNDEXEC( 363 cleanup_CNDEXEC(
296 if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) 364 if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success)
297 cleanup_MARK(); 365 cleanup_MARK();
@@ -303,7 +371,7 @@ ctqueue * ctqueue_init(int nthreads) {
303 cleanup_MARK(); 371 cleanup_MARK();
304 cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); 372 cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond);
305 ); 373 );
306 374
307 cleanup_CNDEXEC( 375 cleanup_CNDEXEC(
308 ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); 376 ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t));
309 if(!ctq->thrdarr) 377 if(!ctq->thrdarr)
@@ -318,10 +386,16 @@ ctqueue * ctqueue_init(int nthreads) {
318 return ctq; 386 return ctq;
319} 387}
320 388
389/**
390 * @brief Cancel all tasks being processed in a currently running concurrent taskqueue
391 *
392 * @param ctq The concurrent taskqueue to be canceled. Must be non-null
393 * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error
394 */
321int ctqueue_cancel(ctqueue *ctq) { 395int ctqueue_cancel(ctqueue *ctq) {
322 if(!ctq) ERRRET(EINVAL, -1); 396 if(!ctq) ERRRET(EINVAL, -1);
323 397
324 __CTQ_INLOCK(ctq, 1, 398 __CTQ_INLOCK(ctq, 1,
325 ctq->canceled = 1; 399 ctq->canceled = 1;
326 ); 400 );
327 cnd_broadcast(&ctq->cond); 401 cnd_broadcast(&ctq->cond);
@@ -329,6 +403,12 @@ int ctqueue_cancel(ctqueue *ctq) {
329 return 0; 403 return 0;
330} 404}
331 405
406/**
407 * @brief Free a concurrent taskqueue
408 * @attention This cancels all currently running threads via `ctqueue_cancel`
409 *
410 * @param ctq The concurrent taskqueue to free. May be null
411 */
332void ctqueue_free(void *ctq) { 412void ctqueue_free(void *ctq) {
333 if(!ctq) 413 if(!ctq)
334 return; 414 return;
@@ -351,11 +431,19 @@ void ctqueue_free(void *ctq) {
351 return; 431 return;
352} 432}
353 433
434/**
435 * @brief Push a task onto a concurrent taskqueue
436 * @attention May block for an indefinite amount of time to push the task
437 *
438 * @param ctq The concurrent taskqueue to modify. Must be non-null
439 * @param tsk The task to push. Must be non-null
440 * @retval (int) Returns `thrd_success` on success, returns `thrd_error` or `thrd_nomem` on error
441 */
354int ctqueue_waitpush(ctqueue *ctq, task *tsk) { 442int ctqueue_waitpush(ctqueue *ctq, task *tsk) {
355 if(!ctq || !tsk) ERRRET(EINVAL, -1); 443 if(!ctq || !tsk) ERRRET(EINVAL, -1);
356 int retval = 0; 444 int retval = 0;
357 445
358 __CTQ_INLOCK(ctq, -1, 446 __CTQ_INLOCK(ctq, -1,
359 retval = taskqueue_push(ctq->tq, tsk); 447 retval = taskqueue_push(ctq->tq, tsk);
360 ); 448 );
361 if(retval == 0) 449 if(retval == 0)
@@ -364,11 +452,18 @@ int ctqueue_waitpush(ctqueue *ctq, task *tsk) {
364 return retval; 452 return retval;
365} 453}
366 454
455/**
456 * @brief Pop a task from the concurrent taskqueue
457 * @attention May block for an indefinite amount of time to pop the task
458 *
459 * @param ctq The concurrent taskqueue to pop from. Must be non-null
460 * @retval (task*)[NULL, task*] Returns a task on success, sets errno and returns `null` on error
461 */
367task * ctqueue_waitpop(ctqueue *ctq) { 462task * ctqueue_waitpop(ctqueue *ctq) {
368 if(!ctq) ERRRET(EINVAL, NULL); 463 if(!ctq) ERRRET(EINVAL, NULL);
369 task *retval = NULL; 464 task *retval = NULL;
370 465
371 __CTQ_INLOCK(ctq, NULL, 466 __CTQ_INLOCK(ctq, NULL,
372 while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) 467 while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled)
373 cnd_wait(&ctq->cond, &ctq->mutex); 468 cnd_wait(&ctq->cond, &ctq->mutex);
374 469
@@ -386,7 +481,7 @@ task * ctqueue_waitpop(ctqueue *ctq) {
386//! Simple consumer for eating and executing tasks from the ctq 481//! Simple consumer for eating and executing tasks from the ctq
387static int __CTQ_CONSUMER(void *ctq) { 482static int __CTQ_CONSUMER(void *ctq) {
388 if(!ctq) {errno = EINVAL; thrd_exit(-1);} 483 if(!ctq) {errno = EINVAL; thrd_exit(-1);}
389 ctqueue *real = (ctqueue *)ctq; 484 ctqueue *real = (ctqueue *)ctq;
390 485
391 for(task *ctask = NULL;;) { 486 for(task *ctask = NULL;;) {
392 ctask = ctqueue_waitpop(real); 487 ctask = ctqueue_waitpop(real);
@@ -399,14 +494,21 @@ static int __CTQ_CONSUMER(void *ctq) {
399 494
400 thrd_exit(1); 495 thrd_exit(1);
401} 496}
402// TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't 497// TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't
403// be treated as an error 498// be treated as an error
404 499
500/**
501 * @brief Start the threads allocated to a concurrent taskqueue
502 * @attention Threads will not consume pushed tasks until this function is ran
503 *
504 * @param ctq A concurrent taskqueue to start. Must be non-null
505 * @retval (int)[-1, 0] Returns 0 on success, sets errno and returns -1 on error
506 */
405int ctqueue_start(ctqueue *ctq) { 507int ctqueue_start(ctqueue *ctq) {
406 if(!ctq) ERRRET(EINVAL, -1); 508 if(!ctq) ERRRET(EINVAL, -1);
407 509
408 ctq->canceled = 0; 510 ctq->canceled = 0;
409 511
410 int retval = 0; 512 int retval = 0;
411 for(int i = 0; i < ctq->talen; i++) 513 for(int i = 0; i < ctq->talen; i++)
412 if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) 514 if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success)
@@ -416,4 +518,6 @@ int ctqueue_start(ctqueue *ctq) {
416 ctqueue_cancel(ctq); 518 ctqueue_cancel(ctq);
417 519
418 return (retval == thrd_success) ? 0 : -1; 520 return (retval == thrd_success) ? 0 : -1;
419} \ No newline at end of file 521}
522
523#endif \ No newline at end of file