diff options
-rw-r--r-- | src/bio.c | 47 | ||||
-rw-r--r-- | src/bio.h | 7 |
2 files changed, 52 insertions, 2 deletions
@@ -27,6 +27,13 @@ static pthread_mutex_t bio_mutex; static pthread_cond_t bio_condvar; static list *bio_jobs; +/* The following array is used to hold the number of pending jobs for every + * OP type. This allows us to export the bioPendingJobsOfType() API that is + * useful when the main thread wants to perform some operation that may involve + * objects shared with the background thread. The main thread will just wait + * that there are no longer jobs of this type to be executed before performing + * the sensible operation. This data is also useful for reporting. */ +static unsigned long long *bio_pending; /* This structure represents a background Job. It is only used locally to this * file as the API deos not expose the internals at all. */ @@ -46,10 +53,14 @@ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; + int j; + /* Initialization of state vars and objects */ pthread_mutex_init(&bio_mutex,NULL); pthread_cond_init(&bio_condvar,NULL); bio_jobs = listCreate(); + bio_pending = zmalloc(sizeof(*bio_pending)*REDIS_BIO_MAX_OP_ID); + for (j = 0; j < REDIS_BIO_MAX_OP_ID; j++) bio_pending[j] = 0; /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); @@ -72,6 +83,7 @@ void bioCreateBackgroundJob(int type, void *data) { job->data = data; pthread_mutex_lock(&bio_mutex); listAddNodeTail(bio_jobs,job); + bio_pending[type]++; pthread_cond_signal(&bio_condvar); pthread_mutex_unlock(&bio_mutex); } @@ -84,6 +96,7 @@ void *bioProcessBackgroundJobs(void *arg) { pthread_mutex_lock(&bio_mutex); while(1) { listNode *ln; + int type; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs) == 0) { @@ -93,13 +106,14 @@ void *bioProcessBackgroundJobs(void *arg) { /* Pop the job from the queue. */ ln = listFirst(bio_jobs); job = ln->value; + type = job->type; listDelNode(bio_jobs,ln); /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex); /* Process the job accordingly to its type. */ - if (job->type == REDIS_BIO_CLOSE_FILE) { + if (type == REDIS_BIO_CLOSE_FILE) { close((long)job->data); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); @@ -109,5 +123,36 @@ void *bioProcessBackgroundJobs(void *arg) { /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex); + bio_pending[type]--; + } +} + +/* Return the number of pending jobs of the specified type. */ +unsigned long long bioPendingJobsOfType(int type) { + unsigned long long val; + pthread_mutex_lock(&bio_mutex); + val = bio_pending[type]; + pthread_mutex_unlock(&bio_mutex); + return val; +} + +/* Wait until the number of pending jobs of the specified type are + * less or equal to the specified number. + * + * This function may block for long time, it should only be used to perform + * special tasks like AOF rewriting or alike. */ +void bioWaitPendingJobsLE(int type, unsigned long long num) { + unsigned long long iteration = 0; + + /* We poll the jobs queue aggressively to start, and gradually relax + * the polling speed if it is going to take too much time. */ + while(1) { + iteration++; + if (iteration > 1000 && iteration <= 10000) { + usleep(100); + } else if (iteration > 10000) { + usleep(1000); + } + if (bioPendingJobsOfType(type) <= num) break; } } @@ -1,6 +1,11 @@ /* Exported API */ void bioInit(void); void bioCreateBackgroundJob(int type, void *data); +unsigned long long bioPendingJobsOfType(int type); +void bioWaitPendingJobsLE(int type, unsigned long long num); /* Background job opcodes */ -#define REDIS_BIO_CLOSE_FILE 1 +#define REDIS_BIO_ZERO_OP_ID 0 /* We don't use zero as it is the most likely + * passed value in case of bugs/races. */ +#define REDIS_BIO_CLOSE_FILE 1 /* Deferred close(2) syscall. */ +#define REDIS_BIO_MAX_OP_ID 1 |