diff options
| author | @syxhe <https://t.me/syxhe> | 2025-05-14 13:20:21 -0500 |
|---|---|---|
| committer | @syxhe <https://t.me/syxhe> | 2025-05-14 13:20:21 -0500 |
| commit | 5a5ff094101d63935f7fc65c124b3e56d553522d (patch) | |
| tree | 3bf3254cb491dc6aef6dd3b3bafaef9709ade9ba /src/threadpool.c | |
| parent | 928238bdcbc78c4196eb4a3508808c79e31f7c84 (diff) | |
Start reimplementing some of the earlier concurrent queue code
Diffstat (limited to 'src/threadpool.c')
| -rw-r--r-- | src/threadpool.c | 159 |
1 files changed, 89 insertions, 70 deletions
diff --git a/src/threadpool.c b/src/threadpool.c index e1c11aa..0baa024 100644 --- a/src/threadpool.c +++ b/src/threadpool.c | |||
| @@ -131,95 +131,114 @@ typedef struct cq { | |||
| 131 | cnd_t cnd; | 131 | cnd_t cnd; |
| 132 | 132 | ||
| 133 | unsigned char canceled; | 133 | unsigned char canceled; |
| 134 | |||
| 135 | } cqueue; | 134 | } cqueue; |
| 136 | 135 | ||
| 137 | 136 | ||
| 137 | static void ___ucleanup_mtxd(void *mtx) { | ||
| 138 | if(!mtx) | ||
| 139 | return; | ||
| 138 | 140 | ||
| 139 | // static void ___ucleanup_dfree(void *dll) { | 141 | mtx_destroy((mtx_t*)mtx); |
| 140 | // if(!dll) | 142 | return; |
| 141 | // return; | 143 | } |
| 142 | 144 | ||
| 143 | // dlinkedlist_free((dlinkedlist *)dll); | 145 | static void ___ucleanup_cndd(void *cnd) { |
| 144 | // return; | 146 | if(!cnd) |
| 145 | // } | 147 | return; |
| 146 | 148 | ||
| 147 | // static void ___ucleanup_cndd(void *cnd) { | 149 | cnd_destroy((cnd_t *)cnd); |
| 148 | // if(!cnd) | 150 | return; |
| 149 | // return; | 151 | } |
| 150 | 152 | ||
| 151 | // cnd_destroy((cnd_t *)cnd); | 153 | cqueue * cqueue_init() { |
| 152 | // return; | 154 | cleanup_CREATE(10); |
| 153 | // } | 155 | |
| 156 | // Create base object | ||
| 157 | cqueue *cq = VALLOC(1, sizeof(*cq)); | ||
| 158 | if(!cq) | ||
| 159 | RETURNWERR(errno, NULL); | ||
| 160 | cleanup_REGISTER(free, cq); | ||
| 161 | cq->canceled = 0; | ||
| 154 | 162 | ||
| 155 | // static void ___ucleanup_mtxd(void *mtx) { | 163 | // Initialize the mutex |
| 156 | // if(!mtx) | 164 | if(mtx_init(&cq->mtx, mtx_plain) != thrd_success) |
| 157 | // return; | 165 | cleanup_MARK(); |
| 166 | cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); | ||
| 167 | |||
| 168 | // Initialize the conditional | ||
| 169 | if(!cleanup_ERRORFLAGGED) | ||
| 170 | if(cnd_init(&cq->cnd) != thrd_success) | ||
| 171 | cleanup_MARK(); | ||
| 172 | cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); | ||
| 173 | |||
| 174 | // Create the taskqueue | ||
| 175 | if(!cleanup_ERRORFLAGGED) | ||
| 176 | if(!(cq->taskqueue = dlinkedlist_init())) | ||
| 177 | cleanup_MARK(); | ||
| 178 | cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue); | ||
| 179 | |||
| 180 | // Create the thread list | ||
| 181 | if(!cleanup_ERRORFLAGGED) | ||
| 182 | if(!(cq->rthreads = dlinkedlist_init())) | ||
| 183 | cleanup_MARK(); | ||
| 184 | cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads); | ||
| 185 | |||
| 186 | if(cleanup_ERRORFLAGGED) | ||
| 187 | cleanup_FIRE(); | ||
| 188 | |||
| 189 | return cq; | ||
| 190 | |||
| 191 | // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line | ||
| 192 | // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea | ||
| 193 | } | ||
| 158 | 194 | ||
| 159 | // mtx_destroy((mtx_t*)mtx); | 195 | void cqueue_cancel(cqueue * const cq) { |
| 160 | // return; | 196 | if(!cq) |
| 161 | // } | 197 | return; |
| 162 | 198 | ||
| 163 | // cqueue * cqueue_init(int mtx_type) { | 199 | mtx_lock(&cq->mtx); |
| 164 | // cleanup_CREATE(10); | ||
| 165 | |||
| 166 | // cqueue *cq = VALLOC(1, sizeof(*cq)); | ||
| 167 | // if(!cq) | ||
| 168 | // return NULL; | ||
| 169 | // cleanup_REGISTER(free, cq); | ||
| 170 | |||
| 171 | // cq->canceled = FALSE; | ||
| 172 | // cq->list = dlinkedlist_init(); | ||
| 173 | // if(!cq->list) | ||
| 174 | // cleanup_MARK(); | ||
| 175 | // cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); | ||
| 176 | |||
| 177 | // if(!cleanup_ERRORFLAGGED) | ||
| 178 | // if(cnd_init(&cq->cnd) == thrd_error) | ||
| 179 | // cleanup_MARK(); | ||
| 180 | // cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); | ||
| 181 | |||
| 182 | // if(!cleanup_ERRORFLAGGED) | ||
| 183 | // if(mtx_init(&cq->mtx, mtx_type) != thrd_success) | ||
| 184 | // cleanup_MARK(); | ||
| 185 | // cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); | ||
| 186 | 200 | ||
| 187 | // if(cleanup_ERRORFLAGGED) | 201 | if(cq->canceled) { |
| 188 | // cleanup_FIRE(); | 202 | mtx_unlock(&cq->mtx); |
| 203 | return; | ||
| 204 | } | ||
| 205 | cq->canceled = 1; | ||
| 189 | 206 | ||
| 190 | // return cq; | 207 | mtx_unlock(&cq->mtx); |
| 191 | // } | 208 | cnd_broadcast(&cq->cnd); |
| 209 | |||
| 210 | return; | ||
| 211 | } | ||
| 192 | 212 | ||
| 193 | // void cqueue_cancel(cqueue *cq) { | 213 | static int ___cqueue_join(void *t) { |
| 194 | // if(!cq) | 214 | if(!t) |
| 195 | // return; | 215 | return -1; |
| 196 | 216 | ||
| 197 | // mtx_lock(cq->mtx); | 217 | int retval = 0; |
| 198 | // if(cq->canceled) { | 218 | thrd_t thread = *((thrd_t*)t); |
| 199 | // mtx_unlock(cq->mtx); | 219 | thrd_join(thread, &retval); |
| 200 | // thrd_exit(-1); | 220 | |
| 201 | // } | 221 | return retval; |
| 222 | } | ||
| 202 | 223 | ||
| 203 | // cq->canceled++; | 224 | void cqueue_free(void *cq) { |
| 204 | // mtx_unlock(cq->mtx); | 225 | if(!cq) |
| 205 | // cnd_broadcast(cq->cnd); | 226 | return; |
| 206 | 227 | ||
| 207 | // return; | 228 | cqueue *real = (cqueue *)cq; |
| 208 | // } | ||
| 209 | 229 | ||
| 210 | // void cqueue_free(cqueue *cq) { | 230 | // Cancel threads and wait for them to exit |
| 211 | // if(!cq) | 231 | cqueue_cancel(real); |
| 212 | // return; | 232 | dlinkedlist_foreach(real->rthreads, ___cqueue_join); |
| 213 | 233 | ||
| 214 | // cqueue_cancel(cq); | 234 | // Threads are dead, no need to worry about concurrency anymore |
| 215 | // mtx_destroy(cq->mtx); | 235 | mtx_destroy(&real->mtx); |
| 216 | // cnd_destroy(cq->cnd); | 236 | cnd_destroy(&real->cnd); |
| 217 | // free(cq->mtx); | 237 | dlinkedlist_free(real->rthreads); |
| 218 | // free(cq->cnd); | 238 | dlinkedlist_free(real->taskqueue); |
| 219 | // dlinkedlist_free(cq->list); | ||
| 220 | 239 | ||
| 221 | // return; | 240 | return; |
| 222 | // } | 241 | } |
| 223 | 242 | ||
| 224 | // int cqueue_addtask(cqueue * const cq, task * const tsk) { | 243 | // int cqueue_addtask(cqueue * const cq, task * const tsk) { |
| 225 | // if(!cq || !tsk) | 244 | // if(!cq || !tsk) |
