diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/loader/dbufio.cc')
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/dbufio.cc | 163 |
1 files changed, 92 insertions, 71 deletions
diff --git a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc index ad084a4fbdc..90f76cecf90 100644 --- a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc +++ b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc @@ -49,6 +49,10 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "loader/dbufio.h" #include "loader/loader-internal.h" +toku_instr_key *bfs_mutex_key; +toku_instr_key *bfs_cond_key; +toku_instr_key *io_thread_key; + struct dbufio_file { // i/o thread owns these int fd; @@ -276,39 +280,44 @@ static void* io_thread (void *v) toku_mutex_lock(&bfs->mutex); //printf("%s:%d Locked\n", __FILE__, __LINE__); while (1) { + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + // printf("n_not_done=%d\n", bfs->n_not_done); + if (bfs->n_not_done == 0) { + // all done (meaning we stored EOF (or another error) in + // error_code[0] for the file. + // printf("unlocked\n"); + toku_mutex_unlock(&bfs->mutex); + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } - if (paniced(bfs)) { - toku_mutex_unlock(&bfs->mutex); // ignore any error - return 0; - } - //printf("n_not_done=%d\n", bfs->n_not_done); - if (bfs->n_not_done==0) { - // all done (meaning we stored EOF (or another error) in error_code[0] for the file. - //printf("unlocked\n"); - toku_mutex_unlock(&bfs->mutex); - return 0; - } - - struct dbufio_file *dbf = bfs->head; - if (dbf==NULL) { - // No I/O needs to be done yet. - // Wait until something happens that will wake us up. - toku_cond_wait(&bfs->cond, &bfs->mutex); - if (paniced(bfs)) { - toku_mutex_unlock(&bfs->mutex); // ignore any error - return 0; - } - // Have the lock so go around. - } else { - // Some I/O needs to be done. - //printf("%s:%d Need I/O\n", __FILE__, __LINE__); - assert(dbf->second_buf_ready == false); - assert(!dbf->io_done); - bfs->head = dbf->next; - if (bfs->head==NULL) bfs->tail=NULL; - - // Unlock the mutex now that we have ownership of dbf to allow consumers to get the mutex and perform swaps. They won't swap - // this buffer because second_buf_ready is false. + struct dbufio_file *dbf = bfs->head; + if (dbf == NULL) { + // No I/O needs to be done yet. + // Wait until something happens that will wake us up. + toku_cond_wait(&bfs->cond, &bfs->mutex); + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + // Have the lock so go around. + } else { + // Some I/O needs to be done. + // printf("%s:%d Need I/O\n", __FILE__, __LINE__); + assert(dbf->second_buf_ready == false); + assert(!dbf->io_done); + bfs->head = dbf->next; + if (bfs->head == NULL) + bfs->tail = NULL; + + // Unlock the mutex now that we have ownership of dbf to allow + // consumers to get the mutex and perform swaps. They won't swap + // this buffer because second_buf_ready is false. toku_mutex_unlock(&bfs->mutex); //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd); { @@ -339,14 +348,16 @@ static void* io_thread (void *v) //printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode); { - toku_mutex_lock(&bfs->mutex); - if (paniced(bfs)) { - toku_mutex_unlock(&bfs->mutex); // ignore any error - return 0; - } - } - // Now that we have the mutex, we can decrement n_not_done (if applicable) and set second_buf_ready - if (readcode<=0) { + toku_mutex_lock(&bfs->mutex); + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + } + // Now that we have the mutex, we can decrement n_not_done (if + // applicable) and set second_buf_ready + if (readcode<=0) { bfs->n_not_done--; } //printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done); @@ -377,34 +388,36 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b } } } - //printf("%s:%d here\n", __FILE__, __LINE__); - if (result==0) { - toku_mutex_init(&bfs->mutex, NULL); - mutex_inited = true; + // printf("%s:%d here\n", __FILE__, __LINE__); + if (result == 0) { + toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr); + mutex_inited = true; } - if (result==0) { - toku_cond_init(&bfs->cond, NULL); - cond_inited = true; + if (result == 0) { + toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr); + cond_inited = true; } - if (result==0) { - bfs->N = N; - bfs->n_not_done = N; - bfs->head = bfs->tail = NULL; - for (int i=0; i<N; i++) { - bfs->files[i].fd = fds[i]; - bfs->files[i].offset_in_buf = 0; - bfs->files[i].offset_in_uncompressed_file = 0; - bfs->files[i].next = NULL; - bfs->files[i].second_buf_ready = false; - for (int j=0; j<2; j++) { - if (result==0) { - MALLOC_N(bufsize, bfs->files[i].buf[j]); - if (bfs->files[i].buf[j]==NULL) { result=get_error_errno(); } - } - bfs->files[i].n_in_buf[j] = 0; - bfs->files[i].error_code[j] = 0; - } - bfs->files[i].io_done = false; + if (result == 0) { + bfs->N = N; + bfs->n_not_done = N; + bfs->head = bfs->tail = NULL; + for (int i = 0; i < N; i++) { + bfs->files[i].fd = fds[i]; + bfs->files[i].offset_in_buf = 0; + bfs->files[i].offset_in_uncompressed_file = 0; + bfs->files[i].next = NULL; + bfs->files[i].second_buf_ready = false; + for (int j = 0; j < 2; j++) { + if (result == 0) { + MALLOC_N(bufsize, bfs->files[i].buf[j]); + if (bfs->files[i].buf[j] == NULL) { + result = get_error_errno(); + } + } + bfs->files[i].n_in_buf[j] = 0; + bfs->files[i].error_code[j] = 0; + } + bfs->files[i].io_done = false; ssize_t r; if (bfs->compressed) { r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize); @@ -431,13 +444,21 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b bfs->panic = false; bfs->panic_errno = 0; } - //printf("Creating IO thread\n"); - if (result==0) { - result = toku_pthread_create(&bfs->iothread, NULL, io_thread, (void*)bfs); + // printf("Creating IO thread\n"); + if (result == 0) { + result = toku_pthread_create(*io_thread_key, + &bfs->iothread, + nullptr, + io_thread, + static_cast<void *>(bfs)); + } + if (result == 0) { + *bfsp = bfs; + return 0; } - if (result==0) { *bfsp = bfs; return 0; } // Now undo everything. - // If we got here, there is no thread (either result was zero before the thread was created, or else the thread creation itself failed. + // If we got here, there is no thread (either result was zero before the + // thread was created, or else the thread creation itself failed. if (bfs) { if (bfs->files) { // the files were allocated, so we have to free all the bufs. |