summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author@syxhe <https://t.me/syxhe>2025-04-22 18:53:20 -0500
committer@syxhe <https://t.me/syxhe>2025-04-22 18:53:20 -0500
commitdb6aacd22f5c43cc6ea45e6c07bf962859ebac8d (patch)
tree928324c6c344429e682a7422d19b01c8300103be
parent39c3fa785cafd5fa9b75bfbf92d7a702310ba480 (diff)
Stop threadpool from crashing
-rw-r--r--src/main.c1
-rw-r--r--src/threadpool.c120
-rw-r--r--src/threadpool.h1
3 files changed, 68 insertions, 54 deletions
diff --git a/src/main.c b/src/main.c
index f89a124..6af87ab 100644
--- a/src/main.c
+++ b/src/main.c
@@ -23,6 +23,7 @@ int main() {
23 threadpool *tp = threadpool_init(2); 23 threadpool *tp = threadpool_init(2);
24 task *tsk = task_init(testcb, "This is some data"); 24 task *tsk = task_init(testcb, "This is some data");
25 threadpool_addtask(tp, tsk); 25 threadpool_addtask(tp, tsk);
26 threadpool_join(tp);
26 threadpool_free(tp); 27 threadpool_free(tp);
27 28
28 return 0; 29 return 0;
diff --git a/src/threadpool.c b/src/threadpool.c
index 31c300c..ab0733d 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -1,5 +1,4 @@
1#include "threadpool.h" 1#include "threadpool.h"
2#include "arena.h"
3#include "shared.h" 2#include "shared.h"
4 3
5#include "ll.h" 4#include "ll.h"
@@ -102,8 +101,8 @@ typedef struct task {
102 101
103typedef struct cq { 102typedef struct cq {
104 dlinkedlist *list; 103 dlinkedlist *list;
105 mtx_t *mutex; 104 mtx_t *mtx;
106 cnd_t *conditional; 105 cnd_t *cnd;
107 unsigned char canceled; 106 unsigned char canceled;
108} cqueue; 107} cqueue;
109 108
@@ -168,32 +167,37 @@ cqueue * cqueue_init(int mtx_type) {
168 return NULL; 167 return NULL;
169 cleanup_REGISTER(free, cq); 168 cleanup_REGISTER(free, cq);
170 169
171 cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); 170 cq->mtx = VALLOC(1, sizeof(*(cq->mtx)));
172 if(!(cq->mutex)) 171 if(!(cq->mtx))
173 cleanup_MARK(); 172 cleanup_MARK();
174 cleanup_CNDREGISTER(free, cq->mutex); 173 cleanup_CNDREGISTER(free, cq->mtx);
175 174
176 if(cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success) 175 if(!cleanup_ERRORFLAGGED)
177 cleanup_MARK(); 176 if(mtx_init(cq->mtx, mtx_type) != thrd_success)
178 cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex); 177 cleanup_MARK();
178 cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx);
179 179
180 if(cleanup_ERRORFLAGGED && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) 180 if(!cleanup_ERRORFLAGGED)
181 cleanup_MARK(); 181 if(!(cq->cnd = VALLOC(1, sizeof(*(cq->cnd)))))
182 cleanup_CNDREGISTER(free, cq->conditional); 182 cleanup_MARK();
183 cleanup_CNDREGISTER(free, cq->cnd);
183 184
184 if(cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success) 185 if(!cleanup_ERRORFLAGGED)
185 cleanup_MARK(); 186 if(cnd_init(cq->cnd) != thrd_success)
186 cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional); 187 cleanup_MARK();
188 cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd);
187 189
188 cq->list = dlinkedlist_init(); 190 if(!cleanup_ERRORFLAGGED)
189 if(cleanup_ERRORFLAGGED && !cq->list) 191 if(!(cq->list = dlinkedlist_init()))
190 cleanup_MARK(); 192 cleanup_MARK();
191 cleanup_CNDREGISTER(___ucleanup_dll, cq->list); 193 cleanup_CNDREGISTER(___ucleanup_dll, cq->list);
192 194
195
193 if(cleanup_ERRORFLAGGED) 196 if(cleanup_ERRORFLAGGED)
194 cleanup_fire(&__CLEANUP); 197 cleanup_fire(&__CLEANUP);
195 198
196 // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions 199 // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions
200 // The implementation was not better lmao
197 201
198 cq->canceled = 0; 202 cq->canceled = 0;
199 return cq; 203 return cq;
@@ -207,10 +211,10 @@ void cqueue_free(cqueue *cq) {
207 cqueue_cancel(cq); 211 cqueue_cancel(cq);
208 212
209 dlinkedlist_free(cq->list); 213 dlinkedlist_free(cq->list);
210 cnd_destroy(cq->conditional); 214 cnd_destroy(cq->cnd);
211 mtx_destroy(cq->mutex); 215 mtx_destroy(cq->mtx);
212 free(cq->conditional); 216 free(cq->cnd);
213 free(cq->mutex); 217 free(cq->mtx);
214 free(cq); 218 free(cq);
215 219
216 return; 220 return;
@@ -220,15 +224,15 @@ int cqueue_append(cqueue * const cq, task *tsk) {
220 if(!cq || !tsk) 224 if(!cq || !tsk)
221 RETURNWERR(EINVAL, -1); 225 RETURNWERR(EINVAL, -1);
222 226
223 mtx_lock(cq->mutex); 227 mtx_lock(cq->mtx);
224 if(cq->canceled) { 228 if(cq->canceled) {
225 mtx_unlock(cq->mutex); 229 mtx_unlock(cq->mtx);
226 thrd_exit(thrd_timedout); 230 thrd_exit(thrd_timedout);
227 } 231 }
228 232
229 dlinkedlist_append(cq->list, tsk, free); 233 dlinkedlist_append(cq->list, tsk, free);
230 mtx_unlock(cq->mutex); 234 mtx_unlock(cq->mtx);
231 cnd_signal(cq->conditional); 235 cnd_signal(cq->cnd);
232 236
233 return 0; 237 return 0;
234} 238}
@@ -237,15 +241,15 @@ int cqueue_prepend(cqueue * const cq, task *tsk) {
237 if(!cq || !tsk) 241 if(!cq || !tsk)
238 RETURNWERR(EINVAL, -1); 242 RETURNWERR(EINVAL, -1);
239 243
240 mtx_lock(cq->mutex); 244 mtx_lock(cq->mtx);
241 if(cq->canceled) { 245 if(cq->canceled) {
242 mtx_unlock(cq->mutex); 246 mtx_unlock(cq->mtx);
243 thrd_exit(thrd_timedout); 247 thrd_exit(thrd_timedout);
244 } 248 }
245 249
246 dlinkedlist_prepend(cq->list, tsk, free); 250 dlinkedlist_prepend(cq->list, tsk, free);
247 mtx_unlock(cq->mutex); 251 mtx_unlock(cq->mtx);
248 cnd_signal(cq->conditional); 252 cnd_signal(cq->cnd);
249 253
250 return 0; 254 return 0;
251} 255}
@@ -254,15 +258,15 @@ int cqueue_insert(cqueue * const cq, task *tsk, int index) {
254 if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first 258 if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first
255 RETURNWERR(EINVAL, -1); 259 RETURNWERR(EINVAL, -1);
256 260
257 mtx_lock(cq->mutex); 261 mtx_lock(cq->mtx);
258 if(cq->canceled) { 262 if(cq->canceled) {
259 mtx_unlock(cq->mutex); 263 mtx_unlock(cq->mtx);
260 thrd_exit(thrd_timedout); 264 thrd_exit(thrd_timedout);
261 } 265 }
262 266
263 dlinkedlist_insert(cq->list, tsk, free, index); 267 dlinkedlist_insert(cq->list, tsk, free, index);
264 mtx_unlock(cq->mutex); 268 mtx_unlock(cq->mtx);
265 cnd_signal(cq->conditional); 269 cnd_signal(cq->cnd);
266 270
267 return 0; 271 return 0;
268} 272}
@@ -271,14 +275,14 @@ int cqueue_size(cqueue const * const cq) {
271 if(!cq) 275 if(!cq)
272 RETURNWERR(EINVAL, -1); 276 RETURNWERR(EINVAL, -1);
273 277
274 mtx_lock(cq->mutex); 278 mtx_lock(cq->mtx);
275 if(cq->canceled) { 279 if(cq->canceled) {
276 mtx_unlock(cq->mutex); 280 mtx_unlock(cq->mtx);
277 thrd_exit(thrd_timedout); 281 thrd_exit(thrd_timedout);
278 } 282 }
279 283
280 int retval = dlinkedlist_size(cq->list); 284 int retval = dlinkedlist_size(cq->list);
281 mtx_unlock(cq->mutex); 285 mtx_unlock(cq->mtx);
282 286
283 return retval; 287 return retval;
284} 288}
@@ -294,9 +298,9 @@ int cqueue_trypop(cqueue * const cq, task **ret) {
294 298
295 int retval = 0; 299 int retval = 0;
296 300
297 mtx_lock(cq->mutex); 301 mtx_lock(cq->mtx);
298 if(cq->canceled) { 302 if(cq->canceled) {
299 mtx_unlock(cq->mutex); 303 mtx_unlock(cq->mtx);
300 thrd_exit(thrd_timedout); 304 thrd_exit(thrd_timedout);
301 } 305 }
302 306
@@ -304,7 +308,7 @@ int cqueue_trypop(cqueue * const cq, task **ret) {
304 *ret = (task*)dlinkedlist_poplast(cq->list); 308 *ret = (task*)dlinkedlist_poplast(cq->list);
305 retval = 1; 309 retval = 1;
306 } 310 }
307 mtx_unlock(cq->mutex); 311 mtx_unlock(cq->mtx);
308 312
309 return retval; 313 return retval;
310} 314}
@@ -313,19 +317,19 @@ int cqueue_waitpop(cqueue * const cq, task **ret) {
313 if(!cq || !ret) 317 if(!cq || !ret)
314 RETURNWERR(EINVAL, -1); 318 RETURNWERR(EINVAL, -1);
315 319
316 mtx_lock(cq->mutex); 320 mtx_lock(cq->mtx);
317 321
318 while(!dlinkedlist_isempty(cq->list) && !cq->canceled) 322 while(!dlinkedlist_isempty(cq->list) && !cq->canceled)
319 cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done 323 cnd_wait(cq->cnd, cq->mtx); // Unlocks mutex while waiting, acquires lock once waiting is done
320 324
321 if(cq->canceled) { 325 if(cq->canceled) {
322 mtx_unlock(cq->mutex); 326 mtx_unlock(cq->mtx);
323 thrd_exit(thrd_timedout); 327 thrd_exit(thrd_timedout);
324 } 328 }
325 329
326 *ret = dlinkedlist_poplast(cq->list); 330 *ret = dlinkedlist_poplast(cq->list);
327 331
328 mtx_unlock(cq->mutex); 332 mtx_unlock(cq->mtx);
329 333
330 return 0; 334 return 0;
331} 335}
@@ -336,14 +340,14 @@ int cqueue_cancel(cqueue * const cq) {
336 340
337 int retval = 0; 341 int retval = 0;
338 342
339 mtx_lock(cq->mutex); 343 mtx_lock(cq->mtx);
340 if(cq->canceled) 344 if(cq->canceled)
341 retval = -1; 345 retval = -1;
342 else 346 else
343 cq->canceled++; 347 cq->canceled++;
344 348
345 mtx_unlock(cq->mutex); 349 mtx_unlock(cq->mtx);
346 cnd_broadcast(cq->conditional); 350 cnd_broadcast(cq->cnd);
347 351
348 return retval; 352 return retval;
349} 353}
@@ -389,9 +393,9 @@ threadpool * threadpool_init(int threads) {
389 cleanup_MARK(); 393 cleanup_MARK();
390 cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue); 394 cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue);
391 395
392 tp->threads = VALLOC(threads, sizeof(*tp->threads)); 396 if(!cleanup_ERRORFLAGGED)
393 if(!tp->threads) 397 if(!(tp->threads = VALLOC(threads, sizeof(*tp->threads))))
394 cleanup_MARK(); 398 cleanup_MARK();
395 cleanup_CNDREGISTER(free, tp->threads); 399 cleanup_CNDREGISTER(free, tp->threads);
396 400
397 for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) { 401 for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) {
@@ -420,13 +424,11 @@ void threadpool_free(threadpool *tp) {
420 return; 424 return;
421 425
422 cqueue_free(tp->taskqueue); 426 cqueue_free(tp->taskqueue);
423 for(int i = 0; i < tp->nthreads; i++) { 427 for(int i = 0; i < tp->nthreads; i++)
424 thrd_detach(*tp->threads[i]);
425 free(tp->threads[i]); 428 free(tp->threads[i]);
426 }
427 free(tp->threads); 429 free(tp->threads);
428 free(tp); 430 free(tp);
429 431
430 return; 432 return;
431} 433}
432 434
@@ -435,4 +437,14 @@ int threadpool_addtask(threadpool * const tp, task * const task) {
435 RETURNWERR(EINVAL, -1); 437 RETURNWERR(EINVAL, -1);
436 438
437 return cqueue_append(tp->taskqueue, task); 439 return cqueue_append(tp->taskqueue, task);
440}
441
442int threadpool_join(const threadpool * const tp) {
443 if(!tp)
444 RETURNWERR(EINVAL, -1);
445
446 for(int i = 0; i < tp->nthreads; i++)
447 thrd_join(*(tp->threads[i]), NULL);
448
449 return 0;
438} \ No newline at end of file 450} \ No newline at end of file
diff --git a/src/threadpool.h b/src/threadpool.h
index 48ad9eb..bc3ce06 100644
--- a/src/threadpool.h
+++ b/src/threadpool.h
@@ -25,6 +25,7 @@ int cqueue_cancel(cqueue * const cq);
25threadpool * threadpool_init(int threads); 25threadpool * threadpool_init(int threads);
26void threadpool_free(threadpool *tp); 26void threadpool_free(threadpool *tp);
27int threadpool_addtask(threadpool * const tp, task * const task); 27int threadpool_addtask(threadpool * const tp, task * const task);
28int threadpool_join(const threadpool * const tp);
28 29
29typedef struct mtxp mtxpair; 30typedef struct mtxp mtxpair;
30mtxpair * mtxpair_init(void * const data, int type); 31mtxpair * mtxpair_init(void * const data, int type);