#include "threadpool.h" #include "shared.h" #include "ll.h" #include #include #include // Pair some data with a mutex. Specifically a way to deal with mutices easier, not for data storage (mtxpair_free does not free the `(void*)data` member) typedef struct mtxp { void *data; mtx_t mtx; } mtxpair; mtxpair * mtxpair_init(void * const data, int type) { mtxpair *mtxp = VALLOC(1, sizeof(*mtxp)); if(!mtxp) return NULL; // Init the mutex if(mtx_init(&mtxp->mtx, type) == thrd_error) { free(mtxp); RETURNWERR(errno, NULL); } mtxp->data = data; return mtxp; } void mtxpair_free(mtxpair *mp) { if(!mp) return; mtx_destroy(&mp->mtx); free(mp); return; } int mtxpair_setdata(mtxpair * const mp, void * const data) { if(!mp) RETURNWERR(EINVAL, -1); mp->data = data; return 0; } // thrd_create which calls mtx_lock/unlock on `arg` automatically int thrd_createwmx(thrd_t * const thr, thrd_start_t func, mtxpair * const mtxd) { if(!thr) RETURNWERR(EINVAL, thrd_error); if(!func) RETURNWERR(EINVAL, thrd_error); if(!mtxd) RETURNWERR(EINVAL, thrd_error); if(mtx_lock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} int retval = thrd_create(thr, func, mtxd->data); if(mtx_unlock(&mtxd->mtx) == thrd_error) {RETURNWERR(errno, thrd_error);} return retval; } /* Ok, after doing a little more research, the best way to do this is probaby via a producer/consumer architecture. Spawn a bunch of // threads waiting on a queue (via semaphore) and when one is notified pop a task of the queue and execute it. In this case, the // producer would be the filesystem scanner funciton providing new files to encrypt, and the consumers would be threads waiting // to encrypt them */ // Threadpool: // Array of threads // Task Queue // Readiness semaphore / conditional // Mutex // Linked List of Tasks // Task: // int (*callback)(void*) // void *arg // Consumer: // Wait for cqueue to pop // Fire task // Repeat // Here's a good reference of this implemented in C++ using Boost: https://gist.github.com/mikeando/482342 typedef struct task { task_callback cb; void *arg; } task; task * task_init(task_callback cb, void *arg) { if(cb == NULL) RETURNWERR(EINVAL, NULL); task *task = VALLOC(1, sizeof(*task)); if(!task) return NULL; task->cb = cb; task->arg = arg; return task; } void task_free(task *ts) { if(!ts) return; free(ts); // Not making any assumptions about the data in the task return; } int task_fire(task *ts) { if(!ts) RETURNWERR(EINVAL, -1); if(ts->cb == NULL) RETURNWERR(EINVAL, -1); return ts->cb(ts->arg); } typedef struct cq { dlinkedlist *taskqueue; dlinkedlist *rthreads; mtx_t mtx; cnd_t cnd; unsigned char canceled; } cqueue; // static void ___ucleanup_dfree(void *dll) { // if(!dll) // return; // dlinkedlist_free((dlinkedlist *)dll); // return; // } // static void ___ucleanup_cndd(void *cnd) { // if(!cnd) // return; // cnd_destroy((cnd_t *)cnd); // return; // } // static void ___ucleanup_mtxd(void *mtx) { // if(!mtx) // return; // mtx_destroy((mtx_t*)mtx); // return; // } // cqueue * cqueue_init(int mtx_type) { // cleanup_CREATE(10); // cqueue *cq = VALLOC(1, sizeof(*cq)); // if(!cq) // return NULL; // cleanup_REGISTER(free, cq); // cq->canceled = FALSE; // cq->list = dlinkedlist_init(); // if(!cq->list) // cleanup_MARK(); // cleanup_CNDREGISTER(___ucleanup_dfree, cq->list); // if(!cleanup_ERRORFLAGGED) // if(cnd_init(&cq->cnd) == thrd_error) // cleanup_MARK(); // cleanup_CNDREGISTER(___ucleanup_cndd, &cq->cnd); // if(!cleanup_ERRORFLAGGED) // if(mtx_init(&cq->mtx, mtx_type) != thrd_success) // cleanup_MARK(); // cleanup_CNDREGISTER(___ucleanup_mtxd, &cq->mtx); // if(cleanup_ERRORFLAGGED) // cleanup_FIRE(); // return cq; // } // void cqueue_cancel(cqueue *cq) { // if(!cq) // return; // mtx_lock(cq->mtx); // if(cq->canceled) { // mtx_unlock(cq->mtx); // thrd_exit(-1); // } // cq->canceled++; // mtx_unlock(cq->mtx); // cnd_broadcast(cq->cnd); // return; // } // void cqueue_free(cqueue *cq) { // if(!cq) // return; // cqueue_cancel(cq); // mtx_destroy(cq->mtx); // cnd_destroy(cq->cnd); // free(cq->mtx); // free(cq->cnd); // dlinkedlist_free(cq->list); // return; // } // int cqueue_addtask(cqueue * const cq, task * const tsk) { // if(!cq || !tsk) // RETURNWERR(EINVAL, -1); // mtx_lock(cq->mtx); // // TODO: Think about creating an "exception" via signal handling // if(cq->canceled) { // mtx_unlock(cq->mtx); // thrd_exit(-1); // } // dlinkedlist_prepend(cq->list, tsk, free); // mtx_unlock(cq->mtx); // cnd_signal(cq->cnd); // return 0; // } // task * cqueue_waitpop(cqueue * const cq) { // if(!cq) // RETURNWERR(EINVAL, NULL); // task *retval = NULL; // mtx_lock(cq->mtx); // while(dlinkedlist_isempty(cq->list) && !cq->canceled) // cnd_wait(cq->cnd, cq->mtx); // if(cq->canceled) { // mtx_unlock(cq->mtx); // thrd_exit(-1); // } // retval = dlinkedlist_get(cq->list, dlinkedlist_size(cq->list) - 1); // dlinkedlist_remove(cq->list, dlinkedlist_size(cq->list) - 1); // mtx_unlock(cq->mtx); // return retval; // } typedef struct tp { thrd_t **threads; // thrd_t *threads[] int nthreads; cqueue *taskqueue; } threadpool;