From 4f62dfe676c29437b4a19c4d229e2accd2dda4a6 Mon Sep 17 00:00:00 2001 From: Vladislav Vaintroub Date: Mon, 11 Jul 2022 15:00:34 +0200 Subject: Revert "MDEV-28689, MDEV-28690: Incorrect error handling for ctrl_mutex" This reverts commit 863c3eda872b19f70ce6045119bf621584e1312d. --- extra/mariabackup/ds_compress.cc | 77 ++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index 39a72cdca34..1e2bf76e580 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -1,6 +1,5 @@ /****************************************************** Copyright (c) 2011-2013 Percona LLC and/or its affiliates. -Copyright (c) 2022, MariaDB Corporation. Compressing datasink implementation for XtraBackup. @@ -33,8 +32,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA typedef struct { pthread_t id; uint num; + pthread_mutex_t ctrl_mutex; + pthread_cond_t ctrl_cond; pthread_mutex_t data_mutex; pthread_cond_t data_cond; + my_bool started; my_bool data_avail; my_bool cancelled; const char *from; @@ -206,13 +208,14 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; - pthread_mutex_lock(&thd->data_mutex); + pthread_mutex_lock(&thd->ctrl_mutex); chunk_len = (len > COMPRESS_CHUNK_SIZE) ? COMPRESS_CHUNK_SIZE : len; thd->from = ptr; thd->from_len = chunk_len; + pthread_mutex_lock(&thd->data_mutex); thd->data_avail = TRUE; pthread_cond_signal(&thd->data_cond); pthread_mutex_unlock(&thd->data_mutex); @@ -256,6 +259,7 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) "failed."); return 1; } + pthread_mutex_unlock(&threads[i].ctrl_mutex); } } @@ -325,23 +329,6 @@ write_uint64_le(ds_file_t *file, ulonglong n) return ds_write(file, tmp, sizeof(tmp)); } -static -void -destroy_worker_thread(comp_thread_ctxt_t *thd) -{ - pthread_mutex_lock(&thd->data_mutex); - thd->cancelled = TRUE; - pthread_cond_signal(&thd->data_cond); - pthread_mutex_unlock(&thd->data_mutex); - - pthread_join(thd->id, NULL); - - pthread_cond_destroy(&thd->data_cond); - pthread_mutex_destroy(&thd->data_mutex); - - my_free(thd->to); -} - static comp_thread_ctxt_t * create_worker_threads(uint n) @@ -356,6 +343,7 @@ create_worker_threads(uint n) comp_thread_ctxt_t *thd = threads + i; thd->num = i + 1; + thd->started = FALSE; thd->cancelled = FALSE; thd->data_avail = FALSE; @@ -363,25 +351,46 @@ create_worker_threads(uint n) MY_QLZ_COMPRESS_OVERHEAD, MYF(MY_FAE)); + /* Initialize the control mutex and condition var */ + if (pthread_mutex_init(&thd->ctrl_mutex, NULL) || + pthread_cond_init(&thd->ctrl_cond, NULL)) { + goto err; + } + /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || pthread_cond_init(&thd->data_cond, NULL)) { goto err; } + pthread_mutex_lock(&thd->ctrl_mutex); + if (pthread_create(&thd->id, NULL, compress_worker_thread_func, thd)) { msg("compress: pthread_create() failed: " "errno = %d", errno); + pthread_mutex_unlock(&thd->ctrl_mutex); goto err; } } + /* Wait for the threads to start */ + for (i = 0; i < n; i++) { + comp_thread_ctxt_t *thd = threads + i; + + while (thd->started == FALSE) + pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex); + pthread_mutex_unlock(&thd->ctrl_mutex); + } + return threads; err: - for (; i; i--) { - destroy_worker_thread(threads + i); + while (i > 0) { + comp_thread_ctxt_t *thd; + i--; + thd = threads + i; + pthread_mutex_unlock(&thd->ctrl_mutex); } my_free(threads); @@ -395,7 +404,21 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n) uint i; for (i = 0; i < n; i++) { - destroy_worker_thread(threads + i); + comp_thread_ctxt_t *thd = threads + i; + + pthread_mutex_lock(&thd->data_mutex); + threads[i].cancelled = TRUE; + pthread_cond_signal(&thd->data_cond); + pthread_mutex_unlock(&thd->data_mutex); + + pthread_join(thd->id, NULL); + + pthread_cond_destroy(&thd->data_cond); + pthread_mutex_destroy(&thd->data_mutex); + pthread_cond_destroy(&thd->ctrl_cond); + pthread_mutex_destroy(&thd->ctrl_mutex); + + my_free(thd->to); } my_free(threads); @@ -407,9 +430,19 @@ compress_worker_thread_func(void *arg) { comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg; + pthread_mutex_lock(&thd->ctrl_mutex); + pthread_mutex_lock(&thd->data_mutex); + thd->started = TRUE; + pthread_cond_signal(&thd->ctrl_cond); + + pthread_mutex_unlock(&thd->ctrl_mutex); + while (1) { + thd->data_avail = FALSE; + pthread_cond_signal(&thd->data_cond); + while (!thd->data_avail && !thd->cancelled) { pthread_cond_wait(&thd->data_cond, &thd->data_mutex); } -- cgit v1.2.1