summaryrefslogtreecommitdiff
path: root/storage/tokudb/PerconaFT/ft/loader/dbufio.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/loader/dbufio.cc')
-rw-r--r--storage/tokudb/PerconaFT/ft/loader/dbufio.cc163
1 files changed, 92 insertions, 71 deletions
diff --git a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc
index ad084a4fbdc..90f76cecf90 100644
--- a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc
+++ b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc
@@ -49,6 +49,10 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include "loader/dbufio.h"
#include "loader/loader-internal.h"
+toku_instr_key *bfs_mutex_key;
+toku_instr_key *bfs_cond_key;
+toku_instr_key *io_thread_key;
+
struct dbufio_file {
// i/o thread owns these
int fd;
@@ -276,39 +280,44 @@ static void* io_thread (void *v)
toku_mutex_lock(&bfs->mutex);
//printf("%s:%d Locked\n", __FILE__, __LINE__);
while (1) {
+ if (paniced(bfs)) {
+ toku_mutex_unlock(&bfs->mutex); // ignore any error
+ toku_instr_delete_current_thread();
+ return toku_pthread_done(nullptr);
+ }
+ // printf("n_not_done=%d\n", bfs->n_not_done);
+ if (bfs->n_not_done == 0) {
+ // all done (meaning we stored EOF (or another error) in
+ // error_code[0] for the file.
+ // printf("unlocked\n");
+ toku_mutex_unlock(&bfs->mutex);
+ toku_instr_delete_current_thread();
+ return toku_pthread_done(nullptr);
+ }
- if (paniced(bfs)) {
- toku_mutex_unlock(&bfs->mutex); // ignore any error
- return 0;
- }
- //printf("n_not_done=%d\n", bfs->n_not_done);
- if (bfs->n_not_done==0) {
- // all done (meaning we stored EOF (or another error) in error_code[0] for the file.
- //printf("unlocked\n");
- toku_mutex_unlock(&bfs->mutex);
- return 0;
- }
-
- struct dbufio_file *dbf = bfs->head;
- if (dbf==NULL) {
- // No I/O needs to be done yet.
- // Wait until something happens that will wake us up.
- toku_cond_wait(&bfs->cond, &bfs->mutex);
- if (paniced(bfs)) {
- toku_mutex_unlock(&bfs->mutex); // ignore any error
- return 0;
- }
- // Have the lock so go around.
- } else {
- // Some I/O needs to be done.
- //printf("%s:%d Need I/O\n", __FILE__, __LINE__);
- assert(dbf->second_buf_ready == false);
- assert(!dbf->io_done);
- bfs->head = dbf->next;
- if (bfs->head==NULL) bfs->tail=NULL;
-
- // Unlock the mutex now that we have ownership of dbf to allow consumers to get the mutex and perform swaps. They won't swap
- // this buffer because second_buf_ready is false.
+ struct dbufio_file *dbf = bfs->head;
+ if (dbf == NULL) {
+ // No I/O needs to be done yet.
+ // Wait until something happens that will wake us up.
+ toku_cond_wait(&bfs->cond, &bfs->mutex);
+ if (paniced(bfs)) {
+ toku_mutex_unlock(&bfs->mutex); // ignore any error
+ toku_instr_delete_current_thread();
+ return toku_pthread_done(nullptr);
+ }
+ // Have the lock so go around.
+ } else {
+ // Some I/O needs to be done.
+ // printf("%s:%d Need I/O\n", __FILE__, __LINE__);
+ assert(dbf->second_buf_ready == false);
+ assert(!dbf->io_done);
+ bfs->head = dbf->next;
+ if (bfs->head == NULL)
+ bfs->tail = NULL;
+
+ // Unlock the mutex now that we have ownership of dbf to allow
+ // consumers to get the mutex and perform swaps. They won't swap
+ // this buffer because second_buf_ready is false.
toku_mutex_unlock(&bfs->mutex);
//printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd);
{
@@ -339,14 +348,16 @@ static void* io_thread (void *v)
//printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode);
{
- toku_mutex_lock(&bfs->mutex);
- if (paniced(bfs)) {
- toku_mutex_unlock(&bfs->mutex); // ignore any error
- return 0;
- }
- }
- // Now that we have the mutex, we can decrement n_not_done (if applicable) and set second_buf_ready
- if (readcode<=0) {
+ toku_mutex_lock(&bfs->mutex);
+ if (paniced(bfs)) {
+ toku_mutex_unlock(&bfs->mutex); // ignore any error
+ toku_instr_delete_current_thread();
+ return toku_pthread_done(nullptr);
+ }
+ }
+ // Now that we have the mutex, we can decrement n_not_done (if
+ // applicable) and set second_buf_ready
+ if (readcode<=0) {
bfs->n_not_done--;
}
//printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done);
@@ -377,34 +388,36 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b
}
}
}
- //printf("%s:%d here\n", __FILE__, __LINE__);
- if (result==0) {
- toku_mutex_init(&bfs->mutex, NULL);
- mutex_inited = true;
+ // printf("%s:%d here\n", __FILE__, __LINE__);
+ if (result == 0) {
+ toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr);
+ mutex_inited = true;
}
- if (result==0) {
- toku_cond_init(&bfs->cond, NULL);
- cond_inited = true;
+ if (result == 0) {
+ toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr);
+ cond_inited = true;
}
- if (result==0) {
- bfs->N = N;
- bfs->n_not_done = N;
- bfs->head = bfs->tail = NULL;
- for (int i=0; i<N; i++) {
- bfs->files[i].fd = fds[i];
- bfs->files[i].offset_in_buf = 0;
- bfs->files[i].offset_in_uncompressed_file = 0;
- bfs->files[i].next = NULL;
- bfs->files[i].second_buf_ready = false;
- for (int j=0; j<2; j++) {
- if (result==0) {
- MALLOC_N(bufsize, bfs->files[i].buf[j]);
- if (bfs->files[i].buf[j]==NULL) { result=get_error_errno(); }
- }
- bfs->files[i].n_in_buf[j] = 0;
- bfs->files[i].error_code[j] = 0;
- }
- bfs->files[i].io_done = false;
+ if (result == 0) {
+ bfs->N = N;
+ bfs->n_not_done = N;
+ bfs->head = bfs->tail = NULL;
+ for (int i = 0; i < N; i++) {
+ bfs->files[i].fd = fds[i];
+ bfs->files[i].offset_in_buf = 0;
+ bfs->files[i].offset_in_uncompressed_file = 0;
+ bfs->files[i].next = NULL;
+ bfs->files[i].second_buf_ready = false;
+ for (int j = 0; j < 2; j++) {
+ if (result == 0) {
+ MALLOC_N(bufsize, bfs->files[i].buf[j]);
+ if (bfs->files[i].buf[j] == NULL) {
+ result = get_error_errno();
+ }
+ }
+ bfs->files[i].n_in_buf[j] = 0;
+ bfs->files[i].error_code[j] = 0;
+ }
+ bfs->files[i].io_done = false;
ssize_t r;
if (bfs->compressed) {
r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize);
@@ -431,13 +444,21 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b
bfs->panic = false;
bfs->panic_errno = 0;
}
- //printf("Creating IO thread\n");
- if (result==0) {
- result = toku_pthread_create(&bfs->iothread, NULL, io_thread, (void*)bfs);
+ // printf("Creating IO thread\n");
+ if (result == 0) {
+ result = toku_pthread_create(*io_thread_key,
+ &bfs->iothread,
+ nullptr,
+ io_thread,
+ static_cast<void *>(bfs));
+ }
+ if (result == 0) {
+ *bfsp = bfs;
+ return 0;
}
- if (result==0) { *bfsp = bfs; return 0; }
// Now undo everything.
- // If we got here, there is no thread (either result was zero before the thread was created, or else the thread creation itself failed.
+ // If we got here, there is no thread (either result was zero before the
+ // thread was created, or else the thread creation itself failed.
if (bfs) {
if (bfs->files) {
// the files were allocated, so we have to free all the bufs.