summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--extra/mariabackup/ds_compress.cc107
1 files changed, 37 insertions, 70 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc
index 1e2bf76e580..dc1140edfee 100644
--- a/extra/mariabackup/ds_compress.cc
+++ b/extra/mariabackup/ds_compress.cc
@@ -1,5 +1,6 @@
/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
+Copyright (c) 2022, MariaDB Corporation.
Compressing datasink implementation for XtraBackup.
@@ -32,11 +33,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
typedef struct {
pthread_t id;
uint num;
- pthread_mutex_t ctrl_mutex;
- pthread_cond_t ctrl_cond;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
- my_bool started;
+ pthread_cond_t done_cond;
my_bool data_avail;
my_bool cancelled;
const char *from;
@@ -208,14 +207,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
- pthread_mutex_lock(&thd->ctrl_mutex);
+ pthread_mutex_lock(&thd->data_mutex);
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;
- pthread_mutex_lock(&thd->data_mutex);
thd->data_avail = TRUE;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
@@ -234,32 +232,30 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
- while (thd->data_avail == TRUE) {
- pthread_cond_wait(&thd->data_cond,
+ while (!thd->to_len) {
+ pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
}
- xb_a(threads[i].to_len > 0);
-
bool fail = ds_write(dest_file, "NEWBNEWB", 8) ||
write_uint64_le(dest_file,
comp_file->bytes_processed);
- comp_file->bytes_processed += threads[i].from_len;
+ comp_file->bytes_processed += thd->from_len;
if (!fail) {
- fail = write_uint32_le(dest_file, threads[i].adler) ||
- ds_write(dest_file, threads[i].to,
- threads[i].to_len);
+ fail = write_uint32_le(dest_file, thd->adler) ||
+ ds_write(dest_file, thd->to,
+ thd->to_len);
}
- pthread_mutex_unlock(&threads[i].data_mutex);
+ thd->to_len = 0;
+ pthread_mutex_unlock(&thd->data_mutex);
if (fail) {
msg("compress: write to the destination stream "
"failed.");
return 1;
}
- pthread_mutex_unlock(&threads[i].ctrl_mutex);
}
}
@@ -330,6 +326,24 @@ write_uint64_le(ds_file_t *file, ulonglong n)
}
static
+void
+destroy_worker_thread(comp_thread_ctxt_t *thd)
+{
+ pthread_mutex_lock(&thd->data_mutex);
+ thd->cancelled = TRUE;
+ pthread_cond_signal(&thd->data_cond);
+ pthread_mutex_unlock(&thd->data_mutex);
+
+ pthread_join(thd->id, NULL);
+
+ pthread_cond_destroy(&thd->data_cond);
+ pthread_cond_destroy(&thd->done_cond);
+ pthread_mutex_destroy(&thd->data_mutex);
+
+ my_free(thd->to);
+}
+
+static
comp_thread_ctxt_t *
create_worker_threads(uint n)
{
@@ -337,60 +351,36 @@ create_worker_threads(uint n)
uint i;
threads = (comp_thread_ctxt_t *)
- my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
+ my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE));
for (i = 0; i < n; i++) {
comp_thread_ctxt_t *thd = threads + i;
thd->num = i + 1;
- thd->started = FALSE;
- thd->cancelled = FALSE;
- thd->data_avail = FALSE;
-
thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
MY_QLZ_COMPRESS_OVERHEAD,
MYF(MY_FAE));
- /* Initialize the control mutex and condition var */
- if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
- pthread_cond_init(&thd->ctrl_cond, NULL)) {
- goto err;
- }
-
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
- pthread_cond_init(&thd->data_cond, NULL)) {
+ pthread_cond_init(&thd->data_cond, NULL) ||
+ pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}
- pthread_mutex_lock(&thd->ctrl_mutex);
-
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
"errno = %d", errno);
- pthread_mutex_unlock(&thd->ctrl_mutex);
goto err;
}
}
- /* Wait for the threads to start */
- for (i = 0; i < n; i++) {
- comp_thread_ctxt_t *thd = threads + i;
-
- while (thd->started == FALSE)
- pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
- pthread_mutex_unlock(&thd->ctrl_mutex);
- }
-
return threads;
err:
- while (i > 0) {
- comp_thread_ctxt_t *thd;
- i--;
- thd = threads + i;
- pthread_mutex_unlock(&thd->ctrl_mutex);
+ for (; i; i--) {
+ destroy_worker_thread(threads + i);
}
my_free(threads);
@@ -404,21 +394,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
uint i;
for (i = 0; i < n; i++) {
- comp_thread_ctxt_t *thd = threads + i;
-
- pthread_mutex_lock(&thd->data_mutex);
- threads[i].cancelled = TRUE;
- pthread_cond_signal(&thd->data_cond);
- pthread_mutex_unlock(&thd->data_mutex);
-
- pthread_join(thd->id, NULL);
-
- pthread_cond_destroy(&thd->data_cond);
- pthread_mutex_destroy(&thd->data_mutex);
- pthread_cond_destroy(&thd->ctrl_cond);
- pthread_mutex_destroy(&thd->ctrl_mutex);
-
- my_free(thd->to);
+ destroy_worker_thread(threads + i);
}
my_free(threads);
@@ -430,26 +406,16 @@ compress_worker_thread_func(void *arg)
{
comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
- pthread_mutex_lock(&thd->ctrl_mutex);
-
pthread_mutex_lock(&thd->data_mutex);
- thd->started = TRUE;
- pthread_cond_signal(&thd->ctrl_cond);
-
- pthread_mutex_unlock(&thd->ctrl_mutex);
-
while (1) {
- thd->data_avail = FALSE;
- pthread_cond_signal(&thd->data_cond);
-
while (!thd->data_avail && !thd->cancelled) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}
if (thd->cancelled)
break;
-
+ thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);
@@ -464,6 +430,7 @@ compress_worker_thread_func(void *arg)
thd->adler = adler32(0x00000001, (uchar *) thd->to,
(uInt)thd->to_len);
+ pthread_cond_signal(&thd->done_cond);
}
pthread_mutex_unlock(&thd->data_mutex);