#include "threadpool.h"
#include "shared.h"
#include "ll.h"
/* NOTE(review): the five system-header names were lost in the original
 * (empty "#include" directives; angle-bracketed names stripped). The headers
 * below cover every libc/C11 symbol this file uses -- C11 threads,
 * malloc/free, errno -- TODO: confirm against the project's build. */
#include <threads.h>
#include <stdlib.h>
#include <errno.h>
#include <stddef.h>
#include <stdio.h>

// Pair some data with a mutex. Specifically a way to deal with mutexes more
// easily, not for data storage (mtxpair_free does not free the `(void*)data`
// member).
typedef struct mtxp {
	void *data; // borrowed pointer; never freed by this module
	mtx_t mtx;
} mtxpair;

/* Allocate a mtxpair wrapping `data` with a mutex of kind `type`
 * (mtx_plain / mtx_timed, optionally | mtx_recursive).
 * Returns the new pair, or NULL on allocation/mutex-init failure.
 * Ownership of `data` stays with the caller (see mtxpair_free). */
mtxpair * mtxpair_init(void * const data, int type) {
	mtxpair *mtxp = VALLOC(1, sizeof(*mtxp));
	if(!mtxp) return NULL;

	// Init the mutex
	if(mtx_init(&mtxp->mtx, type) == thrd_error) {
		free(mtxp);
		// NOTE(review): C11 mtx_init does not set errno, so this reports a
		// possibly-stale errno value -- TODO confirm the intended error code
		RETURNWERR(errno, NULL);
	}

	mtxp->data = data;
	return mtxp;
}

// Destroy the mutex and release the pair itself. The wrapped `data` pointer
// is NOT freed -- the caller keeps ownership of it.
void mtxpair_free(mtxpair *mp) {
	if(!mp) return;
	mtx_destroy(&mp->mtx);
	free(mp);
	return;
}

// Swap the wrapped data pointer. Does not take `mp->mtx`; the caller is
// responsible for any synchronization needed around this store.
int mtxpair_setdata(mtxpair * const mp, void * const data) {
	if(!mp) RETURNWERR(EINVAL, -1);
	mp->data = data;
	return 0;
}

// thrd_create which calls mtx_lock/unlock on `arg` automatically
int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) {
	if(!thr) RETURNWERR(EINVAL, thrd_error);
	if(!func) RETURNWERR(EINVAL, thrd_error);
	if(!mtxd) RETURNWERR(EINVAL, thrd_error);

	// NOTE(review): as above, C11 mtx_lock/mtx_unlock do not set errno.
	if(mtx_lock(&mtxd->mtx) == thrd_error) { RETURNWERR(errno, thrd_error); }
	int retval = thrd_create(thr, func, mtxd->data);
	if(mtx_unlock(&mtxd->mtx) == thrd_error) { RETURNWERR(errno, thrd_error); }

	return retval;
}

/* Ok, after doing a little more research, the best way to do this is probably
 * via a producer/consumer architecture. Spawn a bunch of threads waiting on a
 * queue (via semaphore) and when one is notified, pop a task off the queue
 * and execute it.
In this case, the // producer would be the filesystem scanner funciton providing new files to encrypt, and the consumers would be threads waiting // to encrypt them */ // Threadpool: // Array of threads // Task Queue // Readiness semaphore / conditional // Mutex // Linked List of Tasks // Task: // int (*callback)(void*) // void *arg // Consumer: // Wait for cqueue to pop // Fire task // Repeat // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 typedef struct task { task_callback cb; void *arg; } task; task * task_init(task_callback cb, void *arg) { if(cb == NULL) RETURNWERR(EINVAL, NULL); task *task = VALLOC(1, sizeof(*task)); if(!task) return NULL; task->cb = cb; task->arg = arg; return task; } void task_free(void *ts) { if(!ts) return; free(ts); // Not making any assumptions about the data in the task return; } int task_fire(task *ts) { if(!ts) RETURNWERR(EINVAL, -1); if(ts->cb == NULL) RETURNWERR(EINVAL, -1); return ts->cb(ts->arg); } typedef struct cq { dlinkedlist *taskqueue; dlinkedlist *rthreads; mtx_t mtx; cnd_t cnd; unsigned char canceled; } cqueue; static void ___ucleanup_mtxd(void *mtx) { if(!mtx) return; mtx_destroy((mtx_t*)mtx); return; } static void ___ucleanup_cndd(void *cnd) { if(!cnd) return; cnd_destroy((cnd_t *)cnd); return; } cqueue * cqueue_init() { cleanup_CREATE(10); // Create base object cqueue *cq = VALLOC(1, sizeof(*cq)); if(!cq) RETURNWERR(errno, NULL); cleanup_REGISTER(free, cq); cq->canceled = 0; // Initialize the mutex if(mtx_init(&cq->mtx, mtx_plain) != thrd_success) cleanup_MARK(); cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); // Initialize the conditional if(!cleanup_ERRORFLAGGED) if(cnd_init(&cq->cnd) != thrd_success) cleanup_MARK(); cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); // Create the taskqueue if(!cleanup_ERRORFLAGGED) if(!(cq->taskqueue = dlinkedlist_init())) cleanup_MARK(); cleanup_CNDREGISTER(dlinkedlist_free, cq->taskqueue); // Create the thread list 
if(!cleanup_ERRORFLAGGED) if(!(cq->rthreads = dlinkedlist_init())) cleanup_MARK(); cleanup_CNDREGISTER(dlinkedlist_free, cq->rthreads); if(cleanup_ERRORFLAGGED) cleanup_FIRE(); return cq; // Lambdas would make this a million times easier, as I could wrap this whole thing in a while loop then run a bunch of in-line // callbacks that do these operations and I wouldn't need this badness. That or I could use a goto, but I also hate that idea } void cqueue_cancel(cqueue * const cq) { if(!cq) return; mtx_lock(&cq->mtx); if(cq->canceled) { mtx_unlock(&cq->mtx); return; } cq->canceled = 1; mtx_unlock(&cq->mtx); cnd_broadcast(&cq->cnd); return; } static int ___cqueue_join(void *t) { if(!t) return -1; int retval = 0; thrd_join(*((thrd_t*)t), &retval); return retval; } void cqueue_free(void *cq) { if(!cq) return; cqueue *real = (cqueue *)cq; // Cancel threads and wait for them to exit cqueue_cancel(real); dlinkedlist_foreach(real->rthreads, ___cqueue_join); // Threads are dead, no need to worry about concurrency anymore mtx_destroy(&real->mtx); cnd_destroy(&real->cnd); dlinkedlist_free(real->rthreads); dlinkedlist_free(real->taskqueue); return; } int cqueue_addtask(cqueue * const cq, task * const tsk) { if(!cq || !tsk) RETURNWERR(EINVAL, -1); mtx_lock(&cq->mtx); if(cq->canceled) { mtx_unlock(&cq->mtx); RETURNWERR(ECANCELED, -1); } dlinkedlist_prepend(cq->taskqueue, tsk, task_free); mtx_unlock(&cq->mtx); cnd_signal(&cq->cnd); return 0; } task * cqueue_waitpop(cqueue * const cq) { if(!cq) RETURNWERR(EINVAL, NULL); task *tsk = NULL; int index = -1; mtx_lock(&cq->mtx); while(dlinkedlist_size(cq->taskqueue) <= 0 && !cq->canceled) cnd_wait(&cq->cnd, &cq->mtx); if(cq->canceled) { mtx_unlock(&cq->mtx); thrd_exit(-1); } tsk = dlinkedlist_get(cq->taskqueue, (index = dlinkedlist_size(cq->taskqueue) - 1)); dlinkedlist_remove(cq->taskqueue, index); mtx_unlock(&cq->mtx); return tsk; } static int consumer(void *cq) { if(!cq) thrd_exit(-1); cqueue *real = (cqueue *)cq; for(task *ctask;;) { 
ctask = cqueue_waitpop(real); if(!ctask) task_fire(ctask); } thrd_exit(0); } int cqueue_registerthreads(cqueue * const cq, int threads) { if(!cq || threads <= 0) RETURNWERR(EINVAL, -1); mtx_lock(&cq->mtx); if(cq->canceled) { mtx_unlock(&cq->mtx); RETURNWERR(ECANCELED, -1); } thrd_t *newthreads[threads]; for(int i = 0; i < threads; i++) { newthreads[i] = VALLOC(1, sizeof(thrd_t)); if(!newthreads[i]) { for(int j = 0; j < i; j++) free(newthreads[j]); return -1; } dlinkedlist_prepend(cq->rthreads, newthreads[i], free); thrd_create(newthreads[i], consumer, cq); } mtx_unlock(&cq->mtx); return 0; } int cqueue_registerthread(cqueue * const cq) { return cqueue_registerthreads(cq, 1); } enum __CQUEUE_STAT_OPTIONS { __CQUEUE_STAT_NOTDEF, __CQUEUE_CANCELED, __CQUEUE_THREADS_NUM, __CQUEUE_TASKS_NUM, __CQUEUE_STAT_TOOBIG, }; int cqueue_getstat(cqueue * const cq, enum __CQUEUE_STAT_OPTIONS opt) { if(!cq || opt <= __CQUEUE_STAT_NOTDEF || opt >= __CQUEUE_STAT_TOOBIG) RETURNWERR(EINVAL, -1); int retval = -1; mtx_lock(&cq->mtx); switch(opt) { case __CQUEUE_CANCELED: retval = cq->canceled; mtx_unlock(&cq->mtx); return retval; break; case __CQUEUE_THREADS_NUM: retval = dlinkedlist_size(cq->rthreads); mtx_unlock(&cq->mtx); return retval; break; case __CQUEUE_TASKS_NUM: retval = dlinkedlist_size(cq->taskqueue); mtx_unlock(&cq->mtx); return retval; break; default: RETURNWERR(EINVAL, -1); break; } // This should absolutely never run RETURNWERR(ENOTRECOVERABLE, -1); } int cqueue_iscanceled(cqueue * const cq) { return cqueue_getstat(cq, __CQUEUE_CANCELED); } int cqueue_numthreads(cqueue * const cq) { return cqueue_getstat(cq, __CQUEUE_THREADS_NUM); } int cqueue_numtasks(cqueue * const cq) { return cqueue_getstat(cq, __CQUEUE_TASKS_NUM); }