diff options
Diffstat (limited to 'extra')
-rw-r--r-- | extra/mariabackup/ds_compress.cc | 51 |
1 files changed, 43 insertions, 8 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index dc1140edfee..a89e676190d 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -34,9 +34,10 @@ typedef struct { pthread_t id; uint num; pthread_mutex_t data_mutex; + pthread_cond_t avail_cond; pthread_cond_t data_cond; pthread_cond_t done_cond; - my_bool data_avail; + pthread_t data_avail; my_bool cancelled; const char *from; size_t from_len; @@ -197,9 +198,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) threads = comp_ctxt->threads; nthreads = comp_ctxt->nthreads; + const pthread_t self = pthread_self(); + ptr = (const char *) buf; while (len > 0) { - uint max_thread; + bool wait = nthreads == 1; +retry: + bool submitted = false; /* Send data to worker threads for compression */ for (i = 0; i < nthreads; i++) { @@ -208,16 +213,33 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; pthread_mutex_lock(&thd->data_mutex); + if (thd->data_avail == pthread_t(~0UL)) { + } else if (!wait) { +skip: + pthread_mutex_unlock(&thd->data_mutex); + continue; + } else { + for (;;) { + pthread_cond_wait(&thd->avail_cond, + &thd->data_mutex); + if (thd->data_avail + == pthread_t(~0UL)) { + break; + } + goto skip; + } + } chunk_len = (len > COMPRESS_CHUNK_SIZE) ? COMPRESS_CHUNK_SIZE : len; thd->from = ptr; thd->from_len = chunk_len; - thd->data_avail = TRUE; + thd->data_avail = self; pthread_cond_signal(&thd->data_cond); pthread_mutex_unlock(&thd->data_mutex); + submitted = true; len -= chunk_len; if (len == 0) { break; @@ -225,13 +247,20 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) ptr += chunk_len; } - max_thread = (i < nthreads) ? i : nthreads - 1; + if (!submitted) { + wait = true; + goto retry; + } - /* Reap and stream the compressed data */ - for (i = 0; i <= max_thread; i++) { + for (i = 0; i < nthreads; i++) { thd = threads + i; pthread_mutex_lock(&thd->data_mutex); + if (thd->data_avail != self) { + pthread_mutex_unlock(&thd->data_mutex); + continue; + } + while (!thd->to_len) { pthread_cond_wait(&thd->done_cond, &thd->data_mutex); @@ -249,6 +278,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) } thd->to_len = 0; + thd->data_avail = pthread_t(~0UL); + pthread_cond_signal(&thd->avail_cond); pthread_mutex_unlock(&thd->data_mutex); if (fail) { @@ -336,6 +367,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd) pthread_join(thd->id, NULL); + pthread_cond_destroy(&thd->avail_cond); pthread_cond_destroy(&thd->data_cond); pthread_cond_destroy(&thd->done_cond); pthread_mutex_destroy(&thd->data_mutex); @@ -363,11 +395,14 @@ create_worker_threads(uint n) /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || + pthread_cond_init(&thd->avail_cond, NULL) || pthread_cond_init(&thd->data_cond, NULL) || pthread_cond_init(&thd->done_cond, NULL)) { goto err; } + thd->data_avail = pthread_t(~0UL); + if (pthread_create(&thd->id, NULL, compress_worker_thread_func, thd)) { msg("compress: pthread_create() failed: " @@ -409,13 +444,13 @@ compress_worker_thread_func(void *arg) pthread_mutex_lock(&thd->data_mutex); while (1) { - while (!thd->data_avail && !thd->cancelled) { + while (!thd->cancelled + && (thd->to_len || thd->data_avail == pthread_t(~0UL))) { pthread_cond_wait(&thd->data_cond, &thd->data_mutex); } if (thd->cancelled) break; - thd->data_avail = FALSE; thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len, &thd->state); |