summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author@syxhe <https://t.me/syxhe>2025-05-16 19:09:02 -0500
committer@syxhe <https://t.me/syxhe>2025-05-16 19:09:02 -0500
commit60715c7785e7fbe4759df64258ad58a8e2a0769d (patch)
tree7471bc9816853518bdbc8baa02f817d5cf73594b
parent5a5ff094101d63935f7fc65c124b3e56d553522d (diff)
Do some more work on the concurrent queue
-rw-r--r--src/shared.h9
-rw-r--r--src/threadpool.c171
-rw-r--r--src/threadpool.h10
3 files changed, 147 insertions, 43 deletions
diff --git a/src/shared.h b/src/shared.h
index 825814e..66d1af3 100644
--- a/src/shared.h
+++ b/src/shared.h
@@ -7,6 +7,9 @@
7#define FALSE 0 7#define FALSE 0
8#define TRUE 1 8#define TRUE 1
9 9
10typedef int (*gcallback)(void*); // Generic callback signature
11typedef void (*fcallback)(void*); // free()-like callback signature
12
10#define RETURNWERR(errval, retval) do {\ 13#define RETURNWERR(errval, retval) do {\
11 errno = (errval);\ 14 errno = (errval);\
12 return (retval);\ 15 return (retval);\
@@ -60,12 +63,12 @@ char * xdirname(const char * const path);
60 63
61 64
62// Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null 65// Cleanup callback. Should act like `free()`, in that it doesn't crash if the pointer it's given is null
63typedef void (*cleanup_callback)(void*); 66typedef fcallback cleanup_callback;
64 67
65// Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments 68// Cleanup struct. Stores a STATICALLY defined array of callbacks and void* arguments
66typedef struct cl { 69typedef struct cl {
67 cleanup_callback *funcs; 70 cleanup_callback *funcs; // Actual type: cleanup_callback funcs[]
68 void **args; 71 void **args; // Actual type: void *args[]
69 72
70 int size; 73 int size;
71 int used; 74 int used;
diff --git a/src/threadpool.c b/src/threadpool.c
index 0baa024..6912790 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -3,6 +3,8 @@
3 3
4#include "ll.h" 4#include "ll.h"
5 5
6#include <asm-generic/errno-base.h>
7#include <asm-generic/errno.h>
6#include <threads.h> 8#include <threads.h>
7#include <stdlib.h> 9#include <stdlib.h>
8#include <errno.h> 10#include <errno.h>
@@ -104,7 +106,7 @@ task * task_init(task_callback cb, void *arg) {
104 return task; 106 return task;
105} 107}
106 108
107void task_free(task *ts) { 109void task_free(void *ts) {
108 if(!ts) 110 if(!ts)
109 return; 111 return;
110 112
@@ -215,8 +217,7 @@ static int ___cqueue_join(void *t) {
215 return -1; 217 return -1;
216 218
217 int retval = 0; 219 int retval = 0;
218 thrd_t thread = *((thrd_t*)t); 220 thrd_join(*((thrd_t*)t), &retval);
219 thrd_join(thread, &retval);
220 221
221 return retval; 222 return retval;
222} 223}
@@ -240,52 +241,146 @@ void cqueue_free(void *cq) {
240 return; 241 return;
241} 242}
242 243
243// int cqueue_addtask(cqueue * const cq, task * const tsk) { 244int cqueue_addtask(cqueue * const cq, task * const tsk) {
244// if(!cq || !tsk) 245 if(!cq || !tsk)
245// RETURNWERR(EINVAL, -1); 246 RETURNWERR(EINVAL, -1);
246 247
247// mtx_lock(cq->mtx); 248 mtx_lock(&cq->mtx);
248 249
249// // TODO: Think about creating an "exception" via signal handling 250 if(cq->canceled) {
250// if(cq->canceled) { 251 mtx_unlock(&cq->mtx);
251// mtx_unlock(cq->mtx); 252 RETURNWERR(ECANCELED, -1);
252// thrd_exit(-1); 253 }
253// }
254 254
255// dlinkedlist_prepend(cq->list, tsk, free); 255 dlinkedlist_prepend(cq->taskqueue, tsk, task_free);
256// mtx_unlock(cq->mtx); 256 mtx_unlock(&cq->mtx);
257// cnd_signal(cq->cnd); 257 cnd_signal(&cq->cnd);
258 258
259// return 0; 259 return 0;
260// } 260}
261 261
262// task * cqueue_waitpop(cqueue * const cq) { 262task * cqueue_waitpop(cqueue * const cq) {
263// if(!cq) 263 if(!cq)
264// RETURNWERR(EINVAL, NULL); 264 RETURNWERR(EINVAL, NULL);
265 265
266// task *retval = NULL; 266 task *tsk = NULL;
267 int index = -1;
267 268
268// mtx_lock(cq->mtx); 269 mtx_lock(&cq->mtx);
269// while(dlinkedlist_isempty(cq->list) && !cq->canceled) 270 while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled)
270// cnd_wait(cq->cnd, cq->mtx); 271 cnd_wait(&cq->cnd, &cq->mtx);
271 272
272// if(cq->canceled) { 273 if(cq->canceled) {
273// mtx_unlock(cq->mtx); 274 mtx_unlock(&cq->mtx);
274// thrd_exit(-1); 275 thrd_exit(-1);
275// } 276 }
276 277
277// retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); 278 tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1));
278// dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); 279 dlinkedlist_remove(cq->taskqueue, index);
279// mtx_unlock(cq->mtx);
280 280
281// return retval; 281 mtx_unlock(&cq->mtx);
282// }
283 282
283 return tsk;
284}
285
286static int consumer(void *cq) {
287 if(!cq)
288 thrd_exit(-1);
289
290 cqueue *real = (cqueue *)cq;
291 for(task *ctask;;) {
292 ctask = cqueue_waitpop(real);
293 if(!ctask)
294 task_fire(ctask);
295 }
296
297 thrd_exit(0);
298}
299
300int cqueue_registerthreads(cqueue * const cq, int threads) {
301 if(!cq || threads <= 0)
302 RETURNWERR(EINVAL, -1);
303
304 mtx_lock(&cq->mtx);
305 if(cq->canceled) {
306 mtx_unlock(&cq->mtx);
307 RETURNWERR(ECANCELED, -1);
308 }
284 309
310 thrd_t *newthreads[threads];
311 for(int i = 0; i < threads; i++) {
312 newthreads[i] = VALLOC(1, sizeof(thrd_t));
313 if(!newthreads[i]) {
314 for(int j = 0; j < i; j++)
315 free(newthreads[j]);
285 316
286typedef struct tp { 317 return -1;
287 thrd_t **threads; // thrd_t *threads[] 318 }
288 int nthreads;
289 319
290 cqueue *taskqueue; 320 dlinkedlist_prepend(cq->rthreads, newthreads[i], free);
291} threadpool; 321 thrd_create(newthreads[i], consumer, cq);
322 }
323
324 mtx_unlock(&cq->mtx);
325
326 return 0;
327}
328
329int cqueue_registerthread(cqueue * const cq) {
330 return cqueue_registerthreads(cq, 1);
331}
332
333enum __CQUEUE_STAT_OPTIONS {
334 __CQUEUE_STAT_NOTDEF,
335
336 __CQUEUE_CANCELED,
337 __CQUEUE_THREADS_NUM,
338 __CQUEUE_TASKS_NUM,
339
340 __CQUEUE_STAT_TOOBIG,
341};
342
343int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) {
344 if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG)
345 RETURNWERR(EINVAL, -1);
346
347 int retval = -1;
348 mtx_lock(&cq->mtx);
349
350 switch(opt) {
351 case __CQUEUE_CANCELED:
352 retval = cq->canceled;
353 mtx_unlock(&cq->mtx);
354 return retval;
355 break;
356
357 case __CQUEUE_THREADS_NUM:
358 retval = dlinkedlist_size(cq->rthreads);
359 mtx_unlock(&cq->mtx);
360 return retval;
361 break;
362
363 case __CQUEUE_TASKS_NUM:
364 retval = dlinkedlist_size(cq->taskqueue);
365 mtx_unlock(&cq->mtx);
366 return retval;
367 break;
368
369 default:
370 RETURNWERR(EINVAL, -1);
371 break;
372 }
373
374 // This should absolutely never run
375 RETURNWERR(ENOTRECOVERABLE, -1);
376}
377
378int cqueue_iscanceled(cqueue * const cq) {
379 return cqueue_getstat(cq, __CQUEUE_CANCELED);
380}
381int cqueue_numthreads(cqueue * const cq) {
382 return cqueue_getstat(cq, __CQUEUE_THREADS_NUM);
383}
384int cqueue_numtasks(cqueue * const cq) {
385 return cqueue_getstat(cq, __CQUEUE_TASKS_NUM);
386}
diff --git a/src/threadpool.h b/src/threadpool.h
index db6fa2e..d7b713c 100644
--- a/src/threadpool.h
+++ b/src/threadpool.h
@@ -9,12 +9,18 @@ typedef struct cq cqueue;
9typedef struct tp threadpool; 9typedef struct tp threadpool;
10 10
11task * task_init(task_callback cb, void *arg); 11task * task_init(task_callback cb, void *arg);
12void task_free(task *ts); 12void task_free(void *ts);
13int task_fire(task *ts); 13int task_fire(task *ts);
14 14
15cqueue * cqueue_init(); 15cqueue * cqueue_init();
16void cqueue_cancel(cqueue * const cq); 16void cqueue_cancel(cqueue * const cq);
17void cqueue_free(void *cq); 17void cqueue_free(void *cq);
18 18int cqueue_addtask(cqueue * const cq, task * const tsk);
19task * cqueue_waitpop(cqueue * const cq);
20int cqueue_registerthreads(cqueue * const cq, int threads);
21int cqueue_registerthread(cqueue * const cq);
22int cqueue_iscanceled(cqueue * const cq);
23int cqueue_numthreads(cqueue * const cq);
24int cqueue_numtasks(cqueue * const cq);
19 25
20#endif \ No newline at end of file 26#endif \ No newline at end of file