summaryrefslogtreecommitdiff
path: root/extra
diff options
context:
space:
mode:
authorOleksandr Byelkin <sanja@mariadb.com>2022-07-27 11:02:57 +0200
committerOleksandr Byelkin <sanja@mariadb.com>2022-07-27 11:02:57 +0200
commit3bb36e949534fc4a24d68d4297663ae8b80ba336 (patch)
tree09907f6c2e82d718f261323075ffb33cb350fef7 /extra
parent9a897335eb4387980aed7698b832a893dbaa3d81 (diff)
parentbd935a41060199a17019453d6e187e8edd7929ba (diff)
downloadmariadb-git-3bb36e949534fc4a24d68d4297663ae8b80ba336.tar.gz
Merge branch '10.3' into 10.4
Diffstat (limited to 'extra')
-rw-r--r--extra/mariabackup/ds_compress.cc30
1 files changed, 15 insertions, 15 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc
index 39a72cdca34..dc1140edfee 100644
--- a/extra/mariabackup/ds_compress.cc
+++ b/extra/mariabackup/ds_compress.cc
@@ -35,6 +35,7 @@ typedef struct {
uint num;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
+ pthread_cond_t done_cond;
my_bool data_avail;
my_bool cancelled;
const char *from;
@@ -231,25 +232,24 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
- while (thd->data_avail == TRUE) {
- pthread_cond_wait(&thd->data_cond,
+ while (!thd->to_len) {
+ pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
}
- xb_a(threads[i].to_len > 0);
-
bool fail = ds_write(dest_file, "NEWBNEWB", 8) ||
write_uint64_le(dest_file,
comp_file->bytes_processed);
- comp_file->bytes_processed += threads[i].from_len;
+ comp_file->bytes_processed += thd->from_len;
if (!fail) {
- fail = write_uint32_le(dest_file, threads[i].adler) ||
- ds_write(dest_file, threads[i].to,
- threads[i].to_len);
+ fail = write_uint32_le(dest_file, thd->adler) ||
+ ds_write(dest_file, thd->to,
+ thd->to_len);
}
- pthread_mutex_unlock(&threads[i].data_mutex);
+ thd->to_len = 0;
+ pthread_mutex_unlock(&thd->data_mutex);
if (fail) {
msg("compress: write to the destination stream "
@@ -337,6 +337,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd)
pthread_join(thd->id, NULL);
pthread_cond_destroy(&thd->data_cond);
+ pthread_cond_destroy(&thd->done_cond);
pthread_mutex_destroy(&thd->data_mutex);
my_free(thd->to);
@@ -350,22 +351,20 @@ create_worker_threads(uint n)
uint i;
threads = (comp_thread_ctxt_t *)
- my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
+ my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE));
for (i = 0; i < n; i++) {
comp_thread_ctxt_t *thd = threads + i;
thd->num = i + 1;
- thd->cancelled = FALSE;
- thd->data_avail = FALSE;
-
thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
MY_QLZ_COMPRESS_OVERHEAD,
MYF(MY_FAE));
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
- pthread_cond_init(&thd->data_cond, NULL)) {
+ pthread_cond_init(&thd->data_cond, NULL) ||
+ pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}
@@ -416,7 +415,7 @@ compress_worker_thread_func(void *arg)
if (thd->cancelled)
break;
-
+ thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);
@@ -431,6 +430,7 @@ compress_worker_thread_func(void *arg)
thd->adler = adler32(0x00000001, (uchar *) thd->to,
(uInt)thd->to_len);
+ pthread_cond_signal(&thd->done_cond);
}
pthread_mutex_unlock(&thd->data_mutex);