diff options
| author | @syxhe <https://t.me/syxhe> | 2025-05-14 12:19:34 -0500 |
|---|---|---|
| committer | @syxhe <https://t.me/syxhe> | 2025-05-14 12:19:34 -0500 |
| commit | 928238bdcbc78c4196eb4a3508808c79e31f7c84 (patch) | |
| tree | 6ce7c393be64ba26c54f9f5f8d3fa442c6f5833d /src/threadpool.c | |
| parent | 9be8b5f26a29dad04035386331461c4c320f9237 (diff) | |
Update signatures of free-type functions to match the freecallback signature
Diffstat (limited to 'src/threadpool.c')
| -rw-r--r-- | src/threadpool.c | 264 |
1 files changed, 125 insertions, 139 deletions
diff --git a/src/threadpool.c b/src/threadpool.c index 9d00030..e1c11aa 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -10,7 +10,7 @@ | |||
| 10 | // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) | 10 | // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) |
| 11 | typedef struct mtxp { | 11 | typedef struct mtxp { |
| 12 | void *data; | 12 | void *data; |
| 13 | mtx_t *mtx; | 13 | mtx_t mtx; |
| 14 | } mtxpair; | 14 | } mtxpair; |
| 15 | 15 | ||
| 16 | mtxpair * mtxpair_init(void * const data, int type) { | 16 | mtxpair * mtxpair_init(void * const data, int type) { |
| @@ -18,16 +18,9 @@ mtxpair * mtxpair_init(void * const data, int type) { | |||
| 18 | if(!mtxp) | 18 | if(!mtxp) |
| 19 | return NULL; | 19 | return NULL; |
| 20 | 20 | ||
| 21 | // Make room for the mutex | ||
| 22 | mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx)); | ||
| 23 | if(!mtxp->mtx) { | ||
| 24 | free(mtxp); | ||
| 25 | return NULL; | ||
| 26 | } | ||
| 27 | |||
| 28 | // Init the mutex | 21 | // Init the mutex |
| 29 | if(mtx_init(mtxp->mtx, type) == thrd_error) { | 22 | if(mtx_init(&mtxp->mtx, type) == thrd_error) { |
| 30 | free(mtxp->mtx); free(mtxp); | 23 | free(mtxp); |
| 31 | RETURNWERR(errno, NULL); | 24 | RETURNWERR(errno, NULL); |
| 32 | } | 25 | } |
| 33 | 26 | ||
| @@ -39,8 +32,7 @@ void mtxpair_free(mtxpair *mp) { | |||
| 39 | if(!mp) | 32 | if(!mp) |
| 40 | return; | 33 | return; |
| 41 | 34 | ||
| 42 | mtx_destroy(mp->mtx); | 35 | mtx_destroy(&mp->mtx); |
| 43 | free(mp->mtx); | ||
| 44 | free(mp); | 36 | free(mp); |
| 45 | 37 | ||
| 46 | return; | 38 | return; |
| @@ -64,9 +56,9 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) | |||
| 64 | if(!mtxd) | 56 | if(!mtxd) |
| 65 | RETURNWERR(EINVAL, thrd_error); | 57 | RETURNWERR(EINVAL, thrd_error); |
| 66 | 58 | ||
| 67 | if(mtx_lock(mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} | 59 | if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} |
| 68 | int retval = thrd_create(thr, func, mtxd->data); | 60 | int retval = thrd_create(thr, func, mtxd->data); |
| 69 | if(mtx_unlock(mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} | 61 | if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} |
| 70 | 62 | ||
| 71 | return retval; | 63 | return retval; |
| 72 | } | 64 | } |
| @@ -99,21 +91,6 @@ typedef struct task { | |||
| 99 | void *arg; | 91 | void *arg; |
| 100 | } task; | 92 | } task; |
| 101 | 93 | ||
| 102 | typedef struct cq { | ||
| 103 | dlinkedlist *list; | ||
| 104 | mtx_t *mtx; | ||
| 105 | cnd_t *cnd; | ||
| 106 | unsigned char canceled; | ||
| 107 | } cqueue; | ||
| 108 | |||
| 109 | typedef struct tp { | ||
| 110 | thrd_t **threads; | ||
| 111 | int nthreads; | ||
| 112 | |||
| 113 | cqueue *taskqueue; | ||
| 114 | } threadpool; | ||
| 115 | |||
| 116 | |||
| 117 | task * task_init(task_callback cb, void *arg) { | 94 | task * task_init(task_callback cb, void *arg) { |
| 118 | if(cb == NULL) | 95 | if(cb == NULL) |
| 119 | RETURNWERR(EINVAL, NULL); | 96 | RETURNWERR(EINVAL, NULL); |
| @@ -138,149 +115,158 @@ void task_free(task *ts) { | |||
| 138 | int task_fire(task *ts) { | 115 | int task_fire(task *ts) { |
| 139 | if(!ts) | 116 | if(!ts) |
| 140 | RETURNWERR(EINVAL, -1); | 117 | RETURNWERR(EINVAL, -1); |
| 118 | if(ts->cb == NULL) | ||
| 119 | RETURNWERR(EINVAL, -1); | ||
| 141 | 120 | ||
| 142 | return ts->cb(ts->arg); | 121 | return ts->cb(ts->arg); |
| 143 | } | 122 | } |
| 144 | 123 | ||
| 145 | 124 | ||
| 146 | /* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must | ||
| 147 | // always be followed by an unlock | ||
| 148 | 125 | ||
| 149 | // Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a | 126 | typedef struct cq { |
| 150 | // function that waits should never send */ | 127 | dlinkedlist *taskqueue; |
| 128 | dlinkedlist *rthreads; | ||
| 151 | 129 | ||
| 152 | static void ___ucleanup_dfree(void *dll) { | 130 | mtx_t mtx; |
| 153 | if(!dll) | 131 | cnd_t cnd; |
| 154 | return; | 132 | |
| 133 | unsigned char canceled; | ||
| 155 | 134 | ||
| 156 | dlinkedlist_free((dlinkedlist *)dll); | 135 | } cqueue; |
| 157 | return; | ||
| 158 | } | ||
| 159 | 136 | ||
| 160 | static void ___ucleanup_cndd(void *cnd) { | ||
| 161 | if(!cnd) | ||
| 162 | return; | ||
| 163 | 137 | ||
| 164 | cnd_destroy((cnd_t *)cnd); | ||
| 165 | return; | ||
| 166 | } | ||
| 167 | 138 | ||
| 168 | static void ___ucleanup_mtxd(void *mtx) { | 139 | // static void ___ucleanup_dfree(void *dll) { |
| 169 | if(!mtx) | 140 | // if(!dll) |
| 170 | return; | 141 | // return; |
| 171 | 142 | ||
| 172 | mtx_destroy((mtx_t*)mtx); | 143 | // dlinkedlist_free((dlinkedlist *)dll); |
| 173 | return; | 144 | // return; |
| 174 | } | 145 | // } |
| 146 | |||
| 147 | // static void ___ucleanup_cndd(void *cnd) { | ||
| 148 | // if(!cnd) | ||
| 149 | // return; | ||
| 150 | |||
| 151 | // cnd_destroy((cnd_t *)cnd); | ||
| 152 | // return; | ||
| 153 | // } | ||
| 154 | |||
| 155 | // static void ___ucleanup_mtxd(void *mtx) { | ||
| 156 | // if(!mtx) | ||
| 157 | // return; | ||
| 175 | 158 | ||
| 176 | cqueue * cqueue_init(int mtx_type) { | 159 | // mtx_destroy((mtx_t*)mtx); |
| 177 | cleanup_CREATE(10); | 160 | // return; |
| 161 | // } | ||
| 162 | |||
| 163 | // cqueue * cqueue_init(int mtx_type) { | ||
| 164 | // cleanup_CREATE(10); | ||
| 178 | 165 | ||
| 179 | cqueue *cq = VALLOC(1, sizeof(*cq)); | 166 | // cqueue *cq = VALLOC(1, sizeof(*cq)); |
| 180 | if(!cq) | 167 | // if(!cq) |
| 181 | return NULL; | 168 | // return NULL; |
| 182 | cleanup_REGISTER(free, cq); | 169 | // cleanup_REGISTER(free, cq); |
| 183 | 170 | ||
| 184 | cq->canceled = FALSE; | 171 | // cq->canceled = FALSE; |
| 185 | cq->list = dlinkedlist_init(); | 172 | // cq->list = dlinkedlist_init(); |
| 186 | if(!cq->list) | 173 | // if(!cq->list) |
| 187 | cleanup_MARK(); | 174 | // cleanup_MARK(); |
| 188 | cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); | 175 | // cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); |
| 189 | |||
| 190 | if(!cleanup_ERRORFLAGGED) | ||
| 191 | if(!(cq->cnd = VALLOC(1, sizeof(*cq->cnd)))) | ||
| 192 | cleanup_MARK(); | ||
| 193 | cleanup_CNDREGISTER(free, cq->cnd); | ||
| 194 | 176 | ||
| 195 | if(!cleanup_ERRORFLAGGED) | 177 | // if(!cleanup_ERRORFLAGGED) |
| 196 | if(cnd_init(cq->cnd) == thrd_error) | 178 | // if(cnd_init(&cq->cnd) == thrd_error) |
| 197 | cleanup_MARK(); | 179 | // cleanup_MARK(); |
| 198 | cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); | 180 | // cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); |
| 199 | 181 | ||
| 200 | if(!cleanup_ERRORFLAGGED) | 182 | // if(!cleanup_ERRORFLAGGED) |
| 201 | if(!(cq->mtx = VALLOC(1, sizeof(*cq->mtx)))) | 183 | // if(mtx_init(&cq->mtx, mtx_type) != thrd_success) |
| 202 | cleanup_MARK(); | 184 | // cleanup_MARK(); |
| 203 | cleanup_CNDREGISTER(free, cq->mtx); | 185 | // cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); |
| 204 | |||
| 205 | if(!cleanup_ERRORFLAGGED) | ||
| 206 | if(mtx_init(cq->mtx, mtx_type) != thrd_success) | ||
| 207 | cleanup_MARK(); | ||
| 208 | cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx); | ||
| 209 | 186 | ||
| 210 | if(cleanup_ERRORFLAGGED) | 187 | // if(cleanup_ERRORFLAGGED) |
| 211 | cleanup_FIRE(); | 188 | // cleanup_FIRE(); |
| 212 | 189 | ||
| 213 | return cq; | 190 | // return cq; |
| 214 | } | 191 | // } |
| 215 | 192 | ||
| 216 | void cqueue_cancel(cqueue *cq) { | 193 | // void cqueue_cancel(cqueue *cq) { |
| 217 | if(!cq) | 194 | // if(!cq) |
| 218 | return; | 195 | // return; |
| 219 | 196 | ||
| 220 | mtx_lock(cq->mtx); | 197 | // mtx_lock(cq->mtx); |
| 221 | if(cq->canceled) { | 198 | // if(cq->canceled) { |
| 222 | mtx_unlock(cq->mtx); | 199 | // mtx_unlock(cq->mtx); |
| 223 | thrd_exit(-1); | 200 | // thrd_exit(-1); |
| 224 | } | 201 | // } |
| 225 | 202 | ||
| 226 | cq->canceled++; | 203 | // cq->canceled++; |
| 227 | mtx_unlock(cq->mtx); | 204 | // mtx_unlock(cq->mtx); |
| 228 | cnd_broadcast(cq->cnd); | 205 | // cnd_broadcast(cq->cnd); |
| 229 | 206 | ||
| 230 | return; | 207 | // return; |
| 231 | } | 208 | // } |
| 232 | 209 | ||
| 233 | void cqueue_free(cqueue *cq) { | 210 | // void cqueue_free(cqueue *cq) { |
| 234 | if(!cq) | 211 | // if(!cq) |
| 235 | return; | 212 | // return; |
| 236 | 213 | ||
| 237 | cqueue_cancel(cq); | 214 | // cqueue_cancel(cq); |
| 238 | mtx_destroy(cq->mtx); | 215 | // mtx_destroy(cq->mtx); |
| 239 | cnd_destroy(cq->cnd); | 216 | // cnd_destroy(cq->cnd); |
| 240 | free(cq->mtx); | 217 | // free(cq->mtx); |
| 241 | free(cq->cnd); | 218 | // free(cq->cnd); |
| 242 | dlinkedlist_free(cq->list); | 219 | // dlinkedlist_free(cq->list); |
| 243 | 220 | ||
| 244 | return; | 221 | // return; |
| 245 | } | 222 | // } |
| 246 | 223 | ||
| 247 | int cqueue_addtask(cqueue * const cq, task * const tsk) { | 224 | // int cqueue_addtask(cqueue * const cq, task * const tsk) { |
| 248 | if(!cq || !tsk) | 225 | // if(!cq || !tsk) |
| 249 | RETURNWERR(EINVAL, -1); | 226 | // RETURNWERR(EINVAL, -1); |
| 250 | 227 | ||
| 251 | mtx_lock(cq->mtx); | 228 | // mtx_lock(cq->mtx); |
| 252 | 229 | ||
| 253 | // TODO: Think about creating an "exception" via signal handling | 230 | // // TODO: Think about creating an "exception" via signal handling |
| 254 | if(cq->canceled) { | 231 | // if(cq->canceled) { |
| 255 | mtx_unlock(cq->mtx); | 232 | // mtx_unlock(cq->mtx); |
| 256 | thrd_exit(-1); | 233 | // thrd_exit(-1); |
| 257 | } | 234 | // } |
| 258 | 235 | ||
| 259 | dlinkedlist_prepend(cq->list, tsk, free); | 236 | // dlinkedlist_prepend(cq->list, tsk, free); |
| 260 | mtx_unlock(cq->mtx); | 237 | // mtx_unlock(cq->mtx); |
| 261 | cnd_signal(cq->cnd); | 238 | // cnd_signal(cq->cnd); |
| 262 | 239 | ||
| 263 | return 0; | 240 | // return 0; |
| 264 | } | 241 | // } |
| 265 | 242 | ||
| 266 | task * cqueue_waitpop(cqueue * const cq) { | 243 | // task * cqueue_waitpop(cqueue * const cq) { |
| 267 | if(!cq) | 244 | // if(!cq) |
| 268 | RETURNWERR(EINVAL, NULL); | 245 | // RETURNWERR(EINVAL, NULL); |
| 269 | 246 | ||
| 270 | task *retval = NULL; | 247 | // task *retval = NULL; |
| 271 | 248 | ||
| 272 | mtx_lock(cq->mtx); | 249 | // mtx_lock(cq->mtx); |
| 273 | while(dlinkedlist_isempty(cq->list) && !cq->canceled) | 250 | // while(dlinkedlist_isempty(cq->list) && !cq->canceled) |
| 274 | cnd_wait(cq->cnd, cq->mtx); | 251 | // cnd_wait(cq->cnd, cq->mtx); |
| 275 | 252 | ||
| 276 | if(cq->canceled) { | 253 | // if(cq->canceled) { |
| 277 | mtx_unlock(cq->mtx); | 254 | // mtx_unlock(cq->mtx); |
| 278 | thrd_exit(-1); | 255 | // thrd_exit(-1); |
| 279 | } | 256 | // } |
| 280 | 257 | ||
| 281 | retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); | 258 | // retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); |
| 282 | dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); | 259 | // dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); |
| 283 | mtx_unlock(cq->mtx); | 260 | // mtx_unlock(cq->mtx); |
| 284 | 261 | ||
| 285 | return retval; | 262 | // return retval; |
| 286 | } \ No newline at end of file | 263 | // } |
| 264 | |||
| 265 | |||
| 266 | |||
| 267 | typedef struct tp { | ||
| 268 | thrd_t **threads; // thrd_t *threads[] | ||
| 269 | int nthreads; | ||
| 270 | |||
| 271 | cqueue *taskqueue; | ||
| 272 | } threadpool; | ||
