summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/threadpool.c159
-rw-r--r--src/threadpool.h8
2 files changed, 92 insertions, 75 deletions
diff --git a/src/threadpool.c b/src/threadpool.c
index e1c11aa..0baa024 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -131,95 +131,114 @@ typedef struct cq {
131 cnd_t cnd; 131 cnd_t cnd;
132 132
133 unsigned char canceled; 133 unsigned char canceled;
134
135} cqueue; 134} cqueue;
136 135
137 136
137static void ___ucleanup_mtxd(void *mtx) {
138 if(!mtx)
139 return;
138 140
139// static void ___ucleanup_dfree(void *dll) { 141 mtx_destroy((mtx_t*)mtx);
140// if(!dll) 142 return;
141// return; 143}
142 144
143// dlinkedlist_free((dlinkedlist *)dll); 145static void ___ucleanup_cndd(void *cnd) {
144// return; 146 if(!cnd)
145// } 147 return;
146 148
147// static void ___ucleanup_cndd(void *cnd) { 149 cnd_destroy((cnd_t *)cnd);
148// if(!cnd) 150 return;
149// return; 151}
150 152
151// cnd_destroy((cnd_t *)cnd); 153cqueue * cqueue_init() {
152// return; 154 cleanup_CREATE(10);
153// } 155
156 // Create base object
157 cqueue *cq = VALLOC(1, sizeof(*cq));
158 if(!cq)
159 RETURNWERR(errno, NULL);
160 cleanup_REGISTER(free, cq);
161 cq->canceled = 0;
154 162
155// static void ___ucleanup_mtxd(void *mtx) { 163 // Initialize the mutex
156// if(!mtx) 164 if(mtx_init(&cq->mtx, mtx_plain) != thrd_success)
157// return; 165 cleanup_MARK();
166 cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx);
167
168 // Initialize the conditional
169 if(!cleanup_ERRORFLAGGED)
170 if(cnd_init(&cq->cnd) != thrd_success)
171 cleanup_MARK();
172 cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd);
173
174 // Create the taskqueue
175 if(!cleanup_ERRORFLAGGED)
176 if(!(cq->taskqueue = dlinkedlist_init()))
177 cleanup_MARK();
178 cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue);
179
180 // Create the thread list
181 if(!cleanup_ERRORFLAGGED)
182 if(!(cq->rthreads = dlinkedlist_init()))
183 cleanup_MARK();
184 cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads);
185
186 if(cleanup_ERRORFLAGGED)
187 cleanup_FIRE();
188
189 return cq;
190
191 // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line
192 // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea
193}
158 194
159// mtx_destroy((mtx_t*)mtx); 195void cqueue_cancel(cqueue * const cq) {
160// return; 196 if(!cq)
161// } 197 return;
162 198
163// cqueue * cqueue_init(int mtx_type) { 199 mtx_lock(&cq->mtx);
164// cleanup_CREATE(10);
165
166// cqueue *cq = VALLOC(1, sizeof(*cq));
167// if(!cq)
168// return NULL;
169// cleanup_REGISTER(free, cq);
170
171// cq->canceled = FALSE;
172// cq->list = dlinkedlist_init();
173// if(!cq->list)
174// cleanup_MARK();
175// cleanup_CNDREGISTER(___ucleanup_dfree, cq->list);
176
177// if(!cleanup_ERRORFLAGGED)
178// if(cnd_init(&cq->cnd) == thrd_error)
179// cleanup_MARK();
180// cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd);
181
182// if(!cleanup_ERRORFLAGGED)
183// if(mtx_init(&cq->mtx, mtx_type) != thrd_success)
184// cleanup_MARK();
185// cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx);
186 200
187// if(cleanup_ERRORFLAGGED) 201 if(cq->canceled) {
188// cleanup_FIRE(); 202 mtx_unlock(&cq->mtx);
203 return;
204 }
205 cq->canceled = 1;
189 206
190// return cq; 207 mtx_unlock(&cq->mtx);
191// } 208 cnd_broadcast(&cq->cnd);
209
210 return;
211}
192 212
193// void cqueue_cancel(cqueue *cq) { 213static int ___cqueue_join(void *t) {
194// if(!cq) 214 if(!t)
195// return; 215 return -1;
196 216
197// mtx_lock(cq->mtx); 217 int retval = 0;
198// if(cq->canceled) { 218 thrd_t thread = *((thrd_t*)t);
199// mtx_unlock(cq->mtx); 219 thrd_join(thread, &retval);
200// thrd_exit(-1); 220
201// } 221 return retval;
222}
202 223
203// cq->canceled++; 224void cqueue_free(void *cq) {
204// mtx_unlock(cq->mtx); 225 if(!cq)
205// cnd_broadcast(cq->cnd); 226 return;
206 227
207// return; 228 cqueue *real = (cqueue *)cq;
208// }
209 229
210// void cqueue_free(cqueue *cq) { 230 // Cancel threads and wait for them to exit
211// if(!cq) 231 cqueue_cancel(real);
212// return; 232 dlinkedlist_foreach(real->rthreads, ___cqueue_join);
213 233
214// cqueue_cancel(cq); 234 // Threads are dead, no need to worry about concurrency anymore
215// mtx_destroy(cq->mtx); 235 mtx_destroy(&real->mtx);
216// cnd_destroy(cq->cnd); 236 cnd_destroy(&real->cnd);
217// free(cq->mtx); 237 dlinkedlist_free(real->rthreads);
218// free(cq->cnd); 238 dlinkedlist_free(real->taskqueue);
219// dlinkedlist_free(cq->list);
220 239
221// return; 240 return;
222// } 241}
223 242
224// int cqueue_addtask(cqueue * const cq, task * const tsk) { 243// int cqueue_addtask(cqueue * const cq, task * const tsk) {
225// if(!cq || !tsk) 244// if(!cq || !tsk)
diff --git a/src/threadpool.h b/src/threadpool.h
index bd1b787..db6fa2e 100644
--- a/src/threadpool.h
+++ b/src/threadpool.h
@@ -12,11 +12,9 @@ task * task_init(task_callback cb, void *arg);
12void task_free(task *ts); 12void task_free(task *ts);
13int task_fire(task *ts); 13int task_fire(task *ts);
14 14
15cqueue * cqueue_init(int mtx_type); 15cqueue * cqueue_init();
16void cqueue_cancel(cqueue * const cq);
17void cqueue_free(void *cq);
16 18
17void cqueue_cancel(cqueue *cq);
18void cqueue_free(cqueue *cq);
19int cqueue_addtask(cqueue * const cq, task * const tsk);
20task * cqueue_waitpop(cqueue * const cq);
21 19
22#endif \ No newline at end of file 20#endif \ No newline at end of file