diff options
| -rw-r--r-- | src/main.c | 32 | ||||
| -rw-r--r-- | src/shared.h | 2 | ||||
| -rw-r--r-- | src/threadpool.c | 314 | ||||
| -rw-r--r-- | src/threadpool.h | 24 |
4 files changed, 107 insertions, 265 deletions
| @@ -7,8 +7,9 @@ | |||
| 7 | 7 | ||
| 8 | #include <errno.h> | 8 | #include <errno.h> |
| 9 | #include <error.h> | 9 | #include <error.h> |
| 10 | #include <threads.h> | ||
| 11 | #include <unistd.h> | ||
| 10 | 12 | ||
| 11 | #include <stdio.h> | ||
| 12 | int testcb(void *data) { | 13 | int testcb(void *data) { |
| 13 | if(!data) | 14 | if(!data) |
| 14 | return -1; | 15 | return -1; |
| @@ -17,14 +18,31 @@ int testcb(void *data) { | |||
| 17 | return 0; | 18 | return 0; |
| 18 | } | 19 | } |
| 19 | 20 | ||
| 21 | int consumer(void *cq) { | ||
| 22 | if(!cq) | ||
| 23 | return -1; | ||
| 24 | |||
| 25 | cqueue *rcq = (cqueue*)cq; | ||
| 26 | for(task *tsk = NULL;;) { | ||
| 27 | tsk = cqueue_waitpop(rcq); | ||
| 28 | if(!tsk) | ||
| 29 | thrd_exit(-1); | ||
| 30 | |||
| 31 | task_fire(tsk); | ||
| 32 | } | ||
| 33 | |||
| 34 | return 0; | ||
| 35 | } | ||
| 36 | |||
| 20 | int main() { | 37 | int main() { |
| 21 | // error(1, ENOTSUP, "No main file lol"); | 38 | // error(1, ENOTSUP, "No main file lol"); |
| 22 | 39 | ||
| 23 | threadpool *tp = threadpool_init(2); | 40 | thrd_t thread; |
| 24 | task *tsk = task_init(testcb, "This is some data"); | 41 | cqueue *cq = cqueue_init(mtx_plain); |
| 25 | threadpool_addtask(tp, tsk); | 42 | thrd_create(&thread, consumer, cq); |
| 26 | threadpool_join(tp); | 43 | cqueue_addtask(cq, task_init(testcb, (void*)"This is some data")); |
| 27 | threadpool_free(tp); | 44 | sleep(10); |
| 45 | cqueue_free(cq); | ||
| 28 | 46 | ||
| 29 | return 0; | 47 | return 0; |
| 30 | } \ No newline at end of file | 48 | } \ No newline at end of file |
diff --git a/src/shared.h b/src/shared.h index 9e7eaa8..825814e 100644 --- a/src/shared.h +++ b/src/shared.h | |||
| @@ -4,6 +4,8 @@ | |||
| 4 | #include <stddef.h> | 4 | #include <stddef.h> |
| 5 | 5 | ||
| 6 | #define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) | 6 | #define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) |
| 7 | #define FALSE 0 | ||
| 8 | #define TRUE 1 | ||
| 7 | 9 | ||
| 8 | #define RETURNWERR(errval, retval) do {\ | 10 | #define RETURNWERR(errval, retval) do {\ |
| 9 | errno = (errval);\ | 11 | errno = (errval);\ |
diff --git a/src/threadpool.c b/src/threadpool.c index ab0733d..9d00030 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -7,12 +7,6 @@ | |||
| 7 | #include <stdlib.h> | 7 | #include <stdlib.h> |
| 8 | #include <errno.h> | 8 | #include <errno.h> |
| 9 | 9 | ||
| 10 | /* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must | ||
| 11 | // always be followed by an unlock | ||
| 12 | |||
| 13 | // Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a | ||
| 14 | // function that waits should never send */ | ||
| 15 | |||
| 16 | // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) | 10 | // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) |
| 17 | typedef struct mtxp { | 11 | typedef struct mtxp { |
| 18 | void *data; | 12 | void *data; |
| @@ -86,12 +80,18 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) | |||
| 86 | // Threadpool: | 80 | // Threadpool: |
| 87 | // Array of threads | 81 | // Array of threads |
| 88 | // Task Queue | 82 | // Task Queue |
| 89 | // Readiness semaphore | 83 | // Readiness semaphore / conditional |
| 84 | // Mutex | ||
| 90 | // Linked List of Tasks | 85 | // Linked List of Tasks |
| 91 | // Task: | 86 | // Task: |
| 92 | // int (*callback)(void*) | 87 | // int (*callback)(void*) |
| 93 | // void *arg | 88 | // void *arg |
| 94 | 89 | ||
| 90 | // Consumer: | ||
| 91 | // Wait for cqueue to pop | ||
| 92 | // Fire task | ||
| 93 | // Repeat | ||
| 94 | |||
| 95 | // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 | 95 | // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 |
| 96 | 96 | ||
| 97 | typedef struct task { | 97 | typedef struct task { |
| @@ -135,14 +135,28 @@ void task_free(task *ts) { | |||
| 135 | return; | 135 | return; |
| 136 | } | 136 | } |
| 137 | 137 | ||
| 138 | int task_fire(task *ts) { | ||
| 139 | if(!ts) | ||
| 140 | RETURNWERR(EINVAL, -1); | ||
| 138 | 141 | ||
| 139 | static void ___ucleanup_mtxd(void *mtx) { | 142 | return ts->cb(ts->arg); |
| 140 | if(!mtx) | 143 | } |
| 144 | |||
| 145 | |||
| 146 | /* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must | ||
| 147 | // always be followed by an unlock | ||
| 148 | |||
| 149 | // Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a | ||
| 150 | // function that waits should never send */ | ||
| 151 | |||
| 152 | static void ___ucleanup_dfree(void *dll) { | ||
| 153 | if(!dll) | ||
| 141 | return; | 154 | return; |
| 142 | 155 | ||
| 143 | mtx_destroy((mtx_t *)mtx); | 156 | dlinkedlist_free((dlinkedlist *)dll); |
| 144 | return; | 157 | return; |
| 145 | } | 158 | } |
| 159 | |||
| 146 | static void ___ucleanup_cndd(void *cnd) { | 160 | static void ___ucleanup_cndd(void *cnd) { |
| 147 | if(!cnd) | 161 | if(!cnd) |
| 148 | return; | 162 | return; |
| @@ -150,15 +164,15 @@ static void ___ucleanup_cndd(void *cnd) { | |||
| 150 | cnd_destroy((cnd_t *)cnd); | 164 | cnd_destroy((cnd_t *)cnd); |
| 151 | return; | 165 | return; |
| 152 | } | 166 | } |
| 153 | static void ___ucleanup_dll(void *dll) { | 167 | |
| 154 | if(!dll) | 168 | static void ___ucleanup_mtxd(void *mtx) { |
| 169 | if(!mtx) | ||
| 155 | return; | 170 | return; |
| 156 | 171 | ||
| 157 | dlinkedlist_free((dlinkedlist *)dll); | 172 | mtx_destroy((mtx_t*)mtx); |
| 158 | return; | 173 | return; |
| 159 | } | 174 | } |
| 160 | 175 | ||
| 161 | |||
| 162 | cqueue * cqueue_init(int mtx_type) { | 176 | cqueue * cqueue_init(int mtx_type) { |
| 163 | cleanup_CREATE(10); | 177 | cleanup_CREATE(10); |
| 164 | 178 | ||
| @@ -167,284 +181,106 @@ cqueue * cqueue_init(int mtx_type) { | |||
| 167 | return NULL; | 181 | return NULL; |
| 168 | cleanup_REGISTER(free, cq); | 182 | cleanup_REGISTER(free, cq); |
| 169 | 183 | ||
| 170 | cq->mtx = VALLOC(1, sizeof(*(cq->mtx))); | 184 | cq->canceled = FALSE; |
| 171 | if(!(cq->mtx)) | 185 | cq->list = dlinkedlist_init(); |
| 186 | if(!cq->list) | ||
| 172 | cleanup_MARK(); | 187 | cleanup_MARK(); |
| 173 | cleanup_CNDREGISTER(free, cq->mtx); | 188 | cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); |
| 174 | |||
| 175 | if(!cleanup_ERRORFLAGGED) | ||
| 176 | if(mtx_init(cq->mtx, mtx_type) != thrd_success) | ||
| 177 | cleanup_MARK(); | ||
| 178 | cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); | ||
| 179 | 189 | ||
| 180 | if(!cleanup_ERRORFLAGGED) | 190 | if(!cleanup_ERRORFLAGGED) |
| 181 | if(!(cq->cnd = VALLOC(1, sizeof(*(cq->cnd))))) | 191 | if(!(cq->cnd = VALLOC(1, sizeof(*cq->cnd)))) |
| 182 | cleanup_MARK(); | 192 | cleanup_MARK(); |
| 183 | cleanup_CNDREGISTER(free, cq->cnd); | 193 | cleanup_CNDREGISTER(free, cq->cnd); |
| 184 | 194 | ||
| 185 | if(!cleanup_ERRORFLAGGED) | 195 | if(!cleanup_ERRORFLAGGED) |
| 186 | if(cnd_init(cq->cnd) != thrd_success) | 196 | if(cnd_init(cq->cnd) == thrd_error) |
| 187 | cleanup_MARK(); | 197 | cleanup_MARK(); |
| 188 | cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); | 198 | cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); |
| 189 | 199 | ||
| 190 | if(!cleanup_ERRORFLAGGED) | 200 | if(!cleanup_ERRORFLAGGED) |
| 191 | if(!(cq->list = dlinkedlist_init())) | 201 | if(!(cq->mtx = VALLOC(1, sizeof(*cq->mtx)))) |
| 192 | cleanup_MARK(); | 202 | cleanup_MARK(); |
| 193 | cleanup_CNDREGISTER(___ucleanup_dll, cq->list); | 203 | cleanup_CNDREGISTER(free, cq->mtx); |
| 194 | |||
| 195 | 204 | ||
| 205 | if(!cleanup_ERRORFLAGGED) | ||
| 206 | if(mtx_init(cq->mtx, mtx_type) != thrd_success) | ||
| 207 | cleanup_MARK(); | ||
| 208 | cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); | ||
| 209 | |||
| 196 | if(cleanup_ERRORFLAGGED) | 210 | if(cleanup_ERRORFLAGGED) |
| 197 | cleanup_fire(&__CLEANUP); | 211 | cleanup_FIRE(); |
| 198 | |||
| 199 | // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions | ||
| 200 | // The implementation was not better lmao | ||
| 201 | 212 | ||
| 202 | cq->canceled = 0; | ||
| 203 | return cq; | 213 | return cq; |
| 204 | } | 214 | } |
| 205 | 215 | ||
| 206 | void cqueue_free(cqueue *cq) { | 216 | void cqueue_cancel(cqueue *cq) { |
| 207 | if(!cq) | 217 | if(!cq) |
| 208 | return; | 218 | return; |
| 209 | 219 | ||
| 210 | // Cancel any outstanding threads before freeing everything | ||
| 211 | cqueue_cancel(cq); | ||
| 212 | |||
| 213 | dlinkedlist_free(cq->list); | ||
| 214 | cnd_destroy(cq->cnd); | ||
| 215 | mtx_destroy(cq->mtx); | ||
| 216 | free(cq->cnd); | ||
| 217 | free(cq->mtx); | ||
| 218 | free(cq); | ||
| 219 | |||
| 220 | return; | ||
| 221 | } | ||
| 222 | |||
| 223 | int cqueue_append(cqueue * const cq, task *tsk) { | ||
| 224 | if(!cq || !tsk) | ||
| 225 | RETURNWERR(EINVAL, -1); | ||
| 226 | |||
| 227 | mtx_lock(cq->mtx); | 220 | mtx_lock(cq->mtx); |
| 228 | if(cq->canceled) { | 221 | if(cq->canceled) { |
| 229 | mtx_unlock(cq->mtx); | 222 | mtx_unlock(cq->mtx); |
| 230 | thrd_exit(thrd_timedout); | 223 | thrd_exit(-1); |
| 231 | } | 224 | } |
| 232 | 225 | ||
| 233 | dlinkedlist_append(cq->list, tsk, free); | 226 | cq->canceled++; |
| 234 | mtx_unlock(cq->mtx); | 227 | mtx_unlock(cq->mtx); |
| 235 | cnd_signal(cq->cnd); | 228 | cnd_broadcast(cq->cnd); |
| 236 | 229 | ||
| 237 | return 0; | 230 | return; |
| 238 | } | 231 | } |
| 239 | 232 | ||
| 240 | int cqueue_prepend(cqueue * const cq, task *tsk) { | 233 | void cqueue_free(cqueue *cq) { |
| 241 | if(!cq || !tsk) | 234 | if(!cq) |
| 242 | RETURNWERR(EINVAL, -1); | 235 | return; |
| 243 | |||
| 244 | mtx_lock(cq->mtx); | ||
| 245 | if(cq->canceled) { | ||
| 246 | mtx_unlock(cq->mtx); | ||
| 247 | thrd_exit(thrd_timedout); | ||
| 248 | } | ||
| 249 | 236 | ||
| 250 | dlinkedlist_prepend(cq->list, tsk, free); | 237 | cqueue_cancel(cq); |
| 251 | mtx_unlock(cq->mtx); | 238 | mtx_destroy(cq->mtx); |
| 252 | cnd_signal(cq->cnd); | 239 | cnd_destroy(cq->cnd); |
| 240 | free(cq->mtx); | ||
| 241 | free(cq->cnd); | ||
| 242 | dlinkedlist_free(cq->list); | ||
| 253 | 243 | ||
| 254 | return 0; | 244 | return; |
| 255 | } | 245 | } |
| 256 | 246 | ||
| 257 | int cqueue_insert(cqueue * const cq, task *tsk, int index) { | 247 | int cqueue_addtask(cqueue * const cq, task * const tsk) { |
| 258 | if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first | 248 | if(!cq || !tsk) |
| 259 | RETURNWERR(EINVAL, -1); | 249 | RETURNWERR(EINVAL, -1); |
| 260 | 250 | ||
| 261 | mtx_lock(cq->mtx); | 251 | mtx_lock(cq->mtx); |
| 252 | |||
| 253 | // TODO: Think about creating an "exception" via signal handling | ||
| 262 | if(cq->canceled) { | 254 | if(cq->canceled) { |
| 263 | mtx_unlock(cq->mtx); | 255 | mtx_unlock(cq->mtx); |
| 264 | thrd_exit(thrd_timedout); | 256 | thrd_exit(-1); |
| 265 | } | 257 | } |
| 266 | 258 | ||
| 267 | dlinkedlist_insert(cq->list, tsk, free, index); | 259 | dlinkedlist_prepend(cq->list, tsk, free); |
| 268 | mtx_unlock(cq->mtx); | 260 | mtx_unlock(cq->mtx); |
| 269 | cnd_signal(cq->cnd); | 261 | cnd_signal(cq->cnd); |
| 270 | 262 | ||
| 271 | return 0; | 263 | return 0; |
| 272 | } | 264 | } |
| 273 | 265 | ||
| 274 | int cqueue_size(cqueue const * const cq) { | 266 | task * cqueue_waitpop(cqueue * const cq) { |
| 275 | if(!cq) | 267 | if(!cq) |
| 276 | RETURNWERR(EINVAL, -1); | 268 | RETURNWERR(EINVAL, NULL); |
| 277 | |||
| 278 | mtx_lock(cq->mtx); | ||
| 279 | if(cq->canceled) { | ||
| 280 | mtx_unlock(cq->mtx); | ||
| 281 | thrd_exit(thrd_timedout); | ||
| 282 | } | ||
| 283 | |||
| 284 | int retval = dlinkedlist_size(cq->list); | ||
| 285 | mtx_unlock(cq->mtx); | ||
| 286 | |||
| 287 | return retval; | ||
| 288 | } | ||
| 289 | |||
| 290 | int cqueue_isempty(cqueue const * const cq) { | ||
| 291 | int val = cqueue_size(cq); | ||
| 292 | return (val < 0) ? -1 : (val == 0); | ||
| 293 | } | ||
| 294 | 269 | ||
| 295 | int cqueue_trypop(cqueue * const cq, task **ret) { | 270 | task *retval = NULL; |
| 296 | if(!cq || !ret || !*ret) | ||
| 297 | RETURNWERR(EINVAL, -1); | ||
| 298 | 271 | ||
| 299 | int retval = 0; | ||
| 300 | |||
| 301 | mtx_lock(cq->mtx); | 272 | mtx_lock(cq->mtx); |
| 302 | if(cq->canceled) { | 273 | while(dlinkedlist_isempty(cq->list) && !cq->canceled) |
| 303 | mtx_unlock(cq->mtx); | 274 | cnd_wait(cq->cnd, cq->mtx); |
| 304 | thrd_exit(thrd_timedout); | ||
| 305 | } | ||
| 306 | |||
| 307 | if(!dlinkedlist_isempty(cq->list)) { | ||
| 308 | *ret = (task*)dlinkedlist_poplast(cq->list); | ||
| 309 | retval = 1; | ||
| 310 | } | ||
| 311 | mtx_unlock(cq->mtx); | ||
| 312 | |||
| 313 | return retval; | ||
| 314 | } | ||
| 315 | |||
| 316 | int cqueue_waitpop(cqueue * const cq, task **ret) { | ||
| 317 | if(!cq || !ret) | ||
| 318 | RETURNWERR(EINVAL, -1); | ||
| 319 | 275 | ||
| 320 | mtx_lock(cq->mtx); | ||
| 321 | |||
| 322 | while(!dlinkedlist_isempty(cq->list) && !cq->canceled) | ||
| 323 | cnd_wait(cq->cnd, cq->mtx); // Unlocks mutex while waiting, acquires lock once waiting is done | ||
| 324 | |||
| 325 | if(cq->canceled) { | 276 | if(cq->canceled) { |
| 326 | mtx_unlock(cq->mtx); | 277 | mtx_unlock(cq->mtx); |
| 327 | thrd_exit(thrd_timedout); | 278 | thrd_exit(-1); |
| 328 | } | 279 | } |
| 329 | |||
| 330 | *ret = dlinkedlist_poplast(cq->list); | ||
| 331 | |||
| 332 | mtx_unlock(cq->mtx); | ||
| 333 | |||
| 334 | return 0; | ||
| 335 | } | ||
| 336 | 280 | ||
| 337 | int cqueue_cancel(cqueue * const cq) { | 281 | retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); |
| 338 | if(!cq) | 282 | dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); |
| 339 | RETURNWERR(EINVAL, -1); | ||
| 340 | |||
| 341 | int retval = 0; | ||
| 342 | |||
| 343 | mtx_lock(cq->mtx); | ||
| 344 | if(cq->canceled) | ||
| 345 | retval = -1; | ||
| 346 | else | ||
| 347 | cq->canceled++; | ||
| 348 | |||
| 349 | mtx_unlock(cq->mtx); | 283 | mtx_unlock(cq->mtx); |
| 350 | cnd_broadcast(cq->cnd); | ||
| 351 | |||
| 352 | return retval; | ||
| 353 | } | ||
| 354 | |||
| 355 | int cqueue_consumer(void *passed) { | ||
| 356 | if(!passed) | ||
| 357 | thrd_exit(thrd_error); | ||
| 358 | // Not setting errno because then I'd have to make a mutex for it | ||
| 359 | |||
| 360 | cqueue *cq = (cqueue *)passed; | ||
| 361 | |||
| 362 | for(task *current_task;;) { | ||
| 363 | cqueue_waitpop(cq, ¤t_task); | ||
| 364 | if(!current_task) | ||
| 365 | thrd_exit(thrd_error); | ||
| 366 | |||
| 367 | current_task->cb(current_task->arg); | ||
| 368 | } | ||
| 369 | |||
| 370 | thrd_exit(thrd_success); | ||
| 371 | } | ||
| 372 | |||
| 373 | static void ___ucleanup_cqfree(void *cq) { | ||
| 374 | if(!cq) | ||
| 375 | return; | ||
| 376 | |||
| 377 | cqueue_free(cq); | ||
| 378 | return; | ||
| 379 | } | ||
| 380 | |||
| 381 | threadpool * threadpool_init(int threads) { | ||
| 382 | if(threads < 1) | ||
| 383 | RETURNWERR(EINVAL, NULL); | ||
| 384 | cleanup_CREATE(10); | ||
| 385 | |||
| 386 | threadpool *tp = VALLOC(1, sizeof(*tp)); | ||
| 387 | if(!tp) | ||
| 388 | return NULL; | ||
| 389 | cleanup_REGISTER(free, tp); | ||
| 390 | |||
| 391 | tp->taskqueue = cqueue_init(mtx_plain); | ||
| 392 | if(!tp->taskqueue) | ||
| 393 | cleanup_MARK(); | ||
| 394 | cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); | ||
| 395 | |||
| 396 | if(!cleanup_ERRORFLAGGED) | ||
| 397 | if(!(tp->threads = VALLOC(threads, sizeof(*tp->threads)))) | ||
| 398 | cleanup_MARK(); | ||
| 399 | cleanup_CNDREGISTER(free, tp->threads); | ||
| 400 | |||
| 401 | for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { | ||
| 402 | tp->threads[i] = VALLOC(1, sizeof(**tp->threads)); | ||
| 403 | if(!tp->threads[i]) { | ||
| 404 | cleanup_MARK(); | ||
| 405 | for(int j = 0; j < i; j++) | ||
| 406 | free(tp->threads[j]); | ||
| 407 | } | ||
| 408 | |||
| 409 | if(!cleanup_ERRORFLAGGED) | ||
| 410 | thrd_create(tp->threads[i], cqueue_consumer, tp->taskqueue); | ||
| 411 | // TODO: Error Checking ^ | ||
| 412 | } | ||
| 413 | |||
| 414 | if(cleanup_ERRORFLAGGED) | ||
| 415 | cleanup_FIRE(); | ||
| 416 | else | ||
| 417 | tp->nthreads = threads; | ||
| 418 | |||
| 419 | return tp; | ||
| 420 | } | ||
| 421 | |||
| 422 | void threadpool_free(threadpool *tp) { | ||
| 423 | if(!tp) | ||
| 424 | return; | ||
| 425 | 284 | ||
| 426 | cqueue_free(tp->taskqueue); | 285 | return retval; |
| 427 | for(int i = 0; i < tp->nthreads; i++) | ||
| 428 | free(tp->threads[i]); | ||
| 429 | free(tp->threads); | ||
| 430 | free(tp); | ||
| 431 | |||
| 432 | return; | ||
| 433 | } | ||
| 434 | |||
| 435 | int threadpool_addtask(threadpool * const tp, task * const task) { | ||
| 436 | if(!tp || !task) | ||
| 437 | RETURNWERR(EINVAL, -1); | ||
| 438 | |||
| 439 | return cqueue_append(tp->taskqueue, task); | ||
| 440 | } | ||
| 441 | |||
| 442 | int threadpool_join(const threadpool * const tp) { | ||
| 443 | if(!tp) | ||
| 444 | RETURNWERR(EINVAL, -1); | ||
| 445 | |||
| 446 | for(int i = 0; i < tp->nthreads; i++) | ||
| 447 | thrd_join(*(tp->threads[i]), NULL); | ||
| 448 | |||
| 449 | return 0; | ||
| 450 | } \ No newline at end of file | 286 | } \ No newline at end of file |
diff --git a/src/threadpool.h b/src/threadpool.h index bc3ce06..bd1b787 100644 --- a/src/threadpool.h +++ b/src/threadpool.h | |||
| @@ -10,27 +10,13 @@ typedef struct tp threadpool; | |||
| 10 | 10 | ||
| 11 | task * task_init(task_callback cb, void *arg); | 11 | task * task_init(task_callback cb, void *arg); |
| 12 | void task_free(task *ts); | 12 | void task_free(task *ts); |
| 13 | int task_fire(task *ts); | ||
| 13 | 14 | ||
| 14 | cqueue * cqueue_init(int mtx_type); | 15 | cqueue * cqueue_init(int mtx_type); |
| 15 | void cqueue_free(cqueue *cq); | ||
| 16 | int cqueue_append(cqueue * const cq, task *tsk); | ||
| 17 | int cqueue_prepend(cqueue * const cq, task *tsk); | ||
| 18 | int cqueue_insert(cqueue * const cq, task *tsk, int index); | ||
| 19 | int cqueue_size(cqueue const * const cq); | ||
| 20 | int cqueue_isempty(cqueue const * const cq); | ||
| 21 | int cqueue_trypop(cqueue * const cq, task **ret); | ||
| 22 | int cqueue_waitpop(cqueue * const cq, task **ret); | ||
| 23 | int cqueue_cancel(cqueue * const cq); | ||
| 24 | |||
| 25 | threadpool * threadpool_init(int threads); | ||
| 26 | void threadpool_free(threadpool *tp); | ||
| 27 | int threadpool_addtask(threadpool * const tp, task * const task); | ||
| 28 | int threadpool_join(const threadpool * const tp); | ||
| 29 | 16 | ||
| 30 | typedef struct mtxp mtxpair; | 17 | void cqueue_cancel(cqueue *cq); |
| 31 | mtxpair * mtxpair_init(void * const data, int type); | 18 | void cqueue_free(cqueue *cq); |
| 32 | void mtxpair_free(mtxpair *mp); | 19 | int cqueue_addtask(cqueue * const cq, task * const tsk); |
| 33 | int mtxpair_setdata(mtxpair * const mp, void * const data); | 20 | task * cqueue_waitpop(cqueue * const cq); |
| 34 | int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd); | ||
| 35 | 21 | ||
| 36 | #endif \ No newline at end of file | 22 | #endif \ No newline at end of file |
