diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2022-06-02 16:51:13 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2022-06-02 16:51:13 +0300 |
commit | 4b3c3e526e7067da921d0aef8451b6e2736a5fe0 (patch) | |
tree | 40a6d851d7c39c443c587e29a725afda85c65929 /extra | |
parent | e7de50a82187cbaaa192c2065d64c0041cd9a6a1 (diff) | |
parent | 96f4b4a55b449a29af7e4b70bebf0ff238ae7f80 (diff) | |
download | mariadb-git-4b3c3e526e7067da921d0aef8451b6e2736a5fe0.tar.gz |
Merge 10.4 into 10.5
Diffstat (limited to 'extra')
-rw-r--r-- | extra/mariabackup/ds_compress.cc | 100 |
1 files changed, 33 insertions, 67 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index 40566a1a8b1..eed3467b7f4 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -1,5 +1,6 @@ /****************************************************** Copyright (c) 2011-2013 Percona LLC and/or its affiliates. +Copyright (c) 2022, MariaDB Corporation. Compressing datasink implementation for XtraBackup. @@ -32,11 +33,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA typedef struct { pthread_t id; uint num; - pthread_mutex_t ctrl_mutex; - pthread_cond_t ctrl_cond; pthread_mutex_t data_mutex; pthread_cond_t data_cond; - my_bool started; my_bool data_avail; my_bool cancelled; const char *from; @@ -206,14 +204,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; - pthread_mutex_lock(&thd->ctrl_mutex); + pthread_mutex_lock(&thd->data_mutex); chunk_len = (len > COMPRESS_CHUNK_SIZE) ? COMPRESS_CHUNK_SIZE : len; thd->from = ptr; thd->from_len = chunk_len; - pthread_mutex_lock(&thd->data_mutex); thd->data_avail = TRUE; pthread_cond_signal(&thd->data_cond); pthread_mutex_unlock(&thd->data_mutex); @@ -239,26 +236,24 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) xb_a(threads[i].to_len > 0); - if (ds_write(dest_file, "NEWBNEWB", 8) || - write_uint64_le(dest_file, - comp_file->bytes_processed)) { - msg("compress: write to the destination stream " - "failed."); - return 1; + bool fail = ds_write(dest_file, "NEWBNEWB", 8) || + write_uint64_le(dest_file, + comp_file->bytes_processed); + comp_file->bytes_processed += threads[i].from_len; + + if (!fail) { + fail = write_uint32_le(dest_file, threads[i].adler) || + ds_write(dest_file, threads[i].to, + threads[i].to_len); } - comp_file->bytes_processed += threads[i].from_len; + pthread_mutex_unlock(&threads[i].data_mutex); - if (write_uint32_le(dest_file, threads[i].adler) || - ds_write(dest_file, threads[i].to, - threads[i].to_len)) { + if (fail) { msg("compress: write to the destination stream " "failed."); return 1; } - - pthread_mutex_unlock(&threads[i].data_mutex); - pthread_mutex_unlock(&threads[i].ctrl_mutex); } } @@ -329,6 +324,23 @@ write_uint64_le(ds_file_t *file, ulonglong n) } static +void +destroy_worker_thread(comp_thread_ctxt_t *thd) +{ + pthread_mutex_lock(&thd->data_mutex); + thd->cancelled = TRUE; + pthread_cond_signal(&thd->data_cond); + pthread_mutex_unlock(&thd->data_mutex); + + pthread_join(thd->id, NULL); + + pthread_cond_destroy(&thd->data_cond); + pthread_mutex_destroy(&thd->data_mutex); + + my_free(thd->to); +} + +static comp_thread_ctxt_t * create_worker_threads(uint n) { @@ -342,53 +354,31 @@ create_worker_threads(uint n) comp_thread_ctxt_t *thd = threads + i; thd->num = i + 1; - thd->started = FALSE; thd->cancelled = FALSE; thd->data_avail = FALSE; thd->to = (char *) my_malloc(PSI_NOT_INSTRUMENTED, COMPRESS_CHUNK_SIZE + MY_QLZ_COMPRESS_OVERHEAD, MYF(MY_FAE)); - /* Initialize the control mutex and condition var */ - if (pthread_mutex_init(&thd->ctrl_mutex, NULL) || - pthread_cond_init(&thd->ctrl_cond, NULL)) { - goto err; - } - /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || pthread_cond_init(&thd->data_cond, NULL)) { goto err; } - pthread_mutex_lock(&thd->ctrl_mutex); - if (pthread_create(&thd->id, NULL, compress_worker_thread_func, thd)) { msg("compress: pthread_create() failed: " "errno = %d", errno); - pthread_mutex_unlock(&thd->ctrl_mutex); goto err; } } - /* Wait for the threads to start */ - for (i = 0; i < n; i++) { - comp_thread_ctxt_t *thd = threads + i; - - while (thd->started == FALSE) - pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex); - pthread_mutex_unlock(&thd->ctrl_mutex); - } - return threads; err: - while (i > 0) { - comp_thread_ctxt_t *thd; - i--; - thd = threads + i; - pthread_mutex_unlock(&thd->ctrl_mutex); + for (; i; i--) { + destroy_worker_thread(threads + i); } my_free(threads); @@ -402,21 +392,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n) uint i; for (i = 0; i < n; i++) { - comp_thread_ctxt_t *thd = threads + i; - - pthread_mutex_lock(&thd->data_mutex); - threads[i].cancelled = TRUE; - pthread_cond_signal(&thd->data_cond); - pthread_mutex_unlock(&thd->data_mutex); - - pthread_join(thd->id, NULL); - - pthread_cond_destroy(&thd->data_cond); - pthread_mutex_destroy(&thd->data_mutex); - pthread_cond_destroy(&thd->ctrl_cond); - pthread_mutex_destroy(&thd->ctrl_mutex); - - my_free(thd->to); + destroy_worker_thread(threads + i); } my_free(threads); @@ -428,19 +404,9 @@ compress_worker_thread_func(void *arg) { comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg; - pthread_mutex_lock(&thd->ctrl_mutex); - pthread_mutex_lock(&thd->data_mutex); - thd->started = TRUE; - pthread_cond_signal(&thd->ctrl_cond); - - pthread_mutex_unlock(&thd->ctrl_mutex); - while (1) { - thd->data_avail = FALSE; - pthread_cond_signal(&thd->data_cond); - while (!thd->data_avail && !thd->cancelled) { pthread_cond_wait(&thd->data_cond, &thd->data_mutex); } |