#include "threadpool.h"
#include "arena.h"
#include "shared.h"
#include "ll.h"
/* NOTE(review): the next three includes were garbled in the original source
 * ("#include #include #include" with no header names). Restored to the
 * headers this translation unit visibly uses: thrd_*/mtx_*/cnd_* need
 * <threads.h>, RETURNWERR(errno, ...) needs <errno.h>, free() needs
 * <stdlib.h>. Confirm against the project's build. */
#include <threads.h>
#include <errno.h>
#include <stdlib.h>

/*
 * Mutex: lock a shared resource; prevents races when accessing/modifying
 * shared state. A lock must always be followed by an unlock.
 *
 * Semaphore: send / wait on a signal; solves the consumer/producer problem.
 * A function that sends should never wait, and a function that waits should
 * never send.
 */

/*
 * mtxpair: pair some data with a mutex. This is a convenience for handling
 * mutexes, not a data store — mtxpair_free does NOT free the `data` member;
 * the caller retains ownership of it.
 */
typedef struct mtxp {
    void *data;
    mtx_t *mtx;
} mtxpair;

/* Allocate a mtxpair and initialize its mutex with `type` (mtx_plain etc.).
 * Returns the new pair, or NULL on allocation / mtx_init failure. */
mtxpair *
mtxpair_init(void * const data, int type)
{
    mtxpair *mtxp = VALLOC(1, sizeof(*mtxp));
    if (!mtxp)
        return NULL;

    /* Make room for the mutex itself. */
    mtxp->mtx = VALLOC(1, sizeof(*mtxp->mtx));
    if (!mtxp->mtx) {
        free(mtxp);
        return NULL;
    }

    if (mtx_init(mtxp->mtx, type) == thrd_error) {
        free(mtxp->mtx);
        free(mtxp);
        /* NOTE(review): C11 mtx_init does not set errno, so this reports
         * whatever errno already held. Kept for interface compatibility. */
        RETURNWERR(errno, NULL);
    }

    mtxp->data = data;
    return mtxp;
}

/* Destroy the mutex and free the pair. Does not touch mp->data. */
void
mtxpair_free(mtxpair *mp)
{
    if (!mp)
        return;
    mtx_destroy(mp->mtx);
    free(mp->mtx);
    free(mp);
}

/* Replace the data pointer carried by the pair. Returns 0, or -1/EINVAL on
 * a NULL pair. Not locked: caller is responsible for synchronization. */
int
mtxpair_setdata(mtxpair * const mp, void * const data)
{
    if (!mp)
        RETURNWERR(EINVAL, -1);
    mp->data = data;
    return 0;
}

/* thrd_create that brackets the read of mtxd->data with mtx_lock/mtx_unlock.
 * Returns the thrd_create result, or thrd_error on bad args / lock failure. */
int
thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd)
{
    if (!thr)
        RETURNWERR(EINVAL, thrd_error);
    if (!func)
        RETURNWERR(EINVAL, thrd_error);
    if (!mtxd)
        RETURNWERR(EINVAL, thrd_error);

    if (mtx_lock(mtxd->mtx) == thrd_error)
        RETURNWERR(errno, thrd_error);

    int retval = thrd_create(thr, func, mtxd->data);

    if (mtx_unlock(mtxd->mtx) == thrd_error)
        RETURNWERR(errno, thrd_error);

    return retval;
}

/*
 * Design notes: this is a producer/consumer architecture. Spawn a set of
 * threads waiting on a queue (via a condition variable); when one is
 * notified it pops a task off the queue and executes it. The producer is
 * the filesystem scanner function providing new files to encrypt, and the
 * consumers are threads waiting to encrypt them.
 *
 * Threadpool:
 *   - array of threads
 *   - task queue (linked list of tasks + mutex + condition variable)
 * Task:
 *   - int (*callback)(void*)
 *   - void *arg
 *
 * Reference implementation in C++/Boost:
 * https://gist.github.com/mikeando/482342
 */

typedef struct task {
    task_callback cb;
    void *arg;
} task;

/* Cancellable queue: list of tasks guarded by `mutex`; consumers block on
 * `conditional`; `canceled` flips once (never cleared) to shut consumers
 * down. */
typedef struct cq {
    dlinkedlist *list;
    mtx_t *mutex;
    cnd_t *conditional;
    unsigned char canceled;
} cqueue;

typedef struct tp {
    thrd_t **threads;
    int nthreads;
    cqueue *taskqueue;
} threadpool;

/* Allocate a task wrapping `cb` and its argument. Returns NULL (EINVAL) on
 * a NULL callback, NULL on allocation failure. Caller owns `arg`. */
task *
task_init(task_callback cb, void *arg)
{
    if (cb == NULL)
        RETURNWERR(EINVAL, NULL);

    /* Renamed from `task *task` in the original — the local shadowed the
     * type name. */
    task *t = VALLOC(1, sizeof(*t));
    if (!t)
        return NULL;

    t->cb = cb;
    t->arg = arg;
    return t;
}

/* Free a task. Makes no assumptions about (and does not free) ts->arg. */
void
task_free(task *ts)
{
    if (!ts)
        return;
    free(ts);
}

/* cleanup_* adapters: wrap the typed destroyers in void(*)(void*) so they
 * can be registered with the cleanup list. */
static void
___ucleanup_mtxd(void *mtx)
{
    if (!mtx)
        return;
    mtx_destroy((mtx_t *)mtx);
}

static void
___ucleanup_cndd(void *cnd)
{
    if (!cnd)
        return;
    cnd_destroy((cnd_t *)cnd);
}

static void
___ucleanup_dll(void *dll)
{
    if (!dll)
        return;
    dlinkedlist_free((dlinkedlist *)dll);
}

/* Build a cqueue: mutex of `mtx_type`, condition variable, empty list.
 * Returns NULL on any failure, with all partially-built pieces released via
 * the cleanup list. */
cqueue *
cqueue_init(int mtx_type)
{
    cleanup_CREATE(10);

    cqueue *cq = VALLOC(1, sizeof(*cq));
    if (!cq)
        return NULL;
    cleanup_REGISTER(free, cq);

    cq->mutex = VALLOC(1, sizeof(*(cq->mutex)));
    if (!(cq->mutex))
        cleanup_MARK();
    cleanup_CNDREGISTER(free, cq->mutex);

    /* BUG FIX: the original guarded every step below with
     * `cleanup_ERRORFLAGGED && ...`, i.e. the step only ran AFTER an error
     * had been flagged — on the success path mtx_init/cnd_init/etc. were
     * skipped entirely (threadpool_init's `!cleanup_ERRORFLAGGED` loop
     * guard shows the intended polarity). */
    if (!cleanup_ERRORFLAGGED && mtx_init(cq->mutex, mtx_type) != thrd_success)
        cleanup_MARK();
    cleanup_CNDREGISTER(___ucleanup_mtxd, cq->mutex);

    if (!cleanup_ERRORFLAGGED &&
        !(cq->conditional = VALLOC(1, sizeof(*(cq->conditional)))))
        cleanup_MARK();
    cleanup_CNDREGISTER(free, cq->conditional);

    if (!cleanup_ERRORFLAGGED && cnd_init(cq->conditional) != thrd_success)
        cleanup_MARK();
    cleanup_CNDREGISTER(___ucleanup_cndd, cq->conditional);

    if (!cleanup_ERRORFLAGGED && !(cq->list = dlinkedlist_init()))
        cleanup_MARK();
    cleanup_CNDREGISTER(___ucleanup_dll, cq->list);

    if (cleanup_ERRORFLAGGED) {
        /* Was `cleanup_fire(&__CLEANUP)` — made consistent with the
         * cleanup_FIRE() used elsewhere in this file. */
        cleanup_FIRE();
        /* BUG FIX: the original fell through and returned the freed cq. */
        return NULL;
    }

    cq->canceled = 0;
    return cq;
}

/* Cancel any outstanding waiters, then destroy and free everything.
 * NOTE(review): waiters woken by the cancel may still be inside cnd_wait
 * when the condition variable is destroyed here — a join-based shutdown
 * would be safer; confirm against callers. */
void
cqueue_free(cqueue *cq)
{
    if (!cq)
        return;
    cqueue_cancel(cq);
    dlinkedlist_free(cq->list);
    cnd_destroy(cq->conditional);
    mtx_destroy(cq->mutex);
    free(cq->conditional);
    free(cq->mutex);
    free(cq);
}

/* Append a task to the tail and signal one waiter. Returns 0, or -1/EINVAL
 * on bad args. If the queue was canceled, exits the calling thread (original
 * contract, preserved — note thrd_exit takes an int result code; passing
 * thrd_timedout here is the file's convention). */
int
cqueue_append(cqueue * const cq, task *tsk)
{
    if (!cq || !tsk)
        RETURNWERR(EINVAL, -1);

    mtx_lock(cq->mutex);
    if (cq->canceled) {
        mtx_unlock(cq->mutex);
        thrd_exit(thrd_timedout);
    }
    dlinkedlist_append(cq->list, tsk, free);
    mtx_unlock(cq->mutex);

    cnd_signal(cq->conditional);
    return 0;
}

/* Prepend a task to the head and signal one waiter. Same contract as
 * cqueue_append. */
int
cqueue_prepend(cqueue * const cq, task *tsk)
{
    if (!cq || !tsk)
        RETURNWERR(EINVAL, -1);

    mtx_lock(cq->mutex);
    if (cq->canceled) {
        mtx_unlock(cq->mutex);
        thrd_exit(thrd_timedout);
    }
    dlinkedlist_prepend(cq->list, tsk, free);
    mtx_unlock(cq->mutex);

    cnd_signal(cq->conditional);
    return 0;
}

/* Insert a task at `index` and signal one waiter. Same contract as
 * cqueue_append. An over-large index can't be checked before locking the
 * mutex; dlinkedlist_insert is trusted to handle it. */
int
cqueue_insert(cqueue * const cq, task *tsk, int index)
{
    if (!cq || !tsk || index < 0)
        RETURNWERR(EINVAL, -1);

    mtx_lock(cq->mutex);
    if (cq->canceled) {
        mtx_unlock(cq->mutex);
        thrd_exit(thrd_timedout);
    }
    dlinkedlist_insert(cq->list, tsk, free, index);
    mtx_unlock(cq->mutex);

    cnd_signal(cq->conditional);
    return 0;
}

/* Number of queued tasks, or -1/EINVAL on a NULL queue. Exits the calling
 * thread if the queue was canceled (original contract, preserved). */
int
cqueue_size(cqueue const * const cq)
{
    if (!cq)
        RETURNWERR(EINVAL, -1);

    mtx_lock(cq->mutex);
    if (cq->canceled) {
        mtx_unlock(cq->mutex);
        thrd_exit(thrd_timedout);
    }
    int retval = dlinkedlist_size(cq->list);
    mtx_unlock(cq->mutex);

    return retval;
}

/* 1 if empty, 0 if not, -1 on error (propagated from cqueue_size). */
int
cqueue_isempty(cqueue const * const cq)
{
    int val = cqueue_size(cq);
    return (val < 0) ? -1 : (val == 0);
}

/* Non-blocking pop: if a task is available, store it in *ret and return 1;
 * otherwise return 0. -1/EINVAL on bad args; exits the calling thread if
 * the queue was canceled. */
int
cqueue_trypop(cqueue * const cq, task **ret)
{
    /* BUG FIX: the original also rejected `!*ret`, but *ret is an OUT
     * parameter — a caller passing a pointer to a NULL task is valid. */
    if (!cq || !ret)
        RETURNWERR(EINVAL, -1);

    int retval = 0;

    mtx_lock(cq->mutex);
    if (cq->canceled) {
        mtx_unlock(cq->mutex);
        thrd_exit(thrd_timedout);
    }
    if (!dlinkedlist_isempty(cq->list)) {
        *ret = (task *)dlinkedlist_poplast(cq->list);
        retval = 1;
    }
    mtx_unlock(cq->mutex);

    return retval;
}

/* Blocking pop: wait until a task is available, store it in *ret, return 0.
 * -1/EINVAL on bad args; exits the calling thread if the queue is canceled
 * while waiting. */
int
cqueue_waitpop(cqueue * const cq, task **ret)
{
    /* BUG FIX: dropped the original's `!*ret` check — *ret is an OUT
     * parameter (see cqueue_trypop). */
    if (!cq || !ret)
        RETURNWERR(EINVAL, -1);

    mtx_lock(cq->mutex);
    /* BUG FIX: the original waited while the list was NOT empty
     * (`!dlinkedlist_isempty`), i.e. consumers blocked when work existed
     * and popped from an empty list otherwise. Wait while it IS empty.
     * cnd_wait unlocks the mutex while waiting and reacquires it on
     * wakeup. */
    while (dlinkedlist_isempty(cq->list) && !cq->canceled)
        cnd_wait(cq->conditional, cq->mutex);

    if (cq->canceled) {
        mtx_unlock(cq->mutex);
        thrd_exit(thrd_timedout);
    }

    *ret = dlinkedlist_poplast(cq->list);
    mtx_unlock(cq->mutex);
    return 0;
}

/* Mark the queue canceled and wake every waiter. Returns 0, or -1 if it was
 * already canceled (still broadcasts) or on a NULL queue (with EINVAL). */
int
cqueue_cancel(cqueue * const cq)
{
    if (!cq)
        RETURNWERR(EINVAL, -1);

    int retval = 0;

    mtx_lock(cq->mutex);
    if (cq->canceled)
        retval = -1;
    else
        cq->canceled++;
    mtx_unlock(cq->mutex);

    cnd_broadcast(cq->conditional);
    return retval;
}

/* cleanup_* adapter for cqueue_free. */
static void
___ucleanup_cqfree(void *cq)
{
    if (!cq)
        return;
    cqueue_free(cq);
}

/* Allocate a pool with `threads` thread slots and a fresh task queue.
 * Returns NULL on bad args or any allocation failure (everything built so
 * far is released). Note: thread handles are allocated here but no threads
 * are started by this function. */
threadpool *
threadpool_init(int threads)
{
    if (threads < 1)
        RETURNWERR(EINVAL, NULL);

    cleanup_CREATE(10);

    threadpool *tp = VALLOC(1, sizeof(*tp));
    if (!tp)
        return NULL;
    cleanup_REGISTER(free, tp);

    tp->taskqueue = cqueue_init(mtx_plain);
    if (!tp->taskqueue)
        cleanup_MARK();
    cleanup_CNDREGISTER(___ucleanup_cqfree, tp->taskqueue);

    tp->threads = VALLOC(threads, sizeof(*tp->threads));
    if (!tp->threads)
        cleanup_MARK();
    cleanup_CNDREGISTER(free, tp->threads);

    for (int i = 0; i < threads && !cleanup_ERRORFLAGGED; i++) {
        tp->threads[i] = VALLOC(1, sizeof(**tp->threads));
        if (!tp->threads[i]) {
            cleanup_MARK();
            /* Roll back the handles allocated before this one; the cleanup
             * list only knows about the array itself. */
            for (int j = 0; j < i; j++)
                free(tp->threads[j]);
        }
    }

    if (cleanup_ERRORFLAGGED) {
        cleanup_FIRE();
        /* BUG FIX: the original returned tp after the cleanup freed it. */
        return NULL;
    }

    tp->nthreads = threads;
    return tp;
}

/* Tear down a pool: cancel the queue (wakes blocked workers), detach and
 * free the thread handles, then free the queue and the pool. */
void
threadpool_free(threadpool *tp)
{
    if (!tp)
        return;

    /* Cancel first so workers blocked in cqueue_waitpop wake up and exit
     * before the queue's mutex/cond are destroyed below.
     * NOTE(review): detached threads may still be waking when cqueue_free
     * destroys the condition variable — joining instead of detaching would
     * be safer; confirm against how workers are started. */
    cqueue_cancel(tp->taskqueue);

    for (int i = 0; i < tp->nthreads; i++) {
        thrd_detach(*tp->threads[i]);
        free(tp->threads[i]);
    }
    free(tp->threads);

    /* BUG FIX: the original called cqueue_free(tp->taskqueue) twice — once
     * before the detach loop and once after — a double free. */
    cqueue_free(tp->taskqueue);
    free(tp);
}