summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main.c32
-rw-r--r--src/shared.h2
-rw-r--r--src/threadpool.c314
-rw-r--r--src/threadpool.h24
4 files changed, 107 insertions, 265 deletions
diff --git a/src/main.c b/src/main.c
index 6af87ab..95b55ff 100644
--- a/src/main.c
+++ b/src/main.c
@@ -7,8 +7,9 @@
7 7
8#include <errno.h> 8#include <errno.h>
9#include <error.h> 9#include <error.h>
10#include <threads.h>
11#include <unistd.h>
10 12
11#include <stdio.h>
12int testcb(void *data) { 13int testcb(void *data) {
13 if(!data) 14 if(!data)
14 return -1; 15 return -1;
@@ -17,14 +18,31 @@ int testcb(void *data) {
17 return 0; 18 return 0;
18} 19}
19 20
21int consumer(void *cq) {
22 if(!cq)
23 return -1;
24
25 cqueue *rcq = (cqueue*)cq;
26 for(task *tsk = NULL;;) {
27 tsk = cqueue_waitpop(rcq);
28 if(!tsk)
29 thrd_exit(-1);
30
31 task_fire(tsk);
32 }
33
34 return 0;
35}
36
20int main() { 37int main() {
21 // error(1, ENOTSUP, "No main file lol"); 38 // error(1, ENOTSUP, "No main file lol");
22 39
23 threadpool *tp = threadpool_init(2); 40 thrd_t thread;
24 task *tsk = task_init(testcb, "This is some data"); 41 cqueue *cq = cqueue_init(mtx_plain);
25 threadpool_addtask(tp, tsk); 42 thrd_create(&thread, consumer, cq);
26 threadpool_join(tp); 43 cqueue_addtask(cq, task_init(testcb, (void*)"This is some data"));
27 threadpool_free(tp); 44 sleep(10);
45 cqueue_free(cq);
28 46
29 return 0; 47 return 0;
30} \ No newline at end of file 48} \ No newline at end of file
diff --git a/src/shared.h b/src/shared.h
index 9e7eaa8..825814e 100644
--- a/src/shared.h
+++ b/src/shared.h
@@ -4,6 +4,8 @@
4#include <stddef.h> 4#include <stddef.h>
5 5
6#define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0])) 6#define STATIC_ARRAY_LEN(arr) (sizeof((arr))/sizeof((arr)[0]))
7#define FALSE 0
8#define TRUE 1
7 9
8#define RETURNWERR(errval, retval) do {\ 10#define RETURNWERR(errval, retval) do {\
9 errno = (errval);\ 11 errno = (errval);\
diff --git a/src/threadpool.c b/src/threadpool.c
index ab0733d..9d00030 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -7,12 +7,6 @@
7#include <stdlib.h> 7#include <stdlib.h>
8#include <errno.h> 8#include <errno.h>
9 9
10/* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must
11// always be followed by an unlock
12
13// Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a
14// function that waits should never send */
15
16// Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) 10// Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member)
17typedef struct mtxp { 11typedef struct mtxp {
18 void *data; 12 void *data;
@@ -86,12 +80,18 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd)
86// Threadpool: 80// Threadpool:
87 // Array of threads 81 // Array of threads
88 // Task Queue 82 // Task Queue
89 // Readiness semaphore 83 // Readiness semaphore / conditional
84 // Mutex
90 // Linked List of Tasks 85 // Linked List of Tasks
91 // Task: 86 // Task:
92 // int (*callback)(void*) 87 // int (*callback)(void*)
93 // void *arg 88 // void *arg
94 89
90// Consumer:
91 // Wait for cqueue to pop
92 // Fire task
93 // Repeat
94
95// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 95// Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342
96 96
97typedef struct task { 97typedef struct task {
@@ -135,14 +135,28 @@ void task_free(task *ts) {
135 return; 135 return;
136} 136}
137 137
138int task_fire(task *ts) {
139 if(!ts)
140 RETURNWERR(EINVAL, -1);
138 141
139static void ___ucleanup_mtxd(void *mtx) { 142 return ts->cb(ts->arg);
140 if(!mtx) 143}
144
145
146/* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must
147// always be followed by an unlock
148
149// Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a
150// function that waits should never send */
151
152static void ___ucleanup_dfree(void *dll) {
153 if(!dll)
141 return; 154 return;
142 155
143 mtx_destroy((mtx_t *)mtx); 156 dlinkedlist_free((dlinkedlist *)dll);
144 return; 157 return;
145} 158}
159
146static void ___ucleanup_cndd(void *cnd) { 160static void ___ucleanup_cndd(void *cnd) {
147 if(!cnd) 161 if(!cnd)
148 return; 162 return;
@@ -150,15 +164,15 @@ static void ___ucleanup_cndd(void *cnd) {
150 cnd_destroy((cnd_t *)cnd); 164 cnd_destroy((cnd_t *)cnd);
151 return; 165 return;
152} 166}
153static void ___ucleanup_dll(void *dll) { 167
154 if(!dll) 168static void ___ucleanup_mtxd(void *mtx) {
169 if(!mtx)
155 return; 170 return;
156 171
157 dlinkedlist_free((dlinkedlist *)dll); 172 mtx_destroy((mtx_t*)mtx);
158 return; 173 return;
159} 174}
160 175
161
162cqueue * cqueue_init(int mtx_type) { 176cqueue * cqueue_init(int mtx_type) {
163 cleanup_CREATE(10); 177 cleanup_CREATE(10);
164 178
@@ -167,284 +181,106 @@ cqueue * cqueue_init(int mtx_type) {
167 return NULL; 181 return NULL;
168 cleanup_REGISTER(free, cq); 182 cleanup_REGISTER(free, cq);
169 183
170 cq->mtx = VALLOC(1, sizeof(*(cq->mtx))); 184 cq->canceled = FALSE;
171 if(!(cq->mtx)) 185 cq->list = dlinkedlist_init();
186 if(!cq->list)
172 cleanup_MARK(); 187 cleanup_MARK();
173 cleanup_CNDREGISTER(free, cq->mtx); 188 cleanup_CNDREGISTER(___ucleanup_dfree, cq->list);
174
175 if(!cleanup_ERRORFLAGGED)
176 if(mtx_init(cq->mtx, mtx_type) != thrd_success)
177 cleanup_MARK();
178 cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx);
179 189
180 if(!cleanup_ERRORFLAGGED) 190 if(!cleanup_ERRORFLAGGED)
181 if(!(cq->cnd = VALLOC(1, sizeof(*(cq->cnd))))) 191 if(!(cq->cnd = VALLOC(1, sizeof(*cq->cnd))))
182 cleanup_MARK(); 192 cleanup_MARK();
183 cleanup_CNDREGISTER(free, cq->cnd); 193 cleanup_CNDREGISTER(free, cq->cnd);
184 194
185 if(!cleanup_ERRORFLAGGED) 195 if(!cleanup_ERRORFLAGGED)
186 if(cnd_init(cq->cnd) != thrd_success) 196 if(cnd_init(cq->cnd) == thrd_error)
187 cleanup_MARK(); 197 cleanup_MARK();
188 cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); 198 cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd);
189 199
190 if(!cleanup_ERRORFLAGGED) 200 if(!cleanup_ERRORFLAGGED)
191 if(!(cq->list = dlinkedlist_init())) 201 if(!(cq->mtx = VALLOC(1, sizeof(*cq->mtx))))
192 cleanup_MARK(); 202 cleanup_MARK();
193 cleanup_CNDREGISTER(___ucleanup_dll, cq->list); 203 cleanup_CNDREGISTER(free, cq->mtx);
194
195 204
205 if(!cleanup_ERRORFLAGGED)
206 if(mtx_init(cq->mtx, mtx_type) != thrd_success)
207 cleanup_MARK();
208 cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx);
209
196 if(cleanup_ERRORFLAGGED) 210 if(cleanup_ERRORFLAGGED)
197 cleanup_fire(&__CLEANUP); 211 cleanup_FIRE();
198
199 // This implementation is better and should be far less error prone than the thing I did earlier, but it would be nicer if C had anonymous functions
200 // The implementation was not better lmao
201 212
202 cq->canceled = 0;
203 return cq; 213 return cq;
204} 214}
205 215
206void cqueue_free(cqueue *cq) { 216void cqueue_cancel(cqueue *cq) {
207 if(!cq) 217 if(!cq)
208 return; 218 return;
209 219
210 // Cancel any outstanding threads before freeing everything
211 cqueue_cancel(cq);
212
213 dlinkedlist_free(cq->list);
214 cnd_destroy(cq->cnd);
215 mtx_destroy(cq->mtx);
216 free(cq->cnd);
217 free(cq->mtx);
218 free(cq);
219
220 return;
221}
222
223int cqueue_append(cqueue * const cq, task *tsk) {
224 if(!cq || !tsk)
225 RETURNWERR(EINVAL, -1);
226
227 mtx_lock(cq->mtx); 220 mtx_lock(cq->mtx);
228 if(cq->canceled) { 221 if(cq->canceled) {
229 mtx_unlock(cq->mtx); 222 mtx_unlock(cq->mtx);
230 thrd_exit(thrd_timedout); 223 thrd_exit(-1);
231 } 224 }
232 225
233 dlinkedlist_append(cq->list, tsk, free); 226 cq->canceled++;
234 mtx_unlock(cq->mtx); 227 mtx_unlock(cq->mtx);
235 cnd_signal(cq->cnd); 228 cnd_broadcast(cq->cnd);
236 229
237 return 0; 230 return;
238} 231}
239 232
240int cqueue_prepend(cqueue * const cq, task *tsk) { 233void cqueue_free(cqueue *cq) {
241 if(!cq || !tsk) 234 if(!cq)
242 RETURNWERR(EINVAL, -1); 235 return;
243
244 mtx_lock(cq->mtx);
245 if(cq->canceled) {
246 mtx_unlock(cq->mtx);
247 thrd_exit(thrd_timedout);
248 }
249 236
250 dlinkedlist_prepend(cq->list, tsk, free); 237 cqueue_cancel(cq);
251 mtx_unlock(cq->mtx); 238 mtx_destroy(cq->mtx);
252 cnd_signal(cq->cnd); 239 cnd_destroy(cq->cnd);
240 free(cq->mtx);
241 free(cq->cnd);
242 dlinkedlist_free(cq->list);
253 243
254 return 0; 244 return;
255} 245}
256 246
257int cqueue_insert(cqueue * const cq, task *tsk, int index) { 247int cqueue_addtask(cqueue * const cq, task * const tsk) {
258 if(!cq || !tsk || index < 0) // Can't check to see if the index is too high without locking the mutex first 248 if(!cq || !tsk)
259 RETURNWERR(EINVAL, -1); 249 RETURNWERR(EINVAL, -1);
260 250
261 mtx_lock(cq->mtx); 251 mtx_lock(cq->mtx);
252
253 // TODO: Think about creating an "exception" via signal handling
262 if(cq->canceled) { 254 if(cq->canceled) {
263 mtx_unlock(cq->mtx); 255 mtx_unlock(cq->mtx);
264 thrd_exit(thrd_timedout); 256 thrd_exit(-1);
265 } 257 }
266 258
267 dlinkedlist_insert(cq->list, tsk, free, index); 259 dlinkedlist_prepend(cq->list, tsk, free);
268 mtx_unlock(cq->mtx); 260 mtx_unlock(cq->mtx);
269 cnd_signal(cq->cnd); 261 cnd_signal(cq->cnd);
270 262
271 return 0; 263 return 0;
272} 264}
273 265
274int cqueue_size(cqueue const * const cq) { 266task * cqueue_waitpop(cqueue * const cq) {
275 if(!cq) 267 if(!cq)
276 RETURNWERR(EINVAL, -1); 268 RETURNWERR(EINVAL, NULL);
277
278 mtx_lock(cq->mtx);
279 if(cq->canceled) {
280 mtx_unlock(cq->mtx);
281 thrd_exit(thrd_timedout);
282 }
283
284 int retval = dlinkedlist_size(cq->list);
285 mtx_unlock(cq->mtx);
286
287 return retval;
288}
289
290int cqueue_isempty(cqueue const * const cq) {
291 int val = cqueue_size(cq);
292 return (val < 0) ? -1 : (val == 0);
293}
294 269
295int cqueue_trypop(cqueue * const cq, task **ret) { 270 task *retval = NULL;
296 if(!cq || !ret || !*ret)
297 RETURNWERR(EINVAL, -1);
298 271
299 int retval = 0;
300
301 mtx_lock(cq->mtx); 272 mtx_lock(cq->mtx);
302 if(cq->canceled) { 273 while(dlinkedlist_isempty(cq->list) && !cq->canceled)
303 mtx_unlock(cq->mtx); 274 cnd_wait(cq->cnd, cq->mtx);
304 thrd_exit(thrd_timedout);
305 }
306
307 if(!dlinkedlist_isempty(cq->list)) {
308 *ret = (task*)dlinkedlist_poplast(cq->list);
309 retval = 1;
310 }
311 mtx_unlock(cq->mtx);
312
313 return retval;
314}
315
316int cqueue_waitpop(cqueue * const cq, task **ret) {
317 if(!cq || !ret)
318 RETURNWERR(EINVAL, -1);
319 275
320 mtx_lock(cq->mtx);
321
322 while(!dlinkedlist_isempty(cq->list) && !cq->canceled)
323 cnd_wait(cq->cnd, cq->mtx); // Unlocks mutex while waiting, acquires lock once waiting is done
324
325 if(cq->canceled) { 276 if(cq->canceled) {
326 mtx_unlock(cq->mtx); 277 mtx_unlock(cq->mtx);
327 thrd_exit(thrd_timedout); 278 thrd_exit(-1);
328 } 279 }
329
330 *ret = dlinkedlist_poplast(cq->list);
331
332 mtx_unlock(cq->mtx);
333
334 return 0;
335}
336 280
337int cqueue_cancel(cqueue * const cq) { 281 retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1);
338 if(!cq) 282 dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1);
339 RETURNWERR(EINVAL, -1);
340
341 int retval = 0;
342
343 mtx_lock(cq->mtx);
344 if(cq->canceled)
345 retval = -1;
346 else
347 cq->canceled++;
348
349 mtx_unlock(cq->mtx); 283 mtx_unlock(cq->mtx);
350 cnd_broadcast(cq->cnd);
351
352 return retval;
353}
354
355int cqueue_consumer(void *passed) {
356 if(!passed)
357 thrd_exit(thrd_error);
358 // Not setting errno because then I'd have to make a mutex for it
359
360 cqueue *cq = (cqueue *)passed;
361
362 for(task *current_task;;) {
363 cqueue_waitpop(cq, &current_task);
364 if(!current_task)
365 thrd_exit(thrd_error);
366
367 current_task->cb(current_task->arg);
368 }
369
370 thrd_exit(thrd_success);
371}
372
373static void ___ucleanup_cqfree(void *cq) {
374 if(!cq)
375 return;
376
377 cqueue_free(cq);
378 return;
379}
380
381threadpool * threadpool_init(int threads) {
382 if(threads < 1)
383 RETURNWERR(EINVAL, NULL);
384 cleanup_CREATE(10);
385
386 threadpool *tp = VALLOC(1, sizeof(*tp));
387 if(!tp)
388 return NULL;
389 cleanup_REGISTER(free, tp);
390
391 tp->taskqueue = cqueue_init(mtx_plain);
392 if(!tp->taskqueue)
393 cleanup_MARK();
394 cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue);
395
396 if(!cleanup_ERRORFLAGGED)
397 if(!(tp->threads = VALLOC(threads, sizeof(*tp->threads))))
398 cleanup_MARK();
399 cleanup_CNDREGISTER(free, tp->threads);
400
401 for(int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) {
402 tp->threads[i] = VALLOC(1, sizeof(**tp->threads));
403 if(!tp->threads[i]) {
404 cleanup_MARK();
405 for(int j = 0; j < i; j++)
406 free(tp->threads[j]);
407 }
408
409 if(!cleanup_ERRORFLAGGED)
410 thrd_create(tp->threads[i], cqueue_consumer, tp->taskqueue);
411 // TODO: Error Checking ^
412 }
413
414 if(cleanup_ERRORFLAGGED)
415 cleanup_FIRE();
416 else
417 tp->nthreads = threads;
418
419 return tp;
420}
421
422void threadpool_free(threadpool *tp) {
423 if(!tp)
424 return;
425 284
426 cqueue_free(tp->taskqueue); 285 return retval;
427 for(int i = 0; i < tp->nthreads; i++)
428 free(tp->threads[i]);
429 free(tp->threads);
430 free(tp);
431
432 return;
433}
434
435int threadpool_addtask(threadpool * const tp, task * const task) {
436 if(!tp || !task)
437 RETURNWERR(EINVAL, -1);
438
439 return cqueue_append(tp->taskqueue, task);
440}
441
442int threadpool_join(const threadpool * const tp) {
443 if(!tp)
444 RETURNWERR(EINVAL, -1);
445
446 for(int i = 0; i < tp->nthreads; i++)
447 thrd_join(*(tp->threads[i]), NULL);
448
449 return 0;
450} \ No newline at end of file 286} \ No newline at end of file
diff --git a/src/threadpool.h b/src/threadpool.h
index bc3ce06..bd1b787 100644
--- a/src/threadpool.h
+++ b/src/threadpool.h
@@ -10,27 +10,13 @@ typedef struct tp threadpool;
10 10
11task * task_init(task_callback cb, void *arg); 11task * task_init(task_callback cb, void *arg);
12void task_free(task *ts); 12void task_free(task *ts);
13int task_fire(task *ts);
13 14
14cqueue * cqueue_init(int mtx_type); 15cqueue * cqueue_init(int mtx_type);
15void cqueue_free(cqueue *cq);
16int cqueue_append(cqueue * const cq, task *tsk);
17int cqueue_prepend(cqueue * const cq, task *tsk);
18int cqueue_insert(cqueue * const cq, task *tsk, int index);
19int cqueue_size(cqueue const * const cq);
20int cqueue_isempty(cqueue const * const cq);
21int cqueue_trypop(cqueue * const cq, task **ret);
22int cqueue_waitpop(cqueue * const cq, task **ret);
23int cqueue_cancel(cqueue * const cq);
24
25threadpool * threadpool_init(int threads);
26void threadpool_free(threadpool *tp);
27int threadpool_addtask(threadpool * const tp, task * const task);
28int threadpool_join(const threadpool * const tp);
29 16
30typedef struct mtxp mtxpair; 17void cqueue_cancel(cqueue *cq);
31mtxpair * mtxpair_init(void * const data, int type); 18void cqueue_free(cqueue *cq);
32void mtxpair_free(mtxpair *mp); 19int cqueue_addtask(cqueue * const cq, task * const tsk);
33int mtxpair_setdata(mtxpair * const mp, void * const data); 20task * cqueue_waitpop(cqueue * const cq);
34int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd);
35 21
36#endif \ No newline at end of file 22#endif \ No newline at end of file