summaryrefslogtreecommitdiff
path: root/extra
diff options
context:
space:
mode:
Diffstat (limited to 'extra')
-rw-r--r--extra/mariabackup/ds_compress.cc51
1 files changed, 43 insertions, 8 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc
index dc1140edfee..a89e676190d 100644
--- a/extra/mariabackup/ds_compress.cc
+++ b/extra/mariabackup/ds_compress.cc
@@ -34,9 +34,10 @@ typedef struct {
pthread_t id;
uint num;
pthread_mutex_t data_mutex;
+ pthread_cond_t avail_cond;
pthread_cond_t data_cond;
pthread_cond_t done_cond;
- my_bool data_avail;
+ pthread_t data_avail;
my_bool cancelled;
const char *from;
size_t from_len;
@@ -197,9 +198,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
threads = comp_ctxt->threads;
nthreads = comp_ctxt->nthreads;
+ const pthread_t self = pthread_self();
+
ptr = (const char *) buf;
while (len > 0) {
- uint max_thread;
+ bool wait = nthreads == 1;
+retry:
+ bool submitted = false;
/* Send data to worker threads for compression */
for (i = 0; i < nthreads; i++) {
@@ -208,16 +213,33 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
+ if (thd->data_avail == pthread_t(~0UL)) {
+ } else if (!wait) {
+skip:
+ pthread_mutex_unlock(&thd->data_mutex);
+ continue;
+ } else {
+ for (;;) {
+ pthread_cond_wait(&thd->avail_cond,
+ &thd->data_mutex);
+ if (thd->data_avail
+ == pthread_t(~0UL)) {
+ break;
+ }
+ goto skip;
+ }
+ }
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;
- thd->data_avail = TRUE;
+ thd->data_avail = self;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
+ submitted = true;
len -= chunk_len;
if (len == 0) {
break;
@@ -225,13 +247,20 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
ptr += chunk_len;
}
- max_thread = (i < nthreads) ? i : nthreads - 1;
+ if (!submitted) {
+ wait = true;
+ goto retry;
+ }
- /* Reap and stream the compressed data */
- for (i = 0; i <= max_thread; i++) {
+ for (i = 0; i < nthreads; i++) {
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
+ if (thd->data_avail != self) {
+ pthread_mutex_unlock(&thd->data_mutex);
+ continue;
+ }
+
while (!thd->to_len) {
pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
@@ -249,6 +278,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
}
thd->to_len = 0;
+ thd->data_avail = pthread_t(~0UL);
+ pthread_cond_signal(&thd->avail_cond);
pthread_mutex_unlock(&thd->data_mutex);
if (fail) {
@@ -336,6 +367,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd)
pthread_join(thd->id, NULL);
+ pthread_cond_destroy(&thd->avail_cond);
pthread_cond_destroy(&thd->data_cond);
pthread_cond_destroy(&thd->done_cond);
pthread_mutex_destroy(&thd->data_mutex);
@@ -363,11 +395,14 @@ create_worker_threads(uint n)
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
+ pthread_cond_init(&thd->avail_cond, NULL) ||
pthread_cond_init(&thd->data_cond, NULL) ||
pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}
+ thd->data_avail = pthread_t(~0UL);
+
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
@@ -409,13 +444,13 @@ compress_worker_thread_func(void *arg)
pthread_mutex_lock(&thd->data_mutex);
while (1) {
- while (!thd->data_avail && !thd->cancelled) {
+ while (!thd->cancelled
+ && (thd->to_len || thd->data_avail == pthread_t(~0UL))) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}
if (thd->cancelled)
break;
- thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);