summaryrefslogtreecommitdiff
path: root/extra
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2022-07-11 15:00:34 +0200
committerVladislav Vaintroub <wlad@mariadb.com>2022-07-11 15:00:34 +0200
commit4f62dfe676c29437b4a19c4d229e2accd2dda4a6 (patch)
tree8d9a59a1d7bf67c64475270ffe9768647cb65118 /extra
parent7598ef4b2616c476ea38362a3f41c399c4bf530e (diff)
downloadmariadb-git-4f62dfe676c29437b4a19c4d229e2accd2dda4a6.tar.gz
Revert "MDEV-28689, MDEV-28690: Incorrect error handling for ctrl_mutex"
This reverts commit 863c3eda872b19f70ce6045119bf621584e1312d.
Diffstat (limited to 'extra')
-rw-r--r--extra/mariabackup/ds_compress.cc77
1 files changed, 55 insertions, 22 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc
index 39a72cdca34..1e2bf76e580 100644
--- a/extra/mariabackup/ds_compress.cc
+++ b/extra/mariabackup/ds_compress.cc
@@ -1,6 +1,5 @@
/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
-Copyright (c) 2022, MariaDB Corporation.
Compressing datasink implementation for XtraBackup.
@@ -33,8 +32,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
typedef struct {
pthread_t id;
uint num;
+ pthread_mutex_t ctrl_mutex;
+ pthread_cond_t ctrl_cond;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
+ my_bool started;
my_bool data_avail;
my_bool cancelled;
const char *from;
@@ -206,13 +208,14 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
- pthread_mutex_lock(&thd->data_mutex);
+ pthread_mutex_lock(&thd->ctrl_mutex);
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;
+ pthread_mutex_lock(&thd->data_mutex);
thd->data_avail = TRUE;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
@@ -256,6 +259,7 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
"failed.");
return 1;
}
+ pthread_mutex_unlock(&threads[i].ctrl_mutex);
}
}
@@ -326,23 +330,6 @@ write_uint64_le(ds_file_t *file, ulonglong n)
}
static
-void
-destroy_worker_thread(comp_thread_ctxt_t *thd)
-{
- pthread_mutex_lock(&thd->data_mutex);
- thd->cancelled = TRUE;
- pthread_cond_signal(&thd->data_cond);
- pthread_mutex_unlock(&thd->data_mutex);
-
- pthread_join(thd->id, NULL);
-
- pthread_cond_destroy(&thd->data_cond);
- pthread_mutex_destroy(&thd->data_mutex);
-
- my_free(thd->to);
-}
-
-static
comp_thread_ctxt_t *
create_worker_threads(uint n)
{
@@ -356,6 +343,7 @@ create_worker_threads(uint n)
comp_thread_ctxt_t *thd = threads + i;
thd->num = i + 1;
+ thd->started = FALSE;
thd->cancelled = FALSE;
thd->data_avail = FALSE;
@@ -363,25 +351,46 @@ create_worker_threads(uint n)
MY_QLZ_COMPRESS_OVERHEAD,
MYF(MY_FAE));
+ /* Initialize the control mutex and condition var */
+ if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
+ pthread_cond_init(&thd->ctrl_cond, NULL)) {
+ goto err;
+ }
+
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
pthread_cond_init(&thd->data_cond, NULL)) {
goto err;
}
+ pthread_mutex_lock(&thd->ctrl_mutex);
+
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
"errno = %d", errno);
+ pthread_mutex_unlock(&thd->ctrl_mutex);
goto err;
}
}
+ /* Wait for the threads to start */
+ for (i = 0; i < n; i++) {
+ comp_thread_ctxt_t *thd = threads + i;
+
+ while (thd->started == FALSE)
+ pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
+ pthread_mutex_unlock(&thd->ctrl_mutex);
+ }
+
return threads;
err:
- for (; i; i--) {
- destroy_worker_thread(threads + i);
+ while (i > 0) {
+ comp_thread_ctxt_t *thd;
+ i--;
+ thd = threads + i;
+ pthread_mutex_unlock(&thd->ctrl_mutex);
}
my_free(threads);
@@ -395,7 +404,21 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
uint i;
for (i = 0; i < n; i++) {
- destroy_worker_thread(threads + i);
+ comp_thread_ctxt_t *thd = threads + i;
+
+ pthread_mutex_lock(&thd->data_mutex);
+ threads[i].cancelled = TRUE;
+ pthread_cond_signal(&thd->data_cond);
+ pthread_mutex_unlock(&thd->data_mutex);
+
+ pthread_join(thd->id, NULL);
+
+ pthread_cond_destroy(&thd->data_cond);
+ pthread_mutex_destroy(&thd->data_mutex);
+ pthread_cond_destroy(&thd->ctrl_cond);
+ pthread_mutex_destroy(&thd->ctrl_mutex);
+
+ my_free(thd->to);
}
my_free(threads);
@@ -407,9 +430,19 @@ compress_worker_thread_func(void *arg)
{
comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
+ pthread_mutex_lock(&thd->ctrl_mutex);
+
pthread_mutex_lock(&thd->data_mutex);
+ thd->started = TRUE;
+ pthread_cond_signal(&thd->ctrl_cond);
+
+ pthread_mutex_unlock(&thd->ctrl_mutex);
+
while (1) {
+ thd->data_avail = FALSE;
+ pthread_cond_signal(&thd->data_cond);
+
while (!thd->data_avail && !thd->cancelled) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}