diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2022-07-11 21:00:18 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2022-07-11 21:00:18 +0300 |
commit | b817afaa1c148437e1016d1981f138d0c46ccbc8 (patch) | |
tree | 60c77fb81d68200b24c51515f77d2c7cdff75d2b | |
parent | 4f62dfe676c29437b4a19c4d229e2accd2dda4a6 (diff) | |
download | mariadb-git-b817afaa1c148437e1016d1981f138d0c46ccbc8.tar.gz |
MDEV-28689, MDEV-28690: Remove ctrl_mutex
This reverts the revert 4f62dfe676c29437b4a19c4d229e2accd2dda4a6
and fixes the hang that was introduced when ctrl_mutex was removed.
The test mariabackup.compress_qpress covers this code, but the
test is skipped if a stand-alone qpress executable is not available.
It is not available in many software repositories, possibly because
the code base has not been updated since 2010.
This was tested with an executable that was compile from the source
code at http://www.quicklz.com/qpress-11-source.zip (after adding
a missing #include <unistd.h> for the definition of isatty()).
Compared to the grandparent commit (before the revert), the changes
are as follows:
comp_thread_ctxt_t::done_cond: A separate condition for completed
compression, signaling that thd->to_len has been updated.
compress_write(): Replace some threads[i] with thd.
Reset thd->to_len = 0 after consuming the compressed data.
compress_worker_thread_func(): After consuming the uncompressed
data, set thd->data_avail = FALSE. After compressing, signal
thd->done_cond.
-rw-r--r-- | extra/mariabackup/ds_compress.cc | 107 |
1 files changed, 37 insertions, 70 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index 1e2bf76e580..dc1140edfee 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -1,5 +1,6 @@ /****************************************************** Copyright (c) 2011-2013 Percona LLC and/or its affiliates. +Copyright (c) 2022, MariaDB Corporation. Compressing datasink implementation for XtraBackup. @@ -32,11 +33,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA typedef struct { pthread_t id; uint num; - pthread_mutex_t ctrl_mutex; - pthread_cond_t ctrl_cond; pthread_mutex_t data_mutex; pthread_cond_t data_cond; - my_bool started; + pthread_cond_t done_cond; my_bool data_avail; my_bool cancelled; const char *from; @@ -208,14 +207,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; - pthread_mutex_lock(&thd->ctrl_mutex); + pthread_mutex_lock(&thd->data_mutex); chunk_len = (len > COMPRESS_CHUNK_SIZE) ? COMPRESS_CHUNK_SIZE : len; thd->from = ptr; thd->from_len = chunk_len; - pthread_mutex_lock(&thd->data_mutex); thd->data_avail = TRUE; pthread_cond_signal(&thd->data_cond); pthread_mutex_unlock(&thd->data_mutex); @@ -234,32 +232,30 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; pthread_mutex_lock(&thd->data_mutex); - while (thd->data_avail == TRUE) { - pthread_cond_wait(&thd->data_cond, + while (!thd->to_len) { + pthread_cond_wait(&thd->done_cond, &thd->data_mutex); } - xb_a(threads[i].to_len > 0); - bool fail = ds_write(dest_file, "NEWBNEWB", 8) || write_uint64_le(dest_file, comp_file->bytes_processed); - comp_file->bytes_processed += threads[i].from_len; + comp_file->bytes_processed += thd->from_len; if (!fail) { - fail = write_uint32_le(dest_file, threads[i].adler) || - ds_write(dest_file, threads[i].to, - threads[i].to_len); + fail = write_uint32_le(dest_file, thd->adler) || + ds_write(dest_file, thd->to, + thd->to_len); } - pthread_mutex_unlock(&threads[i].data_mutex); + thd->to_len = 0; + pthread_mutex_unlock(&thd->data_mutex); if (fail) { msg("compress: write to the destination stream " "failed."); return 1; } - pthread_mutex_unlock(&threads[i].ctrl_mutex); } } @@ -330,6 +326,24 @@ write_uint64_le(ds_file_t *file, ulonglong n) } static +void +destroy_worker_thread(comp_thread_ctxt_t *thd) +{ + pthread_mutex_lock(&thd->data_mutex); + thd->cancelled = TRUE; + pthread_cond_signal(&thd->data_cond); + pthread_mutex_unlock(&thd->data_mutex); + + pthread_join(thd->id, NULL); + + pthread_cond_destroy(&thd->data_cond); + pthread_cond_destroy(&thd->done_cond); + pthread_mutex_destroy(&thd->data_mutex); + + my_free(thd->to); +} + +static comp_thread_ctxt_t * create_worker_threads(uint n) { @@ -337,60 +351,36 @@ create_worker_threads(uint n) uint i; threads = (comp_thread_ctxt_t *) - my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE)); + my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE)); for (i = 0; i < n; i++) { comp_thread_ctxt_t *thd = threads + i; thd->num = i + 1; - thd->started = FALSE; - thd->cancelled = FALSE; - thd->data_avail = FALSE; - thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE + MY_QLZ_COMPRESS_OVERHEAD, MYF(MY_FAE)); - /* Initialize the control mutex and condition var */ - if (pthread_mutex_init(&thd->ctrl_mutex, NULL) || - pthread_cond_init(&thd->ctrl_cond, NULL)) { - goto err; - } - /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || - pthread_cond_init(&thd->data_cond, NULL)) { + pthread_cond_init(&thd->data_cond, NULL) || + pthread_cond_init(&thd->done_cond, NULL)) { goto err; } - pthread_mutex_lock(&thd->ctrl_mutex); - if (pthread_create(&thd->id, NULL, compress_worker_thread_func, thd)) { msg("compress: pthread_create() failed: " "errno = %d", errno); - pthread_mutex_unlock(&thd->ctrl_mutex); goto err; } } - /* Wait for the threads to start */ - for (i = 0; i < n; i++) { - comp_thread_ctxt_t *thd = threads + i; - - while (thd->started == FALSE) - pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex); - pthread_mutex_unlock(&thd->ctrl_mutex); - } - return threads; err: - while (i > 0) { - comp_thread_ctxt_t *thd; - i--; - thd = threads + i; - pthread_mutex_unlock(&thd->ctrl_mutex); + for (; i; i--) { + destroy_worker_thread(threads + i); } my_free(threads); @@ -404,21 +394,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n) uint i; for (i = 0; i < n; i++) { - comp_thread_ctxt_t *thd = threads + i; - - pthread_mutex_lock(&thd->data_mutex); - threads[i].cancelled = TRUE; - pthread_cond_signal(&thd->data_cond); - pthread_mutex_unlock(&thd->data_mutex); - - pthread_join(thd->id, NULL); - - pthread_cond_destroy(&thd->data_cond); - pthread_mutex_destroy(&thd->data_mutex); - pthread_cond_destroy(&thd->ctrl_cond); - pthread_mutex_destroy(&thd->ctrl_mutex); - - my_free(thd->to); + destroy_worker_thread(threads + i); } my_free(threads); @@ -430,26 +406,16 @@ compress_worker_thread_func(void *arg) { comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg; - pthread_mutex_lock(&thd->ctrl_mutex); - pthread_mutex_lock(&thd->data_mutex); - thd->started = TRUE; - pthread_cond_signal(&thd->ctrl_cond); - - pthread_mutex_unlock(&thd->ctrl_mutex); - while (1) { - thd->data_avail = FALSE; - pthread_cond_signal(&thd->data_cond); - while (!thd->data_avail && !thd->cancelled) { pthread_cond_wait(&thd->data_cond, &thd->data_mutex); } if (thd->cancelled) break; - + thd->data_avail = FALSE; thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len, &thd->state); @@ -464,6 +430,7 @@ compress_worker_thread_func(void *arg) thd->adler = adler32(0x00000001, (uchar *) thd->to, (uInt)thd->to_len); + pthread_cond_signal(&thd->done_cond); } pthread_mutex_unlock(&thd->data_mutex); |