summaryrefslogtreecommitdiff
path: root/src/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/threadpool.c')
-rw-r--r--src/threadpool.c264
1 files changed, 125 insertions, 139 deletions
diff --git a/src/threadpool.c b/src/threadpool.c
index 9d00030..e1c11aa 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -10,7 +10,7 @@
10// Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) 10// Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member)
11typedef struct mtxp { 11typedef struct mtxp {
12 void *data; 12 void *data;
13 mtx_t *mtx; 13 mtx_t mtx;
14} mtxpair; 14} mtxpair;
15 15
16mtxpair * mtxpair_init(void * const data, int type) { 16mtxpair * mtxpair_init(void * const data, int type) {
@@ -18,16 +18,9 @@ mtxpair * mtxpair_init(void * const data, int type) {
18 if(!mtxp) 18 if(!mtxp)
19 return NULL; 19 return NULL;
20 20
21 // Make room for the mutex
22 mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx));
23 if(!mtxp->mtx) {
24 free(mtxp);
25 return NULL;
26 }
27
28 // Init the mutex 21 // Init the mutex
29 if(mtx_init(mtxp->mtx, type) == thrd_error) { 22 if(mtx_init(&mtxp->mtx, type) == thrd_error) {
30 free(mtxp->mtx); free(mtxp); 23 free(mtxp);
31 RETURNWERR(errno, NULL); 24 RETURNWERR(errno, NULL);
32 } 25 }
33 26
@@ -39,8 +32,7 @@ void mtxpair_free(mtxpair *mp) {
39 if(!mp) 32 if(!mp)
40 return; 33 return;
41 34
42 mtx_destroy(mp->mtx); 35 mtx_destroy(&mp->mtx);
43 free(mp->mtx);
44 free(mp); 36 free(mp);
45 37
46 return; 38 return;
@@ -64,9 +56,9 @@ int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd)
64 if(!mtxd) 56 if(!mtxd)
65 RETURNWERR(EINVAL, thrd_error); 57 RETURNWERR(EINVAL, thrd_error);
66 58
67 if(mtx_lock(mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} 59 if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);}
68 int retval = thrd_create(thr, func, mtxd->data); 60 int retval = thrd_create(thr, func, mtxd->data);
69 if(mtx_unlock(mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} 61 if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);}
70 62
71 return retval; 63 return retval;
72} 64}
@@ -99,21 +91,6 @@ typedef struct task {
99 void *arg; 91 void *arg;
100} task; 92} task;
101 93
102typedef struct cq {
103 dlinkedlist *list;
104 mtx_t *mtx;
105 cnd_t *cnd;
106 unsigned char canceled;
107} cqueue;
108
109typedef struct tp {
110 thrd_t **threads;
111 int nthreads;
112
113 cqueue *taskqueue;
114} threadpool;
115
116
117task * task_init(task_callback cb, void *arg) { 94task * task_init(task_callback cb, void *arg) {
118 if(cb == NULL) 95 if(cb == NULL)
119 RETURNWERR(EINVAL, NULL); 96 RETURNWERR(EINVAL, NULL);
@@ -138,149 +115,158 @@ void task_free(task *ts) {
138int task_fire(task *ts) { 115int task_fire(task *ts) {
139 if(!ts) 116 if(!ts)
140 RETURNWERR(EINVAL, -1); 117 RETURNWERR(EINVAL, -1);
118 if(ts->cb == NULL)
119 RETURNWERR(EINVAL, -1);
141 120
142 return ts->cb(ts->arg); 121 return ts->cb(ts->arg);
143} 122}
144 123
145 124
146/* Mutex: Lock a shared resource. Used to prevent race conditions when accessing / modifying some shared resource. A lock must
147// always be followed by an unlock
148 125
149// Semaphore: Send / wait on a signal; solves the consumer/producer problem. A function that sends should never wait, and a 126typedef struct cq {
150// function that waits should never send */ 127 dlinkedlist *taskqueue;
128 dlinkedlist *rthreads;
151 129
152static void ___ucleanup_dfree(void *dll) { 130 mtx_t mtx;
153 if(!dll) 131 cnd_t cnd;
154 return; 132
133 unsigned char canceled;
155 134
156 dlinkedlist_free((dlinkedlist *)dll); 135} cqueue;
157 return;
158}
159 136
160static void ___ucleanup_cndd(void *cnd) {
161 if(!cnd)
162 return;
163 137
164 cnd_destroy((cnd_t *)cnd);
165 return;
166}
167 138
168static void ___ucleanup_mtxd(void *mtx) { 139// static void ___ucleanup_dfree(void *dll) {
169 if(!mtx) 140// if(!dll)
170 return; 141// return;
171 142
172 mtx_destroy((mtx_t*)mtx); 143// dlinkedlist_free((dlinkedlist *)dll);
173 return; 144// return;
174} 145// }
146
147// static void ___ucleanup_cndd(void *cnd) {
148// if(!cnd)
149// return;
150
151// cnd_destroy((cnd_t *)cnd);
152// return;
153// }
154
155// static void ___ucleanup_mtxd(void *mtx) {
156// if(!mtx)
157// return;
175 158
176cqueue * cqueue_init(int mtx_type) { 159// mtx_destroy((mtx_t*)mtx);
177 cleanup_CREATE(10); 160// return;
161// }
162
163// cqueue * cqueue_init(int mtx_type) {
164// cleanup_CREATE(10);
178 165
179 cqueue *cq = VALLOC(1, sizeof(*cq)); 166// cqueue *cq = VALLOC(1, sizeof(*cq));
180 if(!cq) 167// if(!cq)
181 return NULL; 168// return NULL;
182 cleanup_REGISTER(free, cq); 169// cleanup_REGISTER(free, cq);
183 170
184 cq->canceled = FALSE; 171// cq->canceled = FALSE;
185 cq->list = dlinkedlist_init(); 172// cq->list = dlinkedlist_init();
186 if(!cq->list) 173// if(!cq->list)
187 cleanup_MARK(); 174// cleanup_MARK();
188 cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); 175// cleanup_CNDREGISTER(___ucleanup_dfree, cq->list);
189
190 if(!cleanup_ERRORFLAGGED)
191 if(!(cq->cnd = VALLOC(1, sizeof(*cq->cnd))))
192 cleanup_MARK();
193 cleanup_CNDREGISTER(free, cq->cnd);
194 176
195 if(!cleanup_ERRORFLAGGED) 177// if(!cleanup_ERRORFLAGGED)
196 if(cnd_init(cq->cnd) == thrd_error) 178// if(cnd_init(&cq->cnd) == thrd_error)
197 cleanup_MARK(); 179// cleanup_MARK();
198 cleanup_CNDREGISTER(___ucleanup_cndd, cq->cnd); 180// cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd);
199 181
200 if(!cleanup_ERRORFLAGGED) 182// if(!cleanup_ERRORFLAGGED)
201 if(!(cq->mtx = VALLOC(1, sizeof(*cq->mtx)))) 183// if(mtx_init(&cq->mtx, mtx_type) != thrd_success)
202 cleanup_MARK(); 184// cleanup_MARK();
203 cleanup_CNDREGISTER(free, cq->mtx); 185// cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx);
204
205 if(!cleanup_ERRORFLAGGED)
206 if(mtx_init(cq->mtx, mtx_type) != thrd_success)
207 cleanup_MARK();
208 cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mtx);
209 186
210 if(cleanup_ERRORFLAGGED) 187// if(cleanup_ERRORFLAGGED)
211 cleanup_FIRE(); 188// cleanup_FIRE();
212 189
213 return cq; 190// return cq;
214} 191// }
215 192
216void cqueue_cancel(cqueue *cq) { 193// void cqueue_cancel(cqueue *cq) {
217 if(!cq) 194// if(!cq)
218 return; 195// return;
219 196
220 mtx_lock(cq->mtx); 197// mtx_lock(cq->mtx);
221 if(cq->canceled) { 198// if(cq->canceled) {
222 mtx_unlock(cq->mtx); 199// mtx_unlock(cq->mtx);
223 thrd_exit(-1); 200// thrd_exit(-1);
224 } 201// }
225 202
226 cq->canceled++; 203// cq->canceled++;
227 mtx_unlock(cq->mtx); 204// mtx_unlock(cq->mtx);
228 cnd_broadcast(cq->cnd); 205// cnd_broadcast(cq->cnd);
229 206
230 return; 207// return;
231} 208// }
232 209
233void cqueue_free(cqueue *cq) { 210// void cqueue_free(cqueue *cq) {
234 if(!cq) 211// if(!cq)
235 return; 212// return;
236 213
237 cqueue_cancel(cq); 214// cqueue_cancel(cq);
238 mtx_destroy(cq->mtx); 215// mtx_destroy(cq->mtx);
239 cnd_destroy(cq->cnd); 216// cnd_destroy(cq->cnd);
240 free(cq->mtx); 217// free(cq->mtx);
241 free(cq->cnd); 218// free(cq->cnd);
242 dlinkedlist_free(cq->list); 219// dlinkedlist_free(cq->list);
243 220
244 return; 221// return;
245} 222// }
246 223
247int cqueue_addtask(cqueue * const cq, task * const tsk) { 224// int cqueue_addtask(cqueue * const cq, task * const tsk) {
248 if(!cq || !tsk) 225// if(!cq || !tsk)
249 RETURNWERR(EINVAL, -1); 226// RETURNWERR(EINVAL, -1);
250 227
251 mtx_lock(cq->mtx); 228// mtx_lock(cq->mtx);
252 229
253 // TODO: Think about creating an "exception" via signal handling 230// // TODO: Think about creating an "exception" via signal handling
254 if(cq->canceled) { 231// if(cq->canceled) {
255 mtx_unlock(cq->mtx); 232// mtx_unlock(cq->mtx);
256 thrd_exit(-1); 233// thrd_exit(-1);
257 } 234// }
258 235
259 dlinkedlist_prepend(cq->list, tsk, free); 236// dlinkedlist_prepend(cq->list, tsk, free);
260 mtx_unlock(cq->mtx); 237// mtx_unlock(cq->mtx);
261 cnd_signal(cq->cnd); 238// cnd_signal(cq->cnd);
262 239
263 return 0; 240// return 0;
264} 241// }
265 242
266task * cqueue_waitpop(cqueue * const cq) { 243// task * cqueue_waitpop(cqueue * const cq) {
267 if(!cq) 244// if(!cq)
268 RETURNWERR(EINVAL, NULL); 245// RETURNWERR(EINVAL, NULL);
269 246
270 task *retval = NULL; 247// task *retval = NULL;
271 248
272 mtx_lock(cq->mtx); 249// mtx_lock(cq->mtx);
273 while(dlinkedlist_isempty(cq->list) && !cq->canceled) 250// while(dlinkedlist_isempty(cq->list) && !cq->canceled)
274 cnd_wait(cq->cnd, cq->mtx); 251// cnd_wait(cq->cnd, cq->mtx);
275 252
276 if(cq->canceled) { 253// if(cq->canceled) {
277 mtx_unlock(cq->mtx); 254// mtx_unlock(cq->mtx);
278 thrd_exit(-1); 255// thrd_exit(-1);
279 } 256// }
280 257
281 retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); 258// retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1);
282 dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); 259// dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1);
283 mtx_unlock(cq->mtx); 260// mtx_unlock(cq->mtx);
284 261
285 return retval; 262// return retval;
286} \ No newline at end of file 263// }
264
265
266
267typedef struct tp {
268 thrd_t **threads; // thrd_t *threads[]
269 int nthreads;
270
271 cqueue *taskqueue;
272} threadpool;