diff options
Diffstat (limited to 'src/threadpool.c')
| -rw-r--r-- | src/threadpool.c | 120 |
1 files changed, 66 insertions, 54 deletions
diff --git a/src/threadpool.c b/src/threadpool.c index 31c300c..ab0733d 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -1,5 +1,4 @@ | |||
| 1 | #include "threadpool.h" | 1 | #include "threadpool.h" |
| 2 | #include "arena.h" | ||
| 3 | #include "shared.h" | 2 | #include "shared.h" |
| 4 | 3 | ||
| 5 | #include "ll.h" | 4 | #include "ll.h" |
| @@ -102,8 +101,8 @@ typedef struct task { | |||
| 102 | 101 | ||
| 103 | typedef struct cq { | 102 | typedef struct cq { |
| 104 | dlinkedlist *list; | 103 | dlinkedlist *list; |
| 105 | mtx_t *mutex; | 104 | mtx_t *mtx; |
| 106 | cnd_t *conditional; | 105 | cnd_t *cnd; |
| 107 | unsigned char canceled; | 106 | unsigned char canceled; |
| 108 | } cqueue; | 107 | } cqueue; |
| 109 | 108 | ||
| @@ -168,32 +167,37 @@ cqueue * cqueue_init(int mtx_type) { | |||
| 168 | return NULL; | 167 | return NULL; |
| 169 | cleanup_REGISTER(free, cq); | 168 | cleanup_REGISTER(free, cq); |
| 170 | 169 | ||
| 171 | cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); | 170 | cq->mtx = VALLOC(1, sizeof(*(cq->mtx))); |
| 172 | if(!(cq->mutex)) | 171 | if(!(cq->mtx)) |
| 173 | cleanup_MARK(); | 172 | cleanup_MARK(); |
| 174 | cleanup_CNDREGISTER(free, cq->mutex); | 173 | cleanup_CNDREGISTER(free, cq->mtx); |
| 175 | 174 | ||
| 176 | if(cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success) | 175 | if(!cleanup_ERRORFLAGGED) |
| 177 | cleanup_MARK(); | 176 | if(mtx_init(cq->mtx, mtx_type) != thrd_success) |
| 178 | cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex); | 177 | cleanup_MARK(); |
| 178 | cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); | ||
| 179 | 179 | ||
| 180 | if(cleanup_ERRORFLAGGED && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) | 180 | if(!cleanup_ERRORFLAGGED) |
| 181 | cleanup_MARK(); | 181 | if(!(cq->cnd = VALLOC(1, sizeof(*(cq->cnd))))) |
| 182 | cleanup_CNDREGISTER(free, cq->conditional); | 182 | cleanup_MARK(); |
| 183 | cleanup_CNDREGISTER(free, cq->cnd); | ||
| 183 | 184 | ||
| 184 | if(cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success) | 185 | if(!cleanup_ERRORFLAGGED) |
| 185 | cleanup_MARK(); | 186 | if(cnd_init(cq->cnd) != thrd_success) |
| 186 | cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional); | 187 | cleanup_MARK(); |
| 188 | cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); | ||
| 187 | 189 | ||
| 188 | cq->list = dlinkedlist_init(); | 190 | if(!cleanup_ERRORFLAGGED) |
| 189 | if(cleanup_ERRORFLAGGED && !cq->list) | 191 | if(!(cq->list = dlinkedlist_init())) |
| 190 | cleanup_MARK(); | 192 | cleanup_MARK(); |
| 191 | cleanup_CNDREGISTER(___ucleanup_dll, cq->list); | 193 | cleanup_CNDREGISTER(___ucleanup_dll, cq->list); |
| 192 | 194 | ||
| 195 | |||
| 193 | if(cleanup_ERRORFLAGGED) | 196 | if(cleanup_ERRORFLAGGED) |
| 194 | cleanup_fire(&__CLEANUP); | 197 | cleanup_fire(&__CLEANUP); |
| 195 | 198 | ||
| 196 | // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions | 199 | // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions |
| 200 | // The implementation was not better lmao | ||
| 197 | 201 | ||
| 198 | cq->canceled = 0; | 202 | cq->canceled = 0; |
| 199 | return cq; | 203 | return cq; |
| @@ -207,10 +211,10 @@ void cqueue_free(cqueue *cq) { | |||
| 207 | cqueue_cancel(cq); | 211 | cqueue_cancel(cq); |
| 208 | 212 | ||
| 209 | dlinkedlist_free(cq->list); | 213 | dlinkedlist_free(cq->list); |
| 210 | cnd_destroy(cq->conditional); | 214 | cnd_destroy(cq->cnd); |
| 211 | mtx_destroy(cq->mutex); | 215 | mtx_destroy(cq->mtx); |
| 212 | free(cq->conditional); | 216 | free(cq->cnd); |
| 213 | free(cq->mutex); | 217 | free(cq->mtx); |
| 214 | free(cq); | 218 | free(cq); |
| 215 | 219 | ||
| 216 | return; | 220 | return; |
| @@ -220,15 +224,15 @@ int cqueue_append(cqueue * const cq, task *tsk) { | |||
| 220 | if(!cq || !tsk) | 224 | if(!cq || !tsk) |
| 221 | RETURNWERR(EINVAL, -1); | 225 | RETURNWERR(EINVAL, -1); |
| 222 | 226 | ||
| 223 | mtx_lock(cq->mutex); | 227 | mtx_lock(cq->mtx); |
| 224 | if(cq->canceled) { | 228 | if(cq->canceled) { |
| 225 | mtx_unlock(cq->mutex); | 229 | mtx_unlock(cq->mtx); |
| 226 | thrd_exit(thrd_timedout); | 230 | thrd_exit(thrd_timedout); |
| 227 | } | 231 | } |
| 228 | 232 | ||
| 229 | dlinkedlist_append(cq->list, tsk, free); | 233 | dlinkedlist_append(cq->list, tsk, free); |
| 230 | mtx_unlock(cq->mutex); | 234 | mtx_unlock(cq->mtx); |
| 231 | cnd_signal(cq->conditional); | 235 | cnd_signal(cq->cnd); |
| 232 | 236 | ||
| 233 | return 0; | 237 | return 0; |
| 234 | } | 238 | } |
| @@ -237,15 +241,15 @@ int cqueue_prepend(cqueue * const cq, task *tsk) { | |||
| 237 | if(!cq || !tsk) | 241 | if(!cq || !tsk) |
| 238 | RETURNWERR(EINVAL, -1); | 242 | RETURNWERR(EINVAL, -1); |
| 239 | 243 | ||
| 240 | mtx_lock(cq->mutex); | 244 | mtx_lock(cq->mtx); |
| 241 | if(cq->canceled) { | 245 | if(cq->canceled) { |
| 242 | mtx_unlock(cq->mutex); | 246 | mtx_unlock(cq->mtx); |
| 243 | thrd_exit(thrd_timedout); | 247 | thrd_exit(thrd_timedout); |
| 244 | } | 248 | } |
| 245 | 249 | ||
| 246 | dlinkedlist_prepend(cq->list, tsk, free); | 250 | dlinkedlist_prepend(cq->list, tsk, free); |
| 247 | mtx_unlock(cq->mutex); | 251 | mtx_unlock(cq->mtx); |
| 248 | cnd_signal(cq->conditional); | 252 | cnd_signal(cq->cnd); |
| 249 | 253 | ||
| 250 | return 0; | 254 | return 0; |
| 251 | } | 255 | } |
| @@ -254,15 +258,15 @@ int cqueue_insert(cqueue * const cq, task *tsk, int index) { | |||
| 254 | if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first | 258 | if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first |
| 255 | RETURNWERR(EINVAL, -1); | 259 | RETURNWERR(EINVAL, -1); |
| 256 | 260 | ||
| 257 | mtx_lock(cq->mutex); | 261 | mtx_lock(cq->mtx); |
| 258 | if(cq->canceled) { | 262 | if(cq->canceled) { |
| 259 | mtx_unlock(cq->mutex); | 263 | mtx_unlock(cq->mtx); |
| 260 | thrd_exit(thrd_timedout); | 264 | thrd_exit(thrd_timedout); |
| 261 | } | 265 | } |
| 262 | 266 | ||
| 263 | dlinkedlist_insert(cq->list, tsk, free, index); | 267 | dlinkedlist_insert(cq->list, tsk, free, index); |
| 264 | mtx_unlock(cq->mutex); | 268 | mtx_unlock(cq->mtx); |
| 265 | cnd_signal(cq->conditional); | 269 | cnd_signal(cq->cnd); |
| 266 | 270 | ||
| 267 | return 0; | 271 | return 0; |
| 268 | } | 272 | } |
| @@ -271,14 +275,14 @@ int cqueue_size(cqueue const * const cq) { | |||
| 271 | if(!cq) | 275 | if(!cq) |
| 272 | RETURNWERR(EINVAL, -1); | 276 | RETURNWERR(EINVAL, -1); |
| 273 | 277 | ||
| 274 | mtx_lock(cq->mutex); | 278 | mtx_lock(cq->mtx); |
| 275 | if(cq->canceled) { | 279 | if(cq->canceled) { |
| 276 | mtx_unlock(cq->mutex); | 280 | mtx_unlock(cq->mtx); |
| 277 | thrd_exit(thrd_timedout); | 281 | thrd_exit(thrd_timedout); |
| 278 | } | 282 | } |
| 279 | 283 | ||
| 280 | int retval = dlinkedlist_size(cq->list); | 284 | int retval = dlinkedlist_size(cq->list); |
| 281 | mtx_unlock(cq->mutex); | 285 | mtx_unlock(cq->mtx); |
| 282 | 286 | ||
| 283 | return retval; | 287 | return retval; |
| 284 | } | 288 | } |
| @@ -294,9 +298,9 @@ int cqueue_trypop(cqueue * const cq, task **ret) { | |||
| 294 | 298 | ||
| 295 | int retval = 0; | 299 | int retval = 0; |
| 296 | 300 | ||
| 297 | mtx_lock(cq->mutex); | 301 | mtx_lock(cq->mtx); |
| 298 | if(cq->canceled) { | 302 | if(cq->canceled) { |
| 299 | mtx_unlock(cq->mutex); | 303 | mtx_unlock(cq->mtx); |
| 300 | thrd_exit(thrd_timedout); | 304 | thrd_exit(thrd_timedout); |
| 301 | } | 305 | } |
| 302 | 306 | ||
| @@ -304,7 +308,7 @@ int cqueue_trypop(cqueue * const cq, task **ret) { | |||
| 304 | *ret = (task*)dlinkedlist_poplast(cq->list); | 308 | *ret = (task*)dlinkedlist_poplast(cq->list); |
| 305 | retval = 1; | 309 | retval = 1; |
| 306 | } | 310 | } |
| 307 | mtx_unlock(cq->mutex); | 311 | mtx_unlock(cq->mtx); |
| 308 | 312 | ||
| 309 | return retval; | 313 | return retval; |
| 310 | } | 314 | } |
| @@ -313,19 +317,19 @@ int cqueue_waitpop(cqueue * const cq, task **ret) { | |||
| 313 | if(!cq || !ret) | 317 | if(!cq || !ret) |
| 314 | RETURNWERR(EINVAL, -1); | 318 | RETURNWERR(EINVAL, -1); |
| 315 | 319 | ||
| 316 | mtx_lock(cq->mutex); | 320 | mtx_lock(cq->mtx); |
| 317 | 321 | ||
| 318 | while(!dlinkedlist_isempty(cq->list) && !cq->canceled) | 322 | while(!dlinkedlist_isempty(cq->list) && !cq->canceled) |
| 319 | cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done | 323 | cnd_wait(cq->cnd, cq->mtx); // Unlocks mutex while waiting, acquires lock once waiting is done |
| 320 | 324 | ||
| 321 | if(cq->canceled) { | 325 | if(cq->canceled) { |
| 322 | mtx_unlock(cq->mutex); | 326 | mtx_unlock(cq->mtx); |
| 323 | thrd_exit(thrd_timedout); | 327 | thrd_exit(thrd_timedout); |
| 324 | } | 328 | } |
| 325 | 329 | ||
| 326 | *ret = dlinkedlist_poplast(cq->list); | 330 | *ret = dlinkedlist_poplast(cq->list); |
| 327 | 331 | ||
| 328 | mtx_unlock(cq->mutex); | 332 | mtx_unlock(cq->mtx); |
| 329 | 333 | ||
| 330 | return 0; | 334 | return 0; |
| 331 | } | 335 | } |
| @@ -336,14 +340,14 @@ int cqueue_cancel(cqueue * const cq) { | |||
| 336 | 340 | ||
| 337 | int retval = 0; | 341 | int retval = 0; |
| 338 | 342 | ||
| 339 | mtx_lock(cq->mutex); | 343 | mtx_lock(cq->mtx); |
| 340 | if(cq->canceled) | 344 | if(cq->canceled) |
| 341 | retval = -1; | 345 | retval = -1; |
| 342 | else | 346 | else |
| 343 | cq->canceled++; | 347 | cq->canceled++; |
| 344 | 348 | ||
| 345 | mtx_unlock(cq->mutex); | 349 | mtx_unlock(cq->mtx); |
| 346 | cnd_broadcast(cq->conditional); | 350 | cnd_broadcast(cq->cnd); |
| 347 | 351 | ||
| 348 | return retval; | 352 | return retval; |
| 349 | } | 353 | } |
| @@ -389,9 +393,9 @@ threadpool * threadpool_init(int threads) { | |||
| 389 | cleanup_MARK(); | 393 | cleanup_MARK(); |
| 390 | cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); | 394 | cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); |
| 391 | 395 | ||
| 392 | tp->threads = VALLOC(threads, sizeof(*tp->threads)); | 396 | if(!cleanup_ERRORFLAGGED) |
| 393 | if(!tp->threads) | 397 | if(!(tp->threads = VALLOC(threads, sizeof(*tp->threads)))) |
| 394 | cleanup_MARK(); | 398 | cleanup_MARK(); |
| 395 | cleanup_CNDREGISTER(free, tp->threads); | 399 | cleanup_CNDREGISTER(free, tp->threads); |
| 396 | 400 | ||
| 397 | for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { | 401 | for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { |
| @@ -420,13 +424,11 @@ void threadpool_free(threadpool *tp) { | |||
| 420 | return; | 424 | return; |
| 421 | 425 | ||
| 422 | cqueue_free(tp->taskqueue); | 426 | cqueue_free(tp->taskqueue); |
| 423 | for(int i = 0; i < tp->nthreads; i++) { | 427 | for(int i = 0; i < tp->nthreads; i++) |
| 424 | thrd_detach(*tp->threads[i]); | ||
| 425 | free(tp->threads[i]); | 428 | free(tp->threads[i]); |
| 426 | } | ||
| 427 | free(tp->threads); | 429 | free(tp->threads); |
| 428 | free(tp); | 430 | free(tp); |
| 429 | 431 | ||
| 430 | return; | 432 | return; |
| 431 | } | 433 | } |
| 432 | 434 | ||
| @@ -435,4 +437,14 @@ int threadpool_addtask(threadpool * const tp, task * const task) { | |||
| 435 | RETURNWERR(EINVAL, -1); | 437 | RETURNWERR(EINVAL, -1); |
| 436 | 438 | ||
| 437 | return cqueue_append(tp->taskqueue, task); | 439 | return cqueue_append(tp->taskqueue, task); |
| 440 | } | ||
| 441 | |||
| 442 | int threadpool_join(const threadpool * const tp) { | ||
| 443 | if(!tp) | ||
| 444 | RETURNWERR(EINVAL, -1); | ||
| 445 | |||
| 446 | for(int i = 0; i < tp->nthreads; i++) | ||
| 447 | thrd_join(*(tp->threads[i]), NULL); | ||
| 448 | |||
| 449 | return 0; | ||
| 438 | } \ No newline at end of file | 450 | } \ No newline at end of file |
