diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/loader')
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/callbacks.cc | 6 | ||||
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/dbufio.cc | 163 | ||||
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/loader-internal.h | 37 | ||||
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/loader.cc | 365 | ||||
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/loader.h | 5 |
5 files changed, 323 insertions, 253 deletions
diff --git a/storage/tokudb/PerconaFT/ft/loader/callbacks.cc b/storage/tokudb/PerconaFT/ft/loader/callbacks.cc index 6a520dba3a3..ac69fb7e789 100644 --- a/storage/tokudb/PerconaFT/ft/loader/callbacks.cc +++ b/storage/tokudb/PerconaFT/ft/loader/callbacks.cc @@ -45,6 +45,8 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "loader/loader-internal.h" #include "util/dbt.h" +toku_instr_key *loader_error_mutex_key; + static void error_callback_lock(ft_loader_error_callback loader_error) { toku_mutex_lock(&loader_error->mutex); } @@ -57,10 +59,10 @@ void ft_loader_init_error_callback(ft_loader_error_callback loader_error) { memset(loader_error, 0, sizeof *loader_error); toku_init_dbt(&loader_error->key); toku_init_dbt(&loader_error->val); - toku_mutex_init(&loader_error->mutex, NULL); + toku_mutex_init(*loader_error_mutex_key, &loader_error->mutex, nullptr); } -void ft_loader_destroy_error_callback(ft_loader_error_callback loader_error) { +void ft_loader_destroy_error_callback(ft_loader_error_callback loader_error) { toku_mutex_destroy(&loader_error->mutex); toku_destroy_dbt(&loader_error->key); toku_destroy_dbt(&loader_error->val); diff --git a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc index ad084a4fbdc..90f76cecf90 100644 --- a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc +++ b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc @@ -49,6 +49,10 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "loader/dbufio.h" #include "loader/loader-internal.h" +toku_instr_key *bfs_mutex_key; +toku_instr_key *bfs_cond_key; +toku_instr_key *io_thread_key; + struct dbufio_file { // i/o thread owns these int fd; @@ -276,39 +280,44 @@ static void* io_thread (void *v) toku_mutex_lock(&bfs->mutex); //printf("%s:%d Locked\n", __FILE__, __LINE__); while (1) { + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + // printf("n_not_done=%d\n", bfs->n_not_done); + if (bfs->n_not_done == 0) { + // all done (meaning we stored EOF (or another error) in + // error_code[0] for the file. + // printf("unlocked\n"); + toku_mutex_unlock(&bfs->mutex); + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } - if (paniced(bfs)) { - toku_mutex_unlock(&bfs->mutex); // ignore any error - return 0; - } - //printf("n_not_done=%d\n", bfs->n_not_done); - if (bfs->n_not_done==0) { - // all done (meaning we stored EOF (or another error) in error_code[0] for the file. - //printf("unlocked\n"); - toku_mutex_unlock(&bfs->mutex); - return 0; - } - - struct dbufio_file *dbf = bfs->head; - if (dbf==NULL) { - // No I/O needs to be done yet. - // Wait until something happens that will wake us up. - toku_cond_wait(&bfs->cond, &bfs->mutex); - if (paniced(bfs)) { - toku_mutex_unlock(&bfs->mutex); // ignore any error - return 0; - } - // Have the lock so go around. - } else { - // Some I/O needs to be done. - //printf("%s:%d Need I/O\n", __FILE__, __LINE__); - assert(dbf->second_buf_ready == false); - assert(!dbf->io_done); - bfs->head = dbf->next; - if (bfs->head==NULL) bfs->tail=NULL; - - // Unlock the mutex now that we have ownership of dbf to allow consumers to get the mutex and perform swaps. They won't swap - // this buffer because second_buf_ready is false. + struct dbufio_file *dbf = bfs->head; + if (dbf == NULL) { + // No I/O needs to be done yet. + // Wait until something happens that will wake us up. + toku_cond_wait(&bfs->cond, &bfs->mutex); + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + // Have the lock so go around. + } else { + // Some I/O needs to be done. + // printf("%s:%d Need I/O\n", __FILE__, __LINE__); + assert(dbf->second_buf_ready == false); + assert(!dbf->io_done); + bfs->head = dbf->next; + if (bfs->head == NULL) + bfs->tail = NULL; + + // Unlock the mutex now that we have ownership of dbf to allow + // consumers to get the mutex and perform swaps. They won't swap + // this buffer because second_buf_ready is false. toku_mutex_unlock(&bfs->mutex); //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd); { @@ -339,14 +348,16 @@ static void* io_thread (void *v) //printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode); { - toku_mutex_lock(&bfs->mutex); - if (paniced(bfs)) { - toku_mutex_unlock(&bfs->mutex); // ignore any error - return 0; - } - } - // Now that we have the mutex, we can decrement n_not_done (if applicable) and set second_buf_ready - if (readcode<=0) { + toku_mutex_lock(&bfs->mutex); + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + } + // Now that we have the mutex, we can decrement n_not_done (if + // applicable) and set second_buf_ready + if (readcode<=0) { bfs->n_not_done--; } //printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done); @@ -377,34 +388,36 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b } } } - //printf("%s:%d here\n", __FILE__, __LINE__); - if (result==0) { - toku_mutex_init(&bfs->mutex, NULL); - mutex_inited = true; + // printf("%s:%d here\n", __FILE__, __LINE__); + if (result == 0) { + toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr); + mutex_inited = true; } - if (result==0) { - toku_cond_init(&bfs->cond, NULL); - cond_inited = true; + if (result == 0) { + toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr); + cond_inited = true; } - if (result==0) { - bfs->N = N; - bfs->n_not_done = N; - bfs->head = bfs->tail = NULL; - for (int i=0; i<N; i++) { - bfs->files[i].fd = fds[i]; - bfs->files[i].offset_in_buf = 0; - bfs->files[i].offset_in_uncompressed_file = 0; - bfs->files[i].next = NULL; - bfs->files[i].second_buf_ready = false; - for (int j=0; j<2; j++) { - if (result==0) { - MALLOC_N(bufsize, bfs->files[i].buf[j]); - if (bfs->files[i].buf[j]==NULL) { result=get_error_errno(); } - } - bfs->files[i].n_in_buf[j] = 0; - bfs->files[i].error_code[j] = 0; - } - bfs->files[i].io_done = false; + if (result == 0) { + bfs->N = N; + bfs->n_not_done = N; + bfs->head = bfs->tail = NULL; + for (int i = 0; i < N; i++) { + bfs->files[i].fd = fds[i]; + bfs->files[i].offset_in_buf = 0; + bfs->files[i].offset_in_uncompressed_file = 0; + bfs->files[i].next = NULL; + bfs->files[i].second_buf_ready = false; + for (int j = 0; j < 2; j++) { + if (result == 0) { + MALLOC_N(bufsize, bfs->files[i].buf[j]); + if (bfs->files[i].buf[j] == NULL) { + result = get_error_errno(); + } + } + bfs->files[i].n_in_buf[j] = 0; + bfs->files[i].error_code[j] = 0; + } + bfs->files[i].io_done = false; ssize_t r; if (bfs->compressed) { r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize); @@ -431,13 +444,21 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b bfs->panic = false; bfs->panic_errno = 0; } - //printf("Creating IO thread\n"); - if (result==0) { - result = toku_pthread_create(&bfs->iothread, NULL, io_thread, (void*)bfs); + // printf("Creating IO thread\n"); + if (result == 0) { + result = toku_pthread_create(*io_thread_key, + &bfs->iothread, + nullptr, + io_thread, + static_cast<void *>(bfs)); + } + if (result == 0) { + *bfsp = bfs; + return 0; } - if (result==0) { *bfsp = bfs; return 0; } // Now undo everything. - // If we got here, there is no thread (either result was zero before the thread was created, or else the thread creation itself failed. + // If we got here, there is no thread (either result was zero before the + // thread was created, or else the thread creation itself failed. if (bfs) { if (bfs->files) { // the files were allocated, so we have to free all the bufs. diff --git a/storage/tokudb/PerconaFT/ft/loader/loader-internal.h b/storage/tokudb/PerconaFT/ft/loader/loader-internal.h index 1aa2c203831..6f7b0147b21 100644 --- a/storage/tokudb/PerconaFT/ft/loader/loader-internal.h +++ b/storage/tokudb/PerconaFT/ft/loader/loader-internal.h @@ -64,10 +64,10 @@ enum { /* These structures maintain a collection of all the open temporary files used by the loader. */ struct file_info { bool is_open; - bool is_extant; // if true, the file must be unlinked. + bool is_extant; // if true, the file must be unlinked. char *fname; - FILE *file; - uint64_t n_rows; // how many rows were written into that file + TOKU_FILE *file; + uint64_t n_rows; // how many rows were written into that file size_t buffer_size; void *buffer; }; @@ -80,11 +80,11 @@ struct file_infos { }; typedef struct fidx { int idx; } FIDX; static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1}; -static int fidx_is_null (const FIDX f) __attribute__((__unused__)); -static int fidx_is_null (const FIDX f) { return f.idx==-1; } -FILE *toku_bl_fidx2file (FTLOADER bl, FIDX i); +static int fidx_is_null(const FIDX f) __attribute__((__unused__)); +static int fidx_is_null(const FIDX f) { return f.idx == -1; } +TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i); -int ft_loader_open_temp_file (FTLOADER bl, FIDX*file_idx); +int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx); /* These data structures are used for manipulating a collection of rows in main memory. */ struct row { @@ -100,11 +100,17 @@ struct rowset { }; int init_rowset (struct rowset *rows, uint64_t memory_budget); -void destroy_rowset (struct rowset *rows); -int add_row (struct rowset *rows, DBT *key, DBT *val); - -int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl); -int loader_read_row (FILE *f, DBT *key, DBT *val); +void destroy_rowset(struct rowset *rows); +int add_row(struct rowset *rows, DBT *key, DBT *val); + +int loader_write_row(DBT *key, + DBT *val, + FIDX data, + TOKU_FILE *, + uint64_t *dataoff, + struct wbuf *wb, + FTLOADER bl); +int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val); struct merge_fileset { bool have_sorted_output; // Is there an previous key? @@ -195,12 +201,13 @@ struct ft_loader_s { bool did_reserve_memory; bool compress_intermediates; bool allow_puts; - uint64_t reserved_memory; // how much memory are we allowed to use? + uint64_t reserved_memory; // how much memory are we allowed to use? - /* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */ + /* To make it easier to recover from errors, we don't use TOKU_FILE*, + * instead we use an index into the file_infos. */ struct file_infos file_infos; -#define PROGRESS_MAX (1<<16) +#define PROGRESS_MAX (1 << 16) int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0 // We use an integer so that we can add to the progress using a fetch-and-add instruction. diff --git a/storage/tokudb/PerconaFT/ft/loader/loader.cc b/storage/tokudb/PerconaFT/ft/loader/loader.cc index f867639b953..5f57b473bc5 100644 --- a/storage/tokudb/PerconaFT/ft/loader/loader.cc +++ b/storage/tokudb/PerconaFT/ft/loader/loader.cc @@ -63,21 +63,17 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "util/x1764.h" -static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL; -void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) { - os_fwrite_fun=fwrite_fun; -} +toku_instr_key *loader_bl_mutex_key; +toku_instr_key *loader_fi_lock_mutex_key; +toku_instr_key *loader_out_mutex_key; -static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) { - if (os_fwrite_fun) { - return os_fwrite_fun(ptr, size, nmemb, stream); - } else { - return fwrite(ptr, size, nmemb, stream); - } -} +toku_instr_key *extractor_thread_key; +toku_instr_key *fractal_thread_key; +toku_instr_key *tokudb_file_tmp_key; +toku_instr_key *tokudb_file_load_key; -// 1024 is the right size_factor for production. +// 1024 is the right size_factor for production. // Different values for these sizes may be used for testing. static uint32_t size_factor = 1024; static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE; @@ -99,7 +95,7 @@ toku_ft_loader_get_rowset_budget_for_testing (void) void ft_loader_lock_init(FTLOADER bl) { invariant(!bl->mutex_init); - toku_mutex_init(&bl->mutex, NULL); + toku_mutex_init(*loader_bl_mutex_key, &bl->mutex, nullptr); bl->mutex_init = true; } @@ -131,7 +127,10 @@ static int add_big_buffer(struct file_info *file) { newbuffer = true; } if (result == 0) { - int r = setvbuf(file->file, (char *) file->buffer, _IOFBF, file->buffer_size); + int r = setvbuf(file->file->file, + static_cast<char *>(file->buffer), + _IOFBF, + file->buffer_size); if (r != 0) { result = get_error_errno(); if (newbuffer) { @@ -150,9 +149,9 @@ static void cleanup_big_buffer(struct file_info *file) { } } -int ft_loader_init_file_infos (struct file_infos *fi) { +int ft_loader_init_file_infos(struct file_infos *fi) { int result = 0; - toku_mutex_init(&fi->lock, NULL); + toku_mutex_init(*loader_fi_lock_mutex_key, &fi->lock, nullptr); fi->n_files = 0; fi->n_files_limit = 1; fi->n_files_open = 0; @@ -196,11 +195,10 @@ void ft_loader_fi_destroy (struct file_infos *fi, bool is_error) fi->file_infos = NULL; } -static int open_file_add (struct file_infos *fi, - FILE *file, - char *fname, - /* out */ FIDX *idx) -{ +static int open_file_add(struct file_infos *fi, + TOKU_FILE *file, + char *fname, + /* out */ FIDX *idx) { int result = 0; toku_mutex_lock(&fi->lock); if (fi->n_files >= fi->n_files_limit) { @@ -230,11 +228,12 @@ int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) { int result = 0; toku_mutex_lock(&fi->lock); int i = idx.idx; - invariant(i>=0 && i<fi->n_files); + invariant(i >= 0 && i < fi->n_files); invariant(!fi->file_infos[i].is_open); invariant(fi->file_infos[i].is_extant); - fi->file_infos[i].file = toku_os_fopen(fi->file_infos[i].fname, mode); - if (fi->file_infos[i].file == NULL) { + fi->file_infos[i].file = + toku_os_fopen(fi->file_infos[i].fname, mode, *tokudb_file_load_key); + if (fi->file_infos[i].file == NULL) { result = get_error_errno(); } else { fi->file_infos[i].is_open = true; @@ -307,20 +306,20 @@ int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx) */ { int result = 0; - if (result) // debug hack + if (result) // debug hack return result; - FILE *f = NULL; + TOKU_FILE *f = NULL; int fd = -1; - char *fname = toku_strdup(bl->temp_file_template); + char *fname = toku_strdup(bl->temp_file_template); if (fname == NULL) result = get_error_errno(); else { fd = mkstemp(fname); - if (fd < 0) { + if (fd < 0) { result = get_error_errno(); } else { - f = toku_os_fdopen(fd, "r+"); - if (f == NULL) + f = toku_os_fdopen(fd, "r+", fname, *tokudb_file_tmp_key); + if (f->file == nullptr) result = get_error_errno(); else result = open_file_add(&bl->file_infos, f, fname, file_idx); @@ -339,7 +338,7 @@ int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx) return result; } -void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error) { +void toku_ft_loader_internal_destroy(FTLOADER bl, bool is_error) { ft_loader_lock_destroy(bl); // These frees rely on the fact that if you free a NULL pointer then nothing bad happens. @@ -635,12 +634,16 @@ int toku_ft_loader_open (FTLOADER *blp, /* out */ allow_puts); if (r!=0) result = r; } - if (result==0 && allow_puts) { + if (result == 0 && allow_puts) { FTLOADER bl = *blp; - int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); - if (r==0) { + int r = toku_pthread_create(*extractor_thread_key, + &bl->extractor_thread, + nullptr, + extractor_thread, + static_cast<void *>(bl)); + if (r == 0) { bl->extractor_live = true; - } else { + } else { result = r; (void) toku_ft_loader_internal_destroy(bl, true); } @@ -659,17 +662,17 @@ static void ft_loader_set_panic(FTLOADER bl, int error, bool callback, int which } // One of the tests uses this. -FILE *toku_bl_fidx2file (FTLOADER bl, FIDX i) { +TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i) { toku_mutex_lock(&bl->file_infos.lock); - invariant(i.idx >=0 && i.idx < bl->file_infos.n_files); + invariant(i.idx >= 0 && i.idx < bl->file_infos.n_files); invariant(bl->file_infos.file_infos[i.idx].is_open); - FILE *result=bl->file_infos.file_infos[i.idx].file; + TOKU_FILE *result = bl->file_infos.file_infos[i.idx].file; toku_mutex_unlock(&bl->file_infos.lock); return result; } -static int bl_finish_compressed_write(FILE *stream, struct wbuf *wb) { - int r; +static int bl_finish_compressed_write(TOKU_FILE *stream, struct wbuf *wb) { + int r = 0; char *compressed_buf = NULL; const size_t data_size = wb->ndone; invariant(data_size > 0); @@ -720,31 +723,23 @@ static int bl_finish_compressed_write(FILE *stream, struct wbuf *wb) { // Mark as written wb->ndone = 0; - size_t size_to_write = total_size + 4; // Includes writing total_size + size_t size_to_write = total_size + 4; // Includes writing total_size + + r = toku_os_fwrite(compressed_buf, 1, size_to_write, stream); - { - size_t written = do_fwrite(compressed_buf, 1, size_to_write, stream); - if (written!=size_to_write) { - if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ... - r = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno - else - r = ferror(stream); - invariant(r!=0); - goto exit; - } - } - r = 0; -exit: if (compressed_buf) { toku_free(compressed_buf); } return r; } -static int bl_compressed_write(void *ptr, size_t nbytes, FILE *stream, struct wbuf *wb) { +static int bl_compressed_write(void *ptr, + size_t nbytes, + TOKU_FILE *stream, + struct wbuf *wb) { invariant(wb->size <= MAX_UNCOMPRESSED_BUF); size_t bytes_left = nbytes; - char *buf = (char*)ptr; + char *buf = (char *)ptr; while (bytes_left > 0) { size_t bytes_to_copy = bytes_left; @@ -767,29 +762,28 @@ static int bl_compressed_write(void *ptr, size_t nbytes, FILE *stream, struct wb return 0; } -static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, struct wbuf *wb, FTLOADER bl) -/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number. +static int bl_fwrite(void *ptr, + size_t size, + size_t nmemb, + TOKU_FILE *stream, + struct wbuf *wb, + FTLOADER bl) +/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise + * returns an error number. * Arguments: * ptr the data to be writen. * size the amount of data to be written. * nmemb the number of units of size to be written. * stream write the data here. - * wb where to write uncompressed data (if we're compressing) or ignore if NULL - * bl passed so we can panic the ft_loader if something goes wrong (recording the error number). + * wb where to write uncompressed data (if we're compressing) or ignore if + * NULL + * bl passed so we can panic the ft_loader if something goes wrong + * (recording the error number). * Return value: 0 on success, an error number otherwise. */ { if (!bl->compress_intermediates || !wb) { - size_t r = do_fwrite(ptr, size, nmemb, stream); - if (r!=nmemb) { - int e; - if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ... - e = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno - else - e = ferror(stream); - invariant(e!=0); - return e; - } + return toku_os_fwrite(ptr, size, nmemb, stream); } else { size_t num_bytes = size * nmemb; int r = bl_compressed_write(ptr, num_bytes, stream, wb); @@ -800,8 +794,9 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, struct return 0; } -static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream) -/* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number. +static int bl_fread(void *ptr, size_t size, size_t nmemb, TOKU_FILE *stream) +/* Effect: this is a wrapper for fread that returns 0 on success, otherwise + * returns an error number. * Arguments: * ptr read data into here. * size size of data element to be read. @@ -810,24 +805,14 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream) * Return value: 0 on success, an error number otherwise. */ { - size_t r = fread(ptr, size, nmemb, stream); - if (r==0) { - if (feof(stream)) return EOF; - else { - do_error: ; - int e = ferror(stream); - // r == 0 && !feof && e == 0, how does this happen? invariant(e!=0); - return e; - } - } else if (r<nmemb) { - goto do_error; - } else { - return 0; - } + return toku_os_fread(ptr, size, nmemb, stream); } -static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl) -{ +static int bl_write_dbt(DBT *dbt, + TOKU_FILE *datafile, + uint64_t *dataoff, + struct wbuf *wb, + FTLOADER bl) { int r; int dlen = dbt->size; if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, wb, bl))) return r; @@ -837,8 +822,7 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, struct wbu return 0; } -static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream) -{ +static int bl_read_dbt(/*in*/ DBT *dbt, TOKU_FILE *stream) { int len; { int r; @@ -892,13 +876,20 @@ static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int file return result; } - -int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl) -/* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date. +int loader_write_row(DBT *key, + DBT *val, + FIDX data, + TOKU_FILE *dataf, + uint64_t *dataoff, + struct wbuf *wb, + FTLOADER bl) +/* Effect: Given a key and a val (both DBTs), write them to a file. Increment + * *dataoff so that it's up to date. * Arguments: * key, val write these. * data the file to write them to - * dataoff a pointer to a counter that keeps track of the amount of data written so far. + * dataoff a pointer to a counter that keeps track of the amount of data + * written so far. * wb a pointer (possibly NULL) to buffer uncompressed output * bl the ft_loader (passed so we can panic if needed). * Return value: 0 on success, an error number otherwise. @@ -916,8 +907,9 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *datao return 0; } -int loader_read_row (FILE *f, DBT *key, DBT *val) -/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set. +int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val) +/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC + * set. * Arguments: * f where to read it from. * key, val read it into these. @@ -1087,7 +1079,7 @@ static void* extractor_thread (void *blv) { FTLOADER bl = (FTLOADER)blv; int r = 0; while (1) { - void *item; + void *item = nullptr; { int rq = toku_queue_deq(bl->primary_rowset_queue, &item, NULL, NULL); if (rq==EOF) break; @@ -1108,14 +1100,14 @@ static void* extractor_thread (void *blv) { //printf("%s:%d extractor finishing\n", __FILE__, __LINE__); if (r == 0) { r = finish_primary_rows(bl); - if (r) + if (r) ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr); - } - return NULL; + toku_instr_delete_current_thread(); + return nullptr; } -static void enqueue_for_extraction (FTLOADER bl) { +static void enqueue_for_extraction(FTLOADER bl) { //printf("%s:%d enqueing %ld items\n", __FILE__, __LINE__, bl->primary_rowset.n_rows); struct rowset *XMALLOC(enqueue_me); *enqueue_me = bl->primary_rowset; @@ -1626,11 +1618,12 @@ static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset ro struct wbuf wb; wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF); - FILE *sstream = toku_bl_fidx2file(bl, sfile); - for (size_t i=0; i<rows.n_rows; i++) { - DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen); - DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen); - + TOKU_FILE *sstream = toku_bl_fidx2file(bl, sfile); + for (size_t i = 0; i < rows.n_rows; i++) { + DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen); + DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, + rows.rows[i].vlen); + uint64_t soffset=0; // don't really need this. r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, &wb, bl); if (r != 0) { @@ -1727,14 +1720,30 @@ int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare); } -int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation) -/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge. - * This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere. - * If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data. - * Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.) - * Requires: The number of sources is at least one, and each of the input files must have at least one row in it. +int toku_merge_some_files_using_dbufio(const bool to_q, + FIDX dest_data, + QUEUE q, + int n_sources, + DBUFIO_FILESET bfs, + FIDX srcs_fidxs[/*n_sources*/], + FTLOADER bl, + int which_db, + DB *dest_db, + ft_compare_func compare, + int progress_allocation) +/* Effect: Given an array of FILE*'s each containing sorted, merge the data and + * write it to an output. All the files remain open after the merge. + * This merge is performed in one pass, so don't pass too many files in. If + * you need a tree of merges do it elsewhere. + * If TO_Q is true then we write rowsets into queue Q. Otherwise we write + * into dest_data. + * Modifies: May modify the arrays of files (but if modified, it must be a + * permutation so the caller can use that array to close everything.) + * Requires: The number of sources is at least one, and each of the input files + * must have at least one row in it. * Arguments: - * to_q boolean indicating that output is queue (true) or a file (false) + * to_q boolean indicating that output is queue (true) or a file + * (false) * dest_data where to write the sorted data * q where to write the sorted data * n_sources how many source files. @@ -1747,9 +1756,10 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q { int result = 0; - FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data); + TOKU_FILE *dest_stream = to_q ? nullptr : toku_bl_fidx2file(bl, dest_data); - //printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation); + // printf(" merge_some_files progress=%d fin at %d\n", bl->progress, + // bl->progress+progress_allocation); DBT keys[n_sources]; DBT vals[n_sources]; uint64_t dataoff[n_sources]; @@ -1943,12 +1953,18 @@ static int merge_some_files (const bool to_q, FIDX dest_data, QUEUE q, int n_sou int result = 0; DBUFIO_FILESET bfs = NULL; int *MALLOC_N(n_sources, fds); - if (fds==NULL) result=get_error_errno(); - if (result==0) { - for (int i=0; i<n_sources; i++) { - int r = fileno(toku_bl_fidx2file(bl, srcs_fidxs[i])); // we rely on the fact that when the files are closed, the fd is also closed. - if (r==-1) { - result=get_error_errno(); + if (fds == NULL) + result = get_error_errno(); + if (result == 0) { + for (int i = 0; i < n_sources; i++) { + int r = fileno( + toku_bl_fidx2file(bl, srcs_fidxs[i])->file); // we rely on the + // fact that when + // the files are + // closed, the fd + // is also closed. + if (r == -1) { + result = get_error_errno(); break; } fds[i] = r; @@ -2178,7 +2194,7 @@ static inline void dbout_init(struct dbout *out, FT ft) { out->current_off = 0; out->n_translations = out->n_translations_limit = 0; out->translation = NULL; - toku_mutex_init(&out->mutex, NULL); + toku_mutex_init(*loader_out_mutex_key, &out->mutex, nullptr); out->ft = ft; } @@ -2418,7 +2434,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, assert_zero(r); return result; } - FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file); + TOKU_FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file); TXNID root_xid_that_created = TXNID_NONE; if (bl->root_xids_that_created) @@ -2705,21 +2721,35 @@ int toku_loader_write_ft_from_q_in_C (FTLOADER bl, static void* fractal_thread (void *ftav) { struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav; - int r = toku_loader_write_ft_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method, fta->target_fanout); + int r = toku_loader_write_ft_from_q(fta->bl, + fta->descriptor, + fta->fd, + fta->progress_allocation, + fta->q, + fta->total_disksize_estimate, + fta->which_db, + fta->target_nodesize, + fta->target_basementnodesize, + fta->target_compression_method, + fta->target_fanout); fta->errno_result = r; - return NULL; -} - -static int loader_do_i (FTLOADER bl, - int which_db, - DB *dest_db, - ft_compare_func compare, - const DESCRIPTOR descriptor, - const char *new_fname, - int progress_allocation // how much progress do I need to add into bl->progress by the end.. - ) + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); +} + +static int loader_do_i(FTLOADER bl, + int which_db, + DB *dest_db, + ft_compare_func compare, + const DESCRIPTOR descriptor, + const char *new_fname, + int progress_allocation // how much progress do I need + // to add into bl->progress by + // the end.. + ) /* Effect: Handle the file creating for one particular DB in the bulk loader. */ -/* Requires: The data is fully extracted, so we can do merges out of files and write the ft file. */ +/* Requires: The data is fully extracted, so we can do merges out of files and + write the ft file. */ { //printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); struct merge_fileset *fs = &(bl->fs[which_db]); @@ -2730,10 +2760,14 @@ static int loader_do_i (FTLOADER bl, if (r) goto error; { - mode_t mode = S_IRUSR+S_IWUSR + S_IRGRP+S_IWGRP; - int fd = toku_os_open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); // #2621 + mode_t mode = S_IRUSR + S_IWUSR + S_IRGRP + S_IWGRP; + int fd = toku_os_open(new_fname, + O_RDWR | O_CREAT | O_BINARY, + mode, + *tokudb_file_load_key); // #2621 if (fd < 0) { - r = get_error_errno(); goto error; + r = get_error_errno(); + goto error; } uint32_t target_nodesize, target_basementnodesize, target_fanout; @@ -2753,24 +2787,27 @@ static int loader_do_i (FTLOADER bl, progress_allocation -= allocation_for_merge; // This structure must stay live until the join below. - struct fractal_thread_args fta = { - bl, - descriptor, - fd, - progress_allocation, - bl->fractal_queues[which_db], - bl->extracted_datasizes[which_db], - 0, - which_db, - target_nodesize, - target_basementnodesize, - target_compression_method, - target_fanout - }; - - r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta); + struct fractal_thread_args fta = {bl, + descriptor, + fd, + progress_allocation, + bl->fractal_queues[which_db], + bl->extracted_datasizes[which_db], + 0, + which_db, + target_nodesize, + target_basementnodesize, + target_compression_method, + target_fanout}; + + r = toku_pthread_create(*fractal_thread_key, + bl->fractal_threads + which_db, + nullptr, + fractal_thread, + static_cast<void *>(&fta)); if (r) { - int r2 __attribute__((__unused__)) = toku_queue_destroy(bl->fractal_queues[which_db]); + int r2 __attribute__((__unused__)) = + toku_queue_destroy(bl->fractal_queues[which_db]); // ignore r2, since we already have an error bl->fractal_queues[which_db] = nullptr; goto error; @@ -3107,7 +3144,7 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl, for (int i = 0; i < n_to_read; i++) pivots[i] = zero_dbt; - FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file); + TOKU_FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file); int result = 0; for (int i = 0; i < n_to_read; i++) { @@ -3159,8 +3196,9 @@ static int setup_nonleaf_block (int n_children, } if (result == 0) { - FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file); - int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, nullptr, bl); + TOKU_FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file); + int r = bl_write_dbt( + &pivots[n_children - 1], next_pivots_stream, NULL, nullptr, bl); if (r) result = r; } @@ -3275,8 +3313,11 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st // 2) We put the 15 pivots and 16 blocks into an non-leaf node. // 3) We put the 16th pivot into the next pivots file. { - int r = fseek(toku_bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET); - if (r!=0) { return get_error_errno(); } + int r = + fseek(toku_bl_fidx2file(bl, pivots_fidx)->file, 0, SEEK_SET); + if (r != 0) { + return get_error_errno(); + } } FIDX next_pivots_file; @@ -3296,7 +3337,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) { // grab the first N_PER_BLOCK and build a node. DBT *pivots; - int64_t blocknum_of_new_node; + int64_t blocknum_of_new_node = 0; struct subtree_info *subtree_info; int r = setup_nonleaf_block (n_per_block, sts, pivots_fidx, n_subtrees_used, diff --git a/storage/tokudb/PerconaFT/ft/loader/loader.h b/storage/tokudb/PerconaFT/ft/loader/loader.h index 9c1bdab1ee2..cea2e8dfda2 100644 --- a/storage/tokudb/PerconaFT/ft/loader/loader.h +++ b/storage/tokudb/PerconaFT/ft/loader/loader.h @@ -38,6 +38,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #pragma once +#include "toku_portability.h" #include "ft/txn/txn.h" #include "ft/cachetable/cachetable.h" #include "ft/comparator.h" @@ -77,8 +78,6 @@ int toku_ft_loader_abort(FTLOADER bl, bool is_error); // For test purposes only -void toku_ft_loader_set_size_factor (uint32_t factor); - -void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)); +void toku_ft_loader_set_size_factor(uint32_t factor); size_t ft_loader_leafentry_size(size_t key_size, size_t val_size, TXNID xid); |