#include "threadpool.h" #include #include #include #include typedef struct task { gcallback callback; fcallback freecb; void *data; } task; typedef struct tqnode { struct tqnode *next; struct tqnode *prev; task *task; } tqnode; typedef struct taskqueue { tqnode *start; tqnode *end; unsigned int size; } taskqueue; typedef struct ctqueue { mtx_t mutex; cnd_t cond; unsigned char canceled; taskqueue *tq; thrd_t *thrdarr; int talen; } ctqueue; task * task_init(gcallback callback, fcallback freecb, void *data) { if(callback == NULL) ERRRET(EINVAL, NULL); task *tsk = calloc(1, sizeof(*tsk)); if(!tsk) return NULL; tsk->callback = callback; tsk->freecb = freecb; tsk->data = data; return tsk; } void task_free(void *tsk) { task *real = (task *)tsk; if(!real) return; if(real->freecb != NULL) real->freecb(real->data); free(real); return; } int task_fire(task *tsk) { if(!tsk) ERRRET(EINVAL, -1); return tsk->callback(tsk->data); } int task_fired(task *tsk) { int retval = task_fire(tsk); if(errno == EINVAL && retval == -1) {return -1;} task_free(tsk); return retval; } tqnode * tqnode_init(tqnode *next, tqnode *prev, task *tsk) { if(!tsk) ERRRET(EINVAL, NULL); tqnode *node = calloc(1, sizeof(*node)); if(!node) return NULL; node->next = next; node->prev = prev; node->task = tsk; return node; } void tqnode_free(void *tqn) { tqnode *real = (tqnode *)tqn; if(!real) return; task_free(real->task); free(real); return; } taskqueue * taskqueue_init(void) { taskqueue *tq = calloc(1, sizeof(*tq)); if(!tq) return NULL; tq->start = NULL; tq->end = NULL; tq->size = 0; return tq; } void taskqueue_free(void *tq) { if(!tq) return; for(tqnode *p = ((taskqueue*)tq)->start, *n; p != NULL;) { n = p->next; tqnode_free(p); p = n; } free(tq); return; } int taskqueue_handlefirst(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); if(tq->size) {return 0;} tqnode *first = tqnode_init(NULL, NULL, tsk); if(!first) return -1; tq->start = first; tq->end = first; tq->size = 1; return 1; } int taskqueue_push(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; tqnode *newstart = tqnode_init(tq->start, NULL, tsk); if(!newstart) return -1; tq->start->prev = newstart; tq->start = newstart; tq->size++; return 0; } task * taskqueue_pop(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *end = tq->end; task *ret = end->task; if(tq->size == 1) { tq->end = NULL; tq->start = NULL; } else { tq->end = end->prev; tq->end->next = NULL; } free(end); tq->size--; return ret; } int taskqueue_pushfront(taskqueue *tq, task *tsk) { if(!tq || !tsk) ERRRET(EINVAL, -1); int hf; if((hf = taskqueue_handlefirst(tq, tsk))) return (hf >= 0) ? 0 : -1; tqnode *newend = tqnode_init(NULL, tq->end, tsk); if(!newend) return -1; tq->end->next = newend; tq->end = newend; tq->size++; return 0; } task * taskqueue_popback(taskqueue *tq) { if(!tq) ERRRET(EINVAL, NULL); if(tq->size <= 0) ERRRET(ENODATA, NULL); tqnode *start = tq->start; task *ret = start->task; if(tq->size == 1) { tq->start = NULL; tq->end = NULL; } else { tq->start = start->next; tq->start->prev = NULL; } free(start); tq->size--; return ret; } int taskqueue_size(taskqueue *tq) { if(!tq) ERRRET(EINVAL, -1); return tq->size; } // Internal helper macro for ctq functions. Acquires a lock via the ctq's mutex, checks to see if the queue has been canceled, then executes "code" as written #define __CTQ_INLOCK(ctq, retval, code) do {\ mtx_lock(&(ctq)->mutex); \ if((ctq)->canceled) { \ errno = ECANCELED; \ mtx_unlock(&(ctq)->mutex); \ return (retval); \ } \ \ code \ mtx_unlock(&(ctq)->mutex); \ } while (0) static void ___ucl_mtxdestroy(void *mtx) { if(!mtx) return; mtx_destroy((mtx_t *)mtx); return; } static void ___ucl_cnddestroy(void *cond) { if(cond) return; cnd_destroy((cnd_t *)cond); return; } ctqueue * ctqueue_init(int nthreads) { if(nthreads <= 0) ERRRET(EINVAL, NULL); cleanup_CREATE(6); ctqueue *ctq = calloc(1, sizeof(*ctq)); if(!ctq) return NULL; cleanup_REGISTER(free, ctq); ctq->canceled = 0; ctq->talen = nthreads; cleanup_CNDEXEC( ctq->tq = taskqueue_init(); if(!ctq->tq) cleanup_MARK(); cleanup_CNDREGISTER(taskqueue_free, ctq->tq); ); cleanup_CNDEXEC( if(mtx_init(&ctq->mutex, mtx_plain) != thrd_success) cleanup_MARK(); cleanup_CNDREGISTER(___ucl_mtxdestroy, (void*)&ctq->mutex); ); cleanup_CNDEXEC( if(cnd_init(&ctq->cond) != thrd_success) cleanup_MARK(); cleanup_CNDREGISTER(___ucl_cnddestroy, (void*)&ctq->cond); ); cleanup_CNDEXEC( ctq->thrdarr = calloc(ctq->talen, sizeof(thrd_t)); if(!ctq->thrdarr) cleanup_MARK(); cleanup_CNDREGISTER(free, ctq->thrdarr); ) cleanup_CNDFIRE(); if(cleanup_ERRORFLAGGED) return NULL; return ctq; } int ctqueue_cancel(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); __CTQ_INLOCK(ctq, 1, ctq->canceled = 1; ); cnd_broadcast(&ctq->cond); return 0; } void ctqueue_free(void *ctq) { if(!ctq) return; ctqueue *real = (ctqueue *)ctq; ctqueue_cancel(real); for(int i = 0; i < real->talen; i++) thrd_join(real->thrdarr[i], NULL); // Threads are dead, everything's free game mtx_destroy(&real->mutex); cnd_destroy(&real->cond); taskqueue_free(real->tq); free(real->thrdarr); free(real); // TODO: figure out if it's necessary / a good idea to do error handling on these functions return; } int ctqueue_waitpush(ctqueue *ctq, task *tsk) { if(!ctq || !tsk) ERRRET(EINVAL, -1); int retval = 0; __CTQ_INLOCK(ctq, -1, retval = taskqueue_push(ctq->tq, tsk); ); if(retval == 0) cnd_signal(&ctq->cond); return retval; } task * ctqueue_waitpop(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, NULL); task *retval = NULL; __CTQ_INLOCK(ctq, NULL, while(taskqueue_size(ctq->tq) == 0 && !ctq->canceled) cnd_wait(&ctq->cond, &ctq->mutex); if(ctq->canceled) { mtx_unlock(&ctq->mutex); ERRRET(ECANCELED, NULL); } retval = taskqueue_pop(ctq->tq); ); return retval; } static int __CTQ_CONSUMER(void *ctq) { if(!ctq) {errno = EINVAL; thrd_exit(-1);} ctqueue *real = (ctqueue *)ctq; for(task *ctask = NULL;;) { ctask = ctqueue_waitpop(real); if(!ctask) break; task_fire(ctask); task_free(ctask); } thrd_exit(1); } // TODO: Make this function return 0 or -1 depending on whether the overall ctq has been canceled or not. Canceling shouldn't // be treated as an error int ctqueue_start(ctqueue *ctq) { if(!ctq) ERRRET(EINVAL, -1); ctq->canceled = 0; int retval = 0; for(int i = 0; i < ctq->talen; i++) if((retval = thrd_create(&ctq->thrdarr[i], __CTQ_CONSUMER, ctq)) != thrd_success) break; if(retval != thrd_success) ctqueue_cancel(ctq); return (retval == thrd_success) ? 0 : -1; }