diff options
| author | @syxhe <https://t.me/syxhe> | 2025-04-22 16:35:44 -0500 |
|---|---|---|
| committer | @syxhe <https://t.me/syxhe> | 2025-04-22 16:35:44 -0500 |
| commit | 0ee5044805c8d157d5023fc1322f980e3a480df7 (patch) | |
| tree | 0cbdcbb5286a3c776c12d490afb1e24c40771294 /src/threadpool.c | |
| parent | c704a8a382c231e066a7fbf0402a33455c40b8f5 (diff) | |
Start work on threadpool implementation
Diffstat (limited to 'src/threadpool.c')
| -rw-r--r-- | src/threadpool.c | 160 |
1 files changed, 139 insertions, 21 deletions
diff --git a/src/threadpool.c b/src/threadpool.c index 56dcd6b..66c0d06 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -21,12 +21,12 @@ typedef struct mtxp { | |||
| 21 | } mtxpair; | 21 | } mtxpair; |
| 22 | 22 | ||
| 23 | mtxpair * mtxpair_init(void * const data, int type) { | 23 | mtxpair * mtxpair_init(void * const data, int type) { |
| 24 | mtxpair *mtxp = malloc(1 * sizeof(*mtxp)); | 24 | mtxpair *mtxp = VALLOC(1, sizeof(*mtxp)); |
| 25 | if(!mtxp) | 25 | if(!mtxp) |
| 26 | return NULL; | 26 | return NULL; |
| 27 | 27 | ||
| 28 | // Make room for the mutex | 28 | // Make room for the mutex |
| 29 | mtxp->mtx = malloc(1 * sizeof(*mtxp->mtx)); | 29 | mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx)); |
| 30 | if(!mtxp->mtx) { | 30 | if(!mtxp->mtx) { |
| 31 | free(mtxp); | 31 | free(mtxp); |
| 32 | return NULL; | 32 | return NULL; |
| @@ -95,7 +95,6 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) | |||
| 95 | 95 | ||
| 96 | // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 | 96 | // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 |
| 97 | 97 | ||
| 98 | typedef int (*task_callback)(void*); | ||
| 99 | typedef struct task { | 98 | typedef struct task { |
| 100 | task_callback cb; | 99 | task_callback cb; |
| 101 | void *arg; | 100 | void *arg; |
| @@ -105,6 +104,7 @@ typedef struct cq { | |||
| 105 | dlinkedlist *list; | 104 | dlinkedlist *list; |
| 106 | mtx_t *mutex; | 105 | mtx_t *mutex; |
| 107 | cnd_t *conditional; | 106 | cnd_t *conditional; |
| 107 | unsigned char canceled; | ||
| 108 | } cqueue; | 108 | } cqueue; |
| 109 | 109 | ||
| 110 | typedef struct tp { | 110 | typedef struct tp { |
| @@ -114,6 +114,7 @@ typedef struct tp { | |||
| 114 | cqueue *taskqueue; | 114 | cqueue *taskqueue; |
| 115 | } threadpool; | 115 | } threadpool; |
| 116 | 116 | ||
| 117 | |||
| 117 | task * task_init(task_callback cb, void *arg) { | 118 | task * task_init(task_callback cb, void *arg) { |
| 118 | if(cb == NULL) | 119 | if(cb == NULL) |
| 119 | RETURNWERR(EINVAL, NULL); | 120 | RETURNWERR(EINVAL, NULL); |
| @@ -160,7 +161,6 @@ static void ___ucleanup_dll(void *dll) { | |||
| 160 | 161 | ||
| 161 | 162 | ||
| 162 | cqueue * cqueue_init(int mtx_type) { | 163 | cqueue * cqueue_init(int mtx_type) { |
| 163 | unsigned char flag = 0; | ||
| 164 | cleanup_CREATE(10); | 164 | cleanup_CREATE(10); |
| 165 | 165 | ||
| 166 | cqueue *cq = VALLOC(1, sizeof(*cq)); | 166 | cqueue *cq = VALLOC(1, sizeof(*cq)); |
| @@ -170,31 +170,32 @@ cqueue * cqueue_init(int mtx_type) { | |||
| 170 | 170 | ||
| 171 | cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); | 171 | cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); |
| 172 | if(!(cq->mutex)) | 172 | if(!(cq->mutex)) |
| 173 | flag++; | 173 | cleanup_MARK(); |
| 174 | cleanup_CNDREGISTER(flag, free, cq->mutex); | 174 | cleanup_CNDREGISTER(free, cq->mutex); |
| 175 | 175 | ||
| 176 | if(!flag && mtx_init(cq->mutex, mtx_type) != thrd_success) | 176 | if(cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success) |
| 177 | flag++; | 177 | cleanup_MARK(); |
| 178 | cleanup_CNDREGISTER(flag, ___ucleanup_mtxd, cq->mutex); | 178 | cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex); |
| 179 | 179 | ||
| 180 | if(!flag && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) | 180 | if(cleanup_ERRORFLAGGED && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) |
| 181 | flag++; | 181 | cleanup_MARK(); |
| 182 | cleanup_CNDREGISTER(flag, free, cq->conditional); | 182 | cleanup_CNDREGISTER(free, cq->conditional); |
| 183 | 183 | ||
| 184 | if(!flag && cnd_init(cq->conditional) != thrd_success) | 184 | if(cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success) |
| 185 | flag++; | 185 | cleanup_MARK(); |
| 186 | cleanup_CNDREGISTER(flag, ___ucleanup_cndd, cq->conditional); | 186 | cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional); |
| 187 | 187 | ||
| 188 | cq->list = dlinkedlist_init(); | 188 | cq->list = dlinkedlist_init(); |
| 189 | if(!flag && !cq->list) | 189 | if(cleanup_ERRORFLAGGED && !cq->list) |
| 190 | flag++; | 190 | cleanup_MARK(); |
| 191 | cleanup_CNDREGISTER(flag, ___ucleanup_dll, cq->list); | 191 | cleanup_CNDREGISTER(___ucleanup_dll, cq->list); |
| 192 | 192 | ||
| 193 | if(flag) | 193 | if(cleanup_ERRORFLAGGED) |
| 194 | cleanup_fire(&__CLEANUP); | 194 | cleanup_fire(&__CLEANUP); |
| 195 | 195 | ||
| 196 | // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions | 196 | // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions |
| 197 | 197 | ||
| 198 | cq->canceled = 0; | ||
| 198 | return cq; | 199 | return cq; |
| 199 | } | 200 | } |
| 200 | 201 | ||
| @@ -202,6 +203,9 @@ void cqueue_free(cqueue *cq) { | |||
| 202 | if(!cq) | 203 | if(!cq) |
| 203 | return; | 204 | return; |
| 204 | 205 | ||
| 206 | // Cancel any outstanding threads before freeing everything | ||
| 207 | cqueue_cancel(cq); | ||
| 208 | |||
| 205 | dlinkedlist_free(cq->list); | 209 | dlinkedlist_free(cq->list); |
| 206 | cnd_destroy(cq->conditional); | 210 | cnd_destroy(cq->conditional); |
| 207 | mtx_destroy(cq->mutex); | 211 | mtx_destroy(cq->mutex); |
| @@ -217,6 +221,11 @@ int cqueue_append(cqueue * const cq, task *tsk) { | |||
| 217 | RETURNWERR(EINVAL, -1); | 221 | RETURNWERR(EINVAL, -1); |
| 218 | 222 | ||
| 219 | mtx_lock(cq->mutex); | 223 | mtx_lock(cq->mutex); |
| 224 | if(cq->canceled) { | ||
| 225 | mtx_unlock(cq->mutex); | ||
| 226 | thrd_exit(thrd_timedout); | ||
| 227 | } | ||
| 228 | |||
| 220 | dlinkedlist_append(cq->list, tsk, free); | 229 | dlinkedlist_append(cq->list, tsk, free); |
| 221 | mtx_unlock(cq->mutex); | 230 | mtx_unlock(cq->mutex); |
| 222 | cnd_signal(cq->conditional); | 231 | cnd_signal(cq->conditional); |
| @@ -229,6 +238,11 @@ int cqueue_prepend(cqueue * const cq, task *tsk) { | |||
| 229 | RETURNWERR(EINVAL, -1); | 238 | RETURNWERR(EINVAL, -1); |
| 230 | 239 | ||
| 231 | mtx_lock(cq->mutex); | 240 | mtx_lock(cq->mutex); |
| 241 | if(cq->canceled) { | ||
| 242 | mtx_unlock(cq->mutex); | ||
| 243 | thrd_exit(thrd_timedout); | ||
| 244 | } | ||
| 245 | |||
| 232 | dlinkedlist_prepend(cq->list, tsk, free); | 246 | dlinkedlist_prepend(cq->list, tsk, free); |
| 233 | mtx_unlock(cq->mutex); | 247 | mtx_unlock(cq->mutex); |
| 234 | cnd_signal(cq->conditional); | 248 | cnd_signal(cq->conditional); |
| @@ -241,6 +255,11 @@ int cqueue_insert(cqueue * const cq, task *tsk, int index) { | |||
| 241 | RETURNWERR(EINVAL, -1); | 255 | RETURNWERR(EINVAL, -1); |
| 242 | 256 | ||
| 243 | mtx_lock(cq->mutex); | 257 | mtx_lock(cq->mutex); |
| 258 | if(cq->canceled) { | ||
| 259 | mtx_unlock(cq->mutex); | ||
| 260 | thrd_exit(thrd_timedout); | ||
| 261 | } | ||
| 262 | |||
| 244 | dlinkedlist_insert(cq->list, tsk, free, index); | 263 | dlinkedlist_insert(cq->list, tsk, free, index); |
| 245 | mtx_unlock(cq->mutex); | 264 | mtx_unlock(cq->mutex); |
| 246 | cnd_signal(cq->conditional); | 265 | cnd_signal(cq->conditional); |
| @@ -253,6 +272,11 @@ int cqueue_size(cqueue const * const cq) { | |||
| 253 | RETURNWERR(EINVAL, -1); | 272 | RETURNWERR(EINVAL, -1); |
| 254 | 273 | ||
| 255 | mtx_lock(cq->mutex); | 274 | mtx_lock(cq->mutex); |
| 275 | if(cq->canceled) { | ||
| 276 | mtx_unlock(cq->mutex); | ||
| 277 | thrd_exit(thrd_timedout); | ||
| 278 | } | ||
| 279 | |||
| 256 | int retval = dlinkedlist_size(cq->list); | 280 | int retval = dlinkedlist_size(cq->list); |
| 257 | mtx_unlock(cq->mutex); | 281 | mtx_unlock(cq->mutex); |
| 258 | 282 | ||
| @@ -260,7 +284,8 @@ int cqueue_size(cqueue const * const cq) { | |||
| 260 | } | 284 | } |
| 261 | 285 | ||
| 262 | int cqueue_isempty(cqueue const * const cq) { | 286 | int cqueue_isempty(cqueue const * const cq) { |
| 263 | return (cqueue_size(cq) == 0); | 287 | int val = cqueue_size(cq); |
| 288 | return (val < 0) ? -1 : (val == 0); | ||
| 264 | } | 289 | } |
| 265 | 290 | ||
| 266 | int cqueue_trypop(cqueue * const cq, task **ret) { | 291 | int cqueue_trypop(cqueue * const cq, task **ret) { |
| @@ -270,6 +295,11 @@ int cqueue_trypop(cqueue * const cq, task **ret) { | |||
| 270 | int retval = 0; | 295 | int retval = 0; |
| 271 | 296 | ||
| 272 | mtx_lock(cq->mutex); | 297 | mtx_lock(cq->mutex); |
| 298 | if(cq->canceled) { | ||
| 299 | mtx_unlock(cq->mutex); | ||
| 300 | thrd_exit(thrd_timedout); | ||
| 301 | } | ||
| 302 | |||
| 273 | if(!dlinkedlist_isempty(cq->list)) { | 303 | if(!dlinkedlist_isempty(cq->list)) { |
| 274 | *ret = (task*)dlinkedlist_poplast(cq->list); | 304 | *ret = (task*)dlinkedlist_poplast(cq->list); |
| 275 | retval = 1; | 305 | retval = 1; |
| @@ -284,10 +314,98 @@ int cqueue_waitpop(cqueue * const cq, task **ret) { | |||
| 284 | RETURNWERR(EINVAL, -1); | 314 | RETURNWERR(EINVAL, -1); |
| 285 | 315 | ||
| 286 | mtx_lock(cq->mutex); | 316 | mtx_lock(cq->mutex); |
| 287 | while(!dlinkedlist_isempty(cq->list)) | 317 | |
| 318 | while(!dlinkedlist_isempty(cq->list) && !cq->canceled) | ||
| 288 | cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done | 319 | cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done |
| 320 | |||
| 321 | if(cq->canceled) { | ||
| 322 | mtx_unlock(cq->mutex); | ||
| 323 | thrd_exit(thrd_timedout); | ||
| 324 | } | ||
| 325 | |||
| 289 | *ret = dlinkedlist_poplast(cq->list); | 326 | *ret = dlinkedlist_poplast(cq->list); |
| 327 | |||
| 290 | mtx_unlock(cq->mutex); | 328 | mtx_unlock(cq->mutex); |
| 291 | 329 | ||
| 292 | return 0; | 330 | return 0; |
| 331 | } | ||
| 332 | |||
| 333 | int cqueue_cancel(cqueue * const cq) { | ||
| 334 | if(!cq) | ||
| 335 | RETURNWERR(EINVAL, -1); | ||
| 336 | |||
| 337 | int retval = 0; | ||
| 338 | |||
| 339 | mtx_lock(cq->mutex); | ||
| 340 | if(cq->canceled) | ||
| 341 | retval = -1; | ||
| 342 | else | ||
| 343 | cq->canceled++; | ||
| 344 | |||
| 345 | mtx_unlock(cq->mutex); | ||
| 346 | cnd_broadcast(cq->conditional); | ||
| 347 | |||
| 348 | return retval; | ||
| 349 | } | ||
| 350 | |||
| 351 | |||
| 352 | static void ___ucleanup_cqfree(void *cq) { | ||
| 353 | if(!cq) | ||
| 354 | return; | ||
| 355 | |||
| 356 | cqueue_free(cq); | ||
| 357 | return; | ||
| 358 | } | ||
| 359 | |||
| 360 | threadpool * threadpool_init(int threads) { | ||
| 361 | if(threads < 1) | ||
| 362 | RETURNWERR(EINVAL, NULL); | ||
| 363 | cleanup_CREATE(10); | ||
| 364 | |||
| 365 | threadpool *tp = VALLOC(1, sizeof(*tp)); | ||
| 366 | if(!tp) | ||
| 367 | return NULL; | ||
| 368 | cleanup_REGISTER(free, tp); | ||
| 369 | |||
| 370 | tp->taskqueue = cqueue_init(mtx_plain); | ||
| 371 | if(!tp->taskqueue) | ||
| 372 | cleanup_MARK(); | ||
| 373 | cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); | ||
| 374 | |||
| 375 | tp->threads = VALLOC(threads, sizeof(*tp->threads)); | ||
| 376 | if(!tp->threads) | ||
| 377 | cleanup_MARK(); | ||
| 378 | cleanup_CNDREGISTER(free, tp->threads); | ||
| 379 | |||
| 380 | for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { | ||
| 381 | tp->threads[i] = VALLOC(1, sizeof(**tp->threads)); | ||
| 382 | if(!tp->threads[i]) { | ||
| 383 | cleanup_MARK(); | ||
| 384 | for(int j = 0; j < i; j++) | ||
| 385 | free(tp->threads[j]); | ||
| 386 | } | ||
| 387 | } | ||
| 388 | |||
| 389 | if(cleanup_ERRORFLAGGED) | ||
| 390 | cleanup_FIRE(); | ||
| 391 | else | ||
| 392 | tp->nthreads = threads; | ||
| 393 | |||
| 394 | return tp; | ||
| 395 | } | ||
| 396 | |||
| 397 | void threadpool_free(threadpool *tp) { | ||
| 398 | if(!tp) | ||
| 399 | return; | ||
| 400 | |||
| 401 | cqueue_free(tp->taskqueue); | ||
| 402 | for(int i = 0; i < tp->nthreads; i++) { | ||
| 403 | thrd_detach(*tp->threads[i]); | ||
| 404 | free(tp->threads[i]); | ||
| 405 | } | ||
| 406 | free(tp->threads); | ||
| 407 | cqueue_free(tp->taskqueue); | ||
| 408 | free(tp); | ||
| 409 | |||
| 410 | return; | ||
| 293 | } \ No newline at end of file | 411 | } \ No newline at end of file |
