summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author@syxhe <https://t.me/syxhe>2025-04-22 16:35:44 -0500
committer@syxhe <https://t.me/syxhe>2025-04-22 16:35:44 -0500
commit0ee5044805c8d157d5023fc1322f980e3a480df7 (patch)
tree0cbdcbb5286a3c776c12d490afb1e24c40771294 /src
parentc704a8a382c231e066a7fbf0402a33455c40b8f5 (diff)
Start work on threadpool implementation
Diffstat (limited to 'src')
-rw-r--r--src/shared.h31
-rw-r--r--src/threadpool.c160
-rw-r--r--src/threadpool.h21
3 files changed, 189 insertions, 23 deletions
diff --git a/src/shared.h b/src/shared.h
index 7034255..9e7eaa8 100644
--- a/src/shared.h
+++ b/src/shared.h
@@ -69,14 +69,31 @@ typedef struct cl {
69 int used; 69 int used;
70} cleanup; 70} cleanup;
71 71
72// Initialize a local cleanup stack. `loc`, `funcs` and `args` need to be locally defined, non allocated arrays, and must be at least `size` elements large
72int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]); 73int cleanup_init(cleanup * const loc, int size, cleanup_callback funcs[], void *args[]);
74
75// Register a cleanup callback for a given cleanup object
73int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg); 76int cleanup_register(cleanup * const loc, cleanup_callback cb, void *arg);
77
78// Register a cleanup callback, if and only if `flag == 0`
74int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg); 79int cleanup_cndregister(cleanup * const loc, unsigned char flag, cleanup_callback cb, void *arg);
80
81// Clear the contents of a cleanup stack
75int cleanup_clear(cleanup * const loc); 82int cleanup_clear(cleanup * const loc);
83
84// Get the top callback without removing it from the cleanup stack
76cleanup_callback cleanup_peekf(cleanup * const loc); 85cleanup_callback cleanup_peekf(cleanup * const loc);
86
87// Get and remove the top callback from the cleanup stack. Does not return the argument for the given callback
77cleanup_callback cleanup_popf(cleanup * const loc); 88cleanup_callback cleanup_popf(cleanup * const loc);
89
90// Get the top argument without removing it from the cleanup stack
78void * cleanup_peeka(cleanup * const loc); 91void * cleanup_peeka(cleanup * const loc);
92
93// Get and remove the top argument from the cleanup stack. Does not return the callback it was to be fed into
79void * cleanup_popa(cleanup * const loc); 94void * cleanup_popa(cleanup * const loc);
95
96// Fire all the callbacks in the cleanup stack
80int cleanup_fire(cleanup * const loc); 97int cleanup_fire(cleanup * const loc);
81 98
82/* Cleanup environment creator. Automatically defines the variables `__CLEANUP`, `__CLEANUP_FUNCS`, and `__CLEANUP_ARGS` and initializes 99/* Cleanup environment creator. Automatically defines the variables `__CLEANUP`, `__CLEANUP_FUNCS`, and `__CLEANUP_ARGS` and initializes
@@ -85,9 +102,19 @@ int cleanup_fire(cleanup * const loc);
85cleanup __CLEANUP; \ 102cleanup __CLEANUP; \
86cleanup_callback __CLEANUP_FUNCS[(size)]; \ 103cleanup_callback __CLEANUP_FUNCS[(size)]; \
87void *__CLEANUP_ARGS[(size)]; \ 104void *__CLEANUP_ARGS[(size)]; \
105unsigned char __FLAG = 0; \
88cleanup_init(&__CLEANUP, (size), __CLEANUP_FUNCS, __CLEANUP_ARGS) 106cleanup_init(&__CLEANUP, (size), __CLEANUP_FUNCS, __CLEANUP_ARGS)
89 107
90#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg)) 108#define cleanup_REGISTER(cb, arg) cleanup_register(&__CLEANUP, (cb), (arg))
91#define cleanup_CNDREGISTER(flag, cb, arg) cleanup_cndregister(&__CLEANUP, (flag), (cb), (arg)) 109#define cleanup_CNDREGISTER(cb, arg) cleanup_cndregister(&__CLEANUP, __FLAG, (cb), (arg))
110#define cleanup_CLEAR() cleanup_clear(&__CLEANUP)
111#define cleanup_PEEKF() cleanup_peekf(&__CLEANUP)
112#define cleanup_POPF() cleanup_popf(&__CLEANUP)
113#define cleanup_PEEKA() cleanup_peeka(&__CLEANUP)
114#define cleanup_POPA() cleanup_popa(&__CLEANUP)
115#define cleanup_FIRE() cleanup_fire(&__CLEANUP)
116#define cleanup_MARK() (__FLAG = 1)
117#define cleanup_UNMARK() (__FLAG = 0)
118#define cleanup_ERRORFLAGGED (__FLAG != 0)
92 119
93#endif \ No newline at end of file 120#endif \ No newline at end of file
diff --git a/src/threadpool.c b/src/threadpool.c
index 56dcd6b..66c0d06 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -21,12 +21,12 @@ typedef struct mtxp {
21} mtxpair; 21} mtxpair;
22 22
23mtxpair * mtxpair_init(void * const data, int type) { 23mtxpair * mtxpair_init(void * const data, int type) {
24 mtxpair *mtxp = malloc(1 * sizeof(*mtxp)); 24 mtxpair *mtxp = VALLOC(1, sizeof(*mtxp));
25 if(!mtxp) 25 if(!mtxp)
26 return NULL; 26 return NULL;
27 27
28 // Make room for the mutex 28 // Make room for the mutex
29 mtxp->mtx = malloc(1 * sizeof(*mtxp->mtx)); 29 mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx));
30 if(!mtxp->mtx) { 30 if(!mtxp->mtx) {
31 free(mtxp); 31 free(mtxp);
32 return NULL; 32 return NULL;
@@ -95,7 +95,6 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd)
95 95
96// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 96// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342
97 97
98typedef int (*task_callback)(void*);
99typedef struct task { 98typedef struct task {
100 task_callback cb; 99 task_callback cb;
101 void *arg; 100 void *arg;
@@ -105,6 +104,7 @@ typedef struct cq {
105 dlinkedlist *list; 104 dlinkedlist *list;
106 mtx_t *mutex; 105 mtx_t *mutex;
107 cnd_t *conditional; 106 cnd_t *conditional;
107 unsigned char canceled;
108} cqueue; 108} cqueue;
109 109
110typedef struct tp { 110typedef struct tp {
@@ -114,6 +114,7 @@ typedef struct tp {
114 cqueue *taskqueue; 114 cqueue *taskqueue;
115} threadpool; 115} threadpool;
116 116
117
117task * task_init(task_callback cb, void *arg) { 118task * task_init(task_callback cb, void *arg) {
118 if(cb == NULL) 119 if(cb == NULL)
119 RETURNWERR(EINVAL, NULL); 120 RETURNWERR(EINVAL, NULL);
@@ -160,7 +161,6 @@ static void ___ucleanup_dll(void *dll) {
160 161
161 162
162cqueue * cqueue_init(int mtx_type) { 163cqueue * cqueue_init(int mtx_type) {
163 unsigned char flag = 0;
164 cleanup_CREATE(10); 164 cleanup_CREATE(10);
165 165
166 cqueue *cq = VALLOC(1, sizeof(*cq)); 166 cqueue *cq = VALLOC(1, sizeof(*cq));
@@ -170,31 +170,32 @@ cqueue * cqueue_init(int mtx_type) {
170 170
171 cq->mutex = VALLOC(1, sizeof(*(cq->mutex))); 171 cq->mutex = VALLOC(1, sizeof(*(cq->mutex)));
172 if(!(cq->mutex)) 172 if(!(cq->mutex))
173 flag++; 173 cleanup_MARK();
174 cleanup_CNDREGISTER(flag, free, cq->mutex); 174 cleanup_CNDREGISTER(free, cq->mutex);
175 175
176 if(!flag && mtx_init(cq->mutex, mtx_type) != thrd_success) 176 if(cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success)
177 flag++; 177 cleanup_MARK();
178 cleanup_CNDREGISTER(flag, ___ucleanup_mtxd, cq->mutex); 178 cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex);
179 179
180 if(!flag && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional))))) 180 if(cleanup_ERRORFLAGGED && !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional)))))
181 flag++; 181 cleanup_MARK();
182 cleanup_CNDREGISTER(flag, free, cq->conditional); 182 cleanup_CNDREGISTER(free, cq->conditional);
183 183
184 if(!flag && cnd_init(cq->conditional) != thrd_success) 184 if(cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success)
185 flag++; 185 cleanup_MARK();
186 cleanup_CNDREGISTER(flag, ___ucleanup_cndd, cq->conditional); 186 cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional);
187 187
188 cq->list = dlinkedlist_init(); 188 cq->list = dlinkedlist_init();
189 if(!flag && !cq->list) 189 if(cleanup_ERRORFLAGGED && !cq->list)
190 flag++; 190 cleanup_MARK();
191 cleanup_CNDREGISTER(flag, ___ucleanup_dll, cq->list); 191 cleanup_CNDREGISTER(___ucleanup_dll, cq->list);
192 192
193 if(flag) 193 if(cleanup_ERRORFLAGGED)
194 cleanup_fire(&__CLEANUP); 194 cleanup_fire(&__CLEANUP);
195 195
196 // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions 196 // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions
197 197
198 cq->canceled = 0;
198 return cq; 199 return cq;
199} 200}
200 201
@@ -202,6 +203,9 @@ void cqueue_free(cqueue *cq) {
202 if(!cq) 203 if(!cq)
203 return; 204 return;
204 205
206 // Cancel any outstanding threads before freeing everything
207 cqueue_cancel(cq);
208
205 dlinkedlist_free(cq->list); 209 dlinkedlist_free(cq->list);
206 cnd_destroy(cq->conditional); 210 cnd_destroy(cq->conditional);
207 mtx_destroy(cq->mutex); 211 mtx_destroy(cq->mutex);
@@ -217,6 +221,11 @@ int cqueue_append(cqueue * const cq, task *tsk) {
217 RETURNWERR(EINVAL, -1); 221 RETURNWERR(EINVAL, -1);
218 222
219 mtx_lock(cq->mutex); 223 mtx_lock(cq->mutex);
224 if(cq->canceled) {
225 mtx_unlock(cq->mutex);
226 thrd_exit(thrd_timedout);
227 }
228
220 dlinkedlist_append(cq->list, tsk, free); 229 dlinkedlist_append(cq->list, tsk, free);
221 mtx_unlock(cq->mutex); 230 mtx_unlock(cq->mutex);
222 cnd_signal(cq->conditional); 231 cnd_signal(cq->conditional);
@@ -229,6 +238,11 @@ int cqueue_prepend(cqueue * const cq, task *tsk) {
229 RETURNWERR(EINVAL, -1); 238 RETURNWERR(EINVAL, -1);
230 239
231 mtx_lock(cq->mutex); 240 mtx_lock(cq->mutex);
241 if(cq->canceled) {
242 mtx_unlock(cq->mutex);
243 thrd_exit(thrd_timedout);
244 }
245
232 dlinkedlist_prepend(cq->list, tsk, free); 246 dlinkedlist_prepend(cq->list, tsk, free);
233 mtx_unlock(cq->mutex); 247 mtx_unlock(cq->mutex);
234 cnd_signal(cq->conditional); 248 cnd_signal(cq->conditional);
@@ -241,6 +255,11 @@ int cqueue_insert(cqueue * const cq, task *tsk, int index) {
241 RETURNWERR(EINVAL, -1); 255 RETURNWERR(EINVAL, -1);
242 256
243 mtx_lock(cq->mutex); 257 mtx_lock(cq->mutex);
258 if(cq->canceled) {
259 mtx_unlock(cq->mutex);
260 thrd_exit(thrd_timedout);
261 }
262
244 dlinkedlist_insert(cq->list, tsk, free, index); 263 dlinkedlist_insert(cq->list, tsk, free, index);
245 mtx_unlock(cq->mutex); 264 mtx_unlock(cq->mutex);
246 cnd_signal(cq->conditional); 265 cnd_signal(cq->conditional);
@@ -253,6 +272,11 @@ int cqueue_size(cqueue const * const cq) {
253 RETURNWERR(EINVAL, -1); 272 RETURNWERR(EINVAL, -1);
254 273
255 mtx_lock(cq->mutex); 274 mtx_lock(cq->mutex);
275 if(cq->canceled) {
276 mtx_unlock(cq->mutex);
277 thrd_exit(thrd_timedout);
278 }
279
256 int retval = dlinkedlist_size(cq->list); 280 int retval = dlinkedlist_size(cq->list);
257 mtx_unlock(cq->mutex); 281 mtx_unlock(cq->mutex);
258 282
@@ -260,7 +284,8 @@ int cqueue_size(cqueue const * const cq) {
260} 284}
261 285
262int cqueue_isempty(cqueue const * const cq) { 286int cqueue_isempty(cqueue const * const cq) {
263 return (cqueue_size(cq) == 0); 287 int val = cqueue_size(cq);
288 return (val < 0) ? -1 : (val == 0);
264} 289}
265 290
266int cqueue_trypop(cqueue * const cq, task **ret) { 291int cqueue_trypop(cqueue * const cq, task **ret) {
@@ -270,6 +295,11 @@ int cqueue_trypop(cqueue * const cq, task **ret) {
270 int retval = 0; 295 int retval = 0;
271 296
272 mtx_lock(cq->mutex); 297 mtx_lock(cq->mutex);
298 if(cq->canceled) {
299 mtx_unlock(cq->mutex);
300 thrd_exit(thrd_timedout);
301 }
302
273 if(!dlinkedlist_isempty(cq->list)) { 303 if(!dlinkedlist_isempty(cq->list)) {
274 *ret = (task*)dlinkedlist_poplast(cq->list); 304 *ret = (task*)dlinkedlist_poplast(cq->list);
275 retval = 1; 305 retval = 1;
@@ -284,10 +314,98 @@ int cqueue_waitpop(cqueue * const cq, task **ret) {
284 RETURNWERR(EINVAL, -1); 314 RETURNWERR(EINVAL, -1);
285 315
286 mtx_lock(cq->mutex); 316 mtx_lock(cq->mutex);
287 while(!dlinkedlist_isempty(cq->list)) 317
318 while(!dlinkedlist_isempty(cq->list) && !cq->canceled)
288 cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done 319 cnd_wait(cq->conditional, cq->mutex); // Unlocks mutex while waiting, acquires lock once waiting is done
320
321 if(cq->canceled) {
322 mtx_unlock(cq->mutex);
323 thrd_exit(thrd_timedout);
324 }
325
289 *ret = dlinkedlist_poplast(cq->list); 326 *ret = dlinkedlist_poplast(cq->list);
327
290 mtx_unlock(cq->mutex); 328 mtx_unlock(cq->mutex);
291 329
292 return 0; 330 return 0;
331}
332
333int cqueue_cancel(cqueue * const cq) {
334 if(!cq)
335 RETURNWERR(EINVAL, -1);
336
337 int retval = 0;
338
339 mtx_lock(cq->mutex);
340 if(cq->canceled)
341 retval = -1;
342 else
343 cq->canceled++;
344
345 mtx_unlock(cq->mutex);
346 cnd_broadcast(cq->conditional);
347
348 return retval;
349}
350
351
352static void ___ucleanup_cqfree(void *cq) {
353 if(!cq)
354 return;
355
356 cqueue_free(cq);
357 return;
358}
359
360threadpool * threadpool_init(int threads) {
361 if(threads < 1)
362 RETURNWERR(EINVAL, NULL);
363 cleanup_CREATE(10);
364
365 threadpool *tp = VALLOC(1, sizeof(*tp));
366 if(!tp)
367 return NULL;
368 cleanup_REGISTER(free, tp);
369
370 tp->taskqueue = cqueue_init(mtx_plain);
371 if(!tp->taskqueue)
372 cleanup_MARK();
373 cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue);
374
375 tp->threads = VALLOC(threads, sizeof(*tp->threads));
376 if(!tp->threads)
377 cleanup_MARK();
378 cleanup_CNDREGISTER(free, tp->threads);
379
380 for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) {
381 tp->threads[i] = VALLOC(1, sizeof(**tp->threads));
382 if(!tp->threads[i]) {
383 cleanup_MARK();
384 for(int j = 0; j < i; j++)
385 free(tp->threads[j]);
386 }
387 }
388
389 if(cleanup_ERRORFLAGGED)
390 cleanup_FIRE();
391 else
392 tp->nthreads = threads;
393
394 return tp;
395}
396
397void threadpool_free(threadpool *tp) {
398 if(!tp)
399 return;
400
401 cqueue_free(tp->taskqueue);
402 for(int i = 0; i < tp->nthreads; i++) {
403 thrd_detach(*tp->threads[i]);
404 free(tp->threads[i]);
405 }
406 free(tp->threads);
407 cqueue_free(tp->taskqueue);
408 free(tp);
409
410 return;
293} \ No newline at end of file 411} \ No newline at end of file
diff --git a/src/threadpool.h b/src/threadpool.h
index 8e3ee41..c28f03a 100644
--- a/src/threadpool.h
+++ b/src/threadpool.h
@@ -3,6 +3,25 @@
3 3
4#include <threads.h> 4#include <threads.h>
5 5
6typedef int (*task_callback)(void*);
7typedef struct task task;
8typedef struct cq cqueue;
9typedef struct tp threadpool;
10
11task * task_init(task_callback cb, void *arg);
12void task_free(task *ts);
13
14cqueue * cqueue_init(int mtx_type);
15void cqueue_free(cqueue *cq);
16int cqueue_append(cqueue * const cq, task *tsk);
17int cqueue_prepend(cqueue * const cq, task *tsk);
18int cqueue_insert(cqueue * const cq, task *tsk, int index);
19int cqueue_size(cqueue const * const cq);
20int cqueue_isempty(cqueue const * const cq);
21int cqueue_trypop(cqueue * const cq, task **ret);
22int cqueue_waitpop(cqueue * const cq, task **ret);
23int cqueue_cancel(cqueue * const cq);
24
6typedef struct mtxp mtxpair; 25typedef struct mtxp mtxpair;
7mtxpair * mtxpair_init(void * const data, int type); 26mtxpair * mtxpair_init(void * const data, int type);
8void mtxpair_free(mtxpair *mp); 27void mtxpair_free(mtxpair *mp);
@@ -10,4 +29,6 @@ int mtxpair_setdata(mtxpair * const mp, void * const data);
10 29
11int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd); 30int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd);
12 31
32int cqueue_cancel(cqueue * const cq);
33
13#endif \ No newline at end of file 34#endif \ No newline at end of file