diff options
author | Oleksandr Byelkin <sanja@mariadb.com> | 2022-07-27 11:02:57 +0200 |
---|---|---|
committer | Oleksandr Byelkin <sanja@mariadb.com> | 2022-07-27 11:02:57 +0200 |
commit | 3bb36e949534fc4a24d68d4297663ae8b80ba336 (patch) | |
tree | 09907f6c2e82d718f261323075ffb33cb350fef7 /extra | |
parent | 9a897335eb4387980aed7698b832a893dbaa3d81 (diff) | |
parent | bd935a41060199a17019453d6e187e8edd7929ba (diff) | |
download | mariadb-git-3bb36e949534fc4a24d68d4297663ae8b80ba336.tar.gz |
Merge branch '10.3' into 10.4
Diffstat (limited to 'extra')
-rw-r--r-- | extra/mariabackup/ds_compress.cc | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index 39a72cdca34..dc1140edfee 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -35,6 +35,7 @@ typedef struct { uint num; pthread_mutex_t data_mutex; pthread_cond_t data_cond; + pthread_cond_t done_cond; my_bool data_avail; my_bool cancelled; const char *from; @@ -231,25 +232,24 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; pthread_mutex_lock(&thd->data_mutex); - while (thd->data_avail == TRUE) { - pthread_cond_wait(&thd->data_cond, + while (!thd->to_len) { + pthread_cond_wait(&thd->done_cond, &thd->data_mutex); } - xb_a(threads[i].to_len > 0); - bool fail = ds_write(dest_file, "NEWBNEWB", 8) || write_uint64_le(dest_file, comp_file->bytes_processed); - comp_file->bytes_processed += threads[i].from_len; + comp_file->bytes_processed += thd->from_len; if (!fail) { - fail = write_uint32_le(dest_file, threads[i].adler) || - ds_write(dest_file, threads[i].to, - threads[i].to_len); + fail = write_uint32_le(dest_file, thd->adler) || + ds_write(dest_file, thd->to, + thd->to_len); } - pthread_mutex_unlock(&threads[i].data_mutex); + thd->to_len = 0; + pthread_mutex_unlock(&thd->data_mutex); if (fail) { msg("compress: write to the destination stream " @@ -337,6 +337,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd) pthread_join(thd->id, NULL); pthread_cond_destroy(&thd->data_cond); + pthread_cond_destroy(&thd->done_cond); pthread_mutex_destroy(&thd->data_mutex); my_free(thd->to); @@ -350,22 +351,20 @@ create_worker_threads(uint n) uint i; threads = (comp_thread_ctxt_t *) - my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE)); + my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE)); for (i = 0; i < n; i++) { comp_thread_ctxt_t *thd = threads + i; thd->num = i + 1; - thd->cancelled = FALSE; - thd->data_avail = FALSE; - thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE + MY_QLZ_COMPRESS_OVERHEAD, MYF(MY_FAE)); /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || - pthread_cond_init(&thd->data_cond, NULL)) { + pthread_cond_init(&thd->data_cond, NULL) || + pthread_cond_init(&thd->done_cond, NULL)) { goto err; } @@ -416,7 +415,7 @@ compress_worker_thread_func(void *arg) if (thd->cancelled) break; - + thd->data_avail = FALSE; thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len, &thd->state); @@ -431,6 +430,7 @@ compress_worker_thread_func(void *arg) thd->adler = adler32(0x00000001, (uchar *) thd->to, (uInt)thd->to_len); + pthread_cond_signal(&thd->done_cond); } pthread_mutex_unlock(&thd->data_mutex); |