summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2022-07-11 21:00:18 +0300
committerMarko Mäkelä <marko.makela@mariadb.com>2022-07-11 21:00:18 +0300
commitb817afaa1c148437e1016d1981f138d0c46ccbc8 (patch)
tree60c77fb81d68200b24c51515f77d2c7cdff75d2b
parent4f62dfe676c29437b4a19c4d229e2accd2dda4a6 (diff)
downloadmariadb-git-b817afaa1c148437e1016d1981f138d0c46ccbc8.tar.gz
MDEV-28689, MDEV-28690: Remove ctrl_mutex
This reverts the revert 4f62dfe676c29437b4a19c4d229e2accd2dda4a6 and fixes the hang that was introduced when ctrl_mutex was removed. The test mariabackup.compress_qpress covers this code, but the test is skipped if a stand-alone qpress executable is not available. It is not available in many software repositories, possibly because the code base has not been updated since 2010. This was tested with an executable that was compile from the source code at http://www.quicklz.com/qpress-11-source.zip (after adding a missing #include <unistd.h> for the definition of isatty()). Compared to the grandparent commit (before the revert), the changes are as follows: comp_thread_ctxt_t::done_cond: A separate condition for completed compression, signaling that thd->to_len has been updated. compress_write(): Replace some threads[i] with thd. Reset thd->to_len = 0 after consuming the compressed data. compress_worker_thread_func(): After consuming the uncompressed data, set thd->data_avail = FALSE. After compressing, signal thd->done_cond.
-rw-r--r--extra/mariabackup/ds_compress.cc107
1 files changed, 37 insertions, 70 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc
index 1e2bf76e580..dc1140edfee 100644
--- a/extra/mariabackup/ds_compress.cc
+++ b/extra/mariabackup/ds_compress.cc
@@ -1,5 +1,6 @@
/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
+Copyright (c) 2022, MariaDB Corporation.
Compressing datasink implementation for XtraBackup.
@@ -32,11 +33,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
typedef struct {
pthread_t id;
uint num;
- pthread_mutex_t ctrl_mutex;
- pthread_cond_t ctrl_cond;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
- my_bool started;
+ pthread_cond_t done_cond;
my_bool data_avail;
my_bool cancelled;
const char *from;
@@ -208,14 +207,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
- pthread_mutex_lock(&thd->ctrl_mutex);
+ pthread_mutex_lock(&thd->data_mutex);
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;
- pthread_mutex_lock(&thd->data_mutex);
thd->data_avail = TRUE;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
@@ -234,32 +232,30 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
- while (thd->data_avail == TRUE) {
- pthread_cond_wait(&thd->data_cond,
+ while (!thd->to_len) {
+ pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
}
- xb_a(threads[i].to_len > 0);
-
bool fail = ds_write(dest_file, "NEWBNEWB", 8) ||
write_uint64_le(dest_file,
comp_file->bytes_processed);
- comp_file->bytes_processed += threads[i].from_len;
+ comp_file->bytes_processed += thd->from_len;
if (!fail) {
- fail = write_uint32_le(dest_file, threads[i].adler) ||
- ds_write(dest_file, threads[i].to,
- threads[i].to_len);
+ fail = write_uint32_le(dest_file, thd->adler) ||
+ ds_write(dest_file, thd->to,
+ thd->to_len);
}
- pthread_mutex_unlock(&threads[i].data_mutex);
+ thd->to_len = 0;
+ pthread_mutex_unlock(&thd->data_mutex);
if (fail) {
msg("compress: write to the destination stream "
"failed.");
return 1;
}
- pthread_mutex_unlock(&threads[i].ctrl_mutex);
}
}
@@ -330,6 +326,24 @@ write_uint64_le(ds_file_t *file, ulonglong n)
}
static
+void
+destroy_worker_thread(comp_thread_ctxt_t *thd)
+{
+ pthread_mutex_lock(&thd->data_mutex);
+ thd->cancelled = TRUE;
+ pthread_cond_signal(&thd->data_cond);
+ pthread_mutex_unlock(&thd->data_mutex);
+
+ pthread_join(thd->id, NULL);
+
+ pthread_cond_destroy(&thd->data_cond);
+ pthread_cond_destroy(&thd->done_cond);
+ pthread_mutex_destroy(&thd->data_mutex);
+
+ my_free(thd->to);
+}
+
+static
comp_thread_ctxt_t *
create_worker_threads(uint n)
{
@@ -337,60 +351,36 @@ create_worker_threads(uint n)
uint i;
threads = (comp_thread_ctxt_t *)
- my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
+ my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE));
for (i = 0; i < n; i++) {
comp_thread_ctxt_t *thd = threads + i;
thd->num = i + 1;
- thd->started = FALSE;
- thd->cancelled = FALSE;
- thd->data_avail = FALSE;
-
thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
MY_QLZ_COMPRESS_OVERHEAD,
MYF(MY_FAE));
- /* Initialize the control mutex and condition var */
- if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
- pthread_cond_init(&thd->ctrl_cond, NULL)) {
- goto err;
- }
-
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
- pthread_cond_init(&thd->data_cond, NULL)) {
+ pthread_cond_init(&thd->data_cond, NULL) ||
+ pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}
- pthread_mutex_lock(&thd->ctrl_mutex);
-
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
"errno = %d", errno);
- pthread_mutex_unlock(&thd->ctrl_mutex);
goto err;
}
}
- /* Wait for the threads to start */
- for (i = 0; i < n; i++) {
- comp_thread_ctxt_t *thd = threads + i;
-
- while (thd->started == FALSE)
- pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
- pthread_mutex_unlock(&thd->ctrl_mutex);
- }
-
return threads;
err:
- while (i > 0) {
- comp_thread_ctxt_t *thd;
- i--;
- thd = threads + i;
- pthread_mutex_unlock(&thd->ctrl_mutex);
+ for (; i; i--) {
+ destroy_worker_thread(threads + i);
}
my_free(threads);
@@ -404,21 +394,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
uint i;
for (i = 0; i < n; i++) {
- comp_thread_ctxt_t *thd = threads + i;
-
- pthread_mutex_lock(&thd->data_mutex);
- threads[i].cancelled = TRUE;
- pthread_cond_signal(&thd->data_cond);
- pthread_mutex_unlock(&thd->data_mutex);
-
- pthread_join(thd->id, NULL);
-
- pthread_cond_destroy(&thd->data_cond);
- pthread_mutex_destroy(&thd->data_mutex);
- pthread_cond_destroy(&thd->ctrl_cond);
- pthread_mutex_destroy(&thd->ctrl_mutex);
-
- my_free(thd->to);
+ destroy_worker_thread(threads + i);
}
my_free(threads);
@@ -430,26 +406,16 @@ compress_worker_thread_func(void *arg)
{
comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
- pthread_mutex_lock(&thd->ctrl_mutex);
-
pthread_mutex_lock(&thd->data_mutex);
- thd->started = TRUE;
- pthread_cond_signal(&thd->ctrl_cond);
-
- pthread_mutex_unlock(&thd->ctrl_mutex);
-
while (1) {
- thd->data_avail = FALSE;
- pthread_cond_signal(&thd->data_cond);
-
while (!thd->data_avail && !thd->cancelled) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}
if (thd->cancelled)
break;
-
+ thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);
@@ -464,6 +430,7 @@ compress_worker_thread_func(void *arg)
thd->adler = adler32(0x00000001, (uchar *) thd->to,
(uInt)thd->to_len);
+ pthread_cond_signal(&thd->done_cond);
}
pthread_mutex_unlock(&thd->data_mutex);