From b817afaa1c148437e1016d1981f138d0c46ccbc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= Date: Mon, 11 Jul 2022 21:00:18 +0300 Subject: MDEV-28689, MDEV-28690: Remove ctrl_mutex This reverts the revert 4f62dfe676c29437b4a19c4d229e2accd2dda4a6 and fixes the hang that was introduced when ctrl_mutex was removed. The test mariabackup.compress_qpress covers this code, but the test is skipped if a stand-alone qpress executable is not available. It is not available in many software repositories, possibly because the code base has not been updated since 2010. This was tested with an executable that was compile from the source code at http://www.quicklz.com/qpress-11-source.zip (after adding a missing #include for the definition of isatty()). Compared to the grandparent commit (before the revert), the changes are as follows: comp_thread_ctxt_t::done_cond: A separate condition for completed compression, signaling that thd->to_len has been updated. compress_write(): Replace some threads[i] with thd. Reset thd->to_len = 0 after consuming the compressed data. compress_worker_thread_func(): After consuming the uncompressed data, set thd->data_avail = FALSE. After compressing, signal thd->done_cond. --- extra/mariabackup/ds_compress.cc | 107 ++++++++++++++------------------------- 1 file changed, 37 insertions(+), 70 deletions(-) diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index 1e2bf76e580..dc1140edfee 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -1,5 +1,6 @@ /****************************************************** Copyright (c) 2011-2013 Percona LLC and/or its affiliates. +Copyright (c) 2022, MariaDB Corporation. Compressing datasink implementation for XtraBackup. @@ -32,11 +33,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA typedef struct { pthread_t id; uint num; - pthread_mutex_t ctrl_mutex; - pthread_cond_t ctrl_cond; pthread_mutex_t data_mutex; pthread_cond_t data_cond; - my_bool started; + pthread_cond_t done_cond; my_bool data_avail; my_bool cancelled; const char *from; @@ -208,14 +207,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; - pthread_mutex_lock(&thd->ctrl_mutex); + pthread_mutex_lock(&thd->data_mutex); chunk_len = (len > COMPRESS_CHUNK_SIZE) ? COMPRESS_CHUNK_SIZE : len; thd->from = ptr; thd->from_len = chunk_len; - pthread_mutex_lock(&thd->data_mutex); thd->data_avail = TRUE; pthread_cond_signal(&thd->data_cond); pthread_mutex_unlock(&thd->data_mutex); @@ -234,32 +232,30 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; pthread_mutex_lock(&thd->data_mutex); - while (thd->data_avail == TRUE) { - pthread_cond_wait(&thd->data_cond, + while (!thd->to_len) { + pthread_cond_wait(&thd->done_cond, &thd->data_mutex); } - xb_a(threads[i].to_len > 0); - bool fail = ds_write(dest_file, "NEWBNEWB", 8) || write_uint64_le(dest_file, comp_file->bytes_processed); - comp_file->bytes_processed += threads[i].from_len; + comp_file->bytes_processed += thd->from_len; if (!fail) { - fail = write_uint32_le(dest_file, threads[i].adler) || - ds_write(dest_file, threads[i].to, - threads[i].to_len); + fail = write_uint32_le(dest_file, thd->adler) || + ds_write(dest_file, thd->to, + thd->to_len); } - pthread_mutex_unlock(&threads[i].data_mutex); + thd->to_len = 0; + pthread_mutex_unlock(&thd->data_mutex); if (fail) { msg("compress: write to the destination stream " "failed."); return 1; } - pthread_mutex_unlock(&threads[i].ctrl_mutex); } } @@ -329,6 +325,24 @@ write_uint64_le(ds_file_t *file, ulonglong n) return ds_write(file, tmp, sizeof(tmp)); } +static +void +destroy_worker_thread(comp_thread_ctxt_t *thd) +{ + pthread_mutex_lock(&thd->data_mutex); + thd->cancelled = TRUE; + pthread_cond_signal(&thd->data_cond); + pthread_mutex_unlock(&thd->data_mutex); + + pthread_join(thd->id, NULL); + + pthread_cond_destroy(&thd->data_cond); + pthread_cond_destroy(&thd->done_cond); + pthread_mutex_destroy(&thd->data_mutex); + + my_free(thd->to); +} + static comp_thread_ctxt_t * create_worker_threads(uint n) @@ -337,60 +351,36 @@ create_worker_threads(uint n) uint i; threads = (comp_thread_ctxt_t *) - my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE)); + my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE)); for (i = 0; i < n; i++) { comp_thread_ctxt_t *thd = threads + i; thd->num = i + 1; - thd->started = FALSE; - thd->cancelled = FALSE; - thd->data_avail = FALSE; - thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE + MY_QLZ_COMPRESS_OVERHEAD, MYF(MY_FAE)); - /* Initialize the control mutex and condition var */ - if (pthread_mutex_init(&thd->ctrl_mutex, NULL) || - pthread_cond_init(&thd->ctrl_cond, NULL)) { - goto err; - } - /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || - pthread_cond_init(&thd->data_cond, NULL)) { + pthread_cond_init(&thd->data_cond, NULL) || + pthread_cond_init(&thd->done_cond, NULL)) { goto err; } - pthread_mutex_lock(&thd->ctrl_mutex); - if (pthread_create(&thd->id, NULL, compress_worker_thread_func, thd)) { msg("compress: pthread_create() failed: " "errno = %d", errno); - pthread_mutex_unlock(&thd->ctrl_mutex); goto err; } } - /* Wait for the threads to start */ - for (i = 0; i < n; i++) { - comp_thread_ctxt_t *thd = threads + i; - - while (thd->started == FALSE) - pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex); - pthread_mutex_unlock(&thd->ctrl_mutex); - } - return threads; err: - while (i > 0) { - comp_thread_ctxt_t *thd; - i--; - thd = threads + i; - pthread_mutex_unlock(&thd->ctrl_mutex); + for (; i; i--) { + destroy_worker_thread(threads + i); } my_free(threads); @@ -404,21 +394,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n) uint i; for (i = 0; i < n; i++) { - comp_thread_ctxt_t *thd = threads + i; - - pthread_mutex_lock(&thd->data_mutex); - threads[i].cancelled = TRUE; - pthread_cond_signal(&thd->data_cond); - pthread_mutex_unlock(&thd->data_mutex); - - pthread_join(thd->id, NULL); - - pthread_cond_destroy(&thd->data_cond); - pthread_mutex_destroy(&thd->data_mutex); - pthread_cond_destroy(&thd->ctrl_cond); - pthread_mutex_destroy(&thd->ctrl_mutex); - - my_free(thd->to); + destroy_worker_thread(threads + i); } my_free(threads); @@ -430,26 +406,16 @@ compress_worker_thread_func(void *arg) { comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg; - pthread_mutex_lock(&thd->ctrl_mutex); - pthread_mutex_lock(&thd->data_mutex); - thd->started = TRUE; - pthread_cond_signal(&thd->ctrl_cond); - - pthread_mutex_unlock(&thd->ctrl_mutex); - while (1) { - thd->data_avail = FALSE; - pthread_cond_signal(&thd->data_cond); - while (!thd->data_avail && !thd->cancelled) { pthread_cond_wait(&thd->data_cond, &thd->data_mutex); } if (thd->cancelled) break; - + thd->data_avail = FALSE; thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len, &thd->state); @@ -464,6 +430,7 @@ compress_worker_thread_func(void *arg) thd->adler = adler32(0x00000001, (uchar *) thd->to, (uInt)thd->to_len); + pthread_cond_signal(&thd->done_cond); } pthread_mutex_unlock(&thd->data_mutex); -- cgit v1.2.1