diff options
Diffstat (limited to 'storage/tokudb/ft-index/ft/ftloader.cc')
-rw-r--r-- | storage/tokudb/ft-index/ft/ftloader.cc | 165 |
1 files changed, 93 insertions, 72 deletions
diff --git a/storage/tokudb/ft-index/ft/ftloader.cc b/storage/tokudb/ft-index/ft/ftloader.cc index 2df6d0a1cda..67b3cf9905e 100644 --- a/storage/tokudb/ft-index/ft/ftloader.cc +++ b/storage/tokudb/ft-index/ft/ftloader.cc @@ -356,6 +356,8 @@ int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx) */ { int result = 0; + if (result) // debug hack + return result; FILE *f = NULL; int fd = -1; char *fname = toku_strdup(bl->temp_file_template); @@ -420,6 +422,10 @@ void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error) { } destroy_rowset(&bl->primary_rowset); + if (bl->primary_rowset_queue) { + queue_destroy(bl->primary_rowset_queue); + bl->primary_rowset_queue = nullptr; + } for (int i=0; i<bl->N; i++) { if ( bl->fractal_queues ) { @@ -543,7 +549,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, TOKUTXN txn, bool reserve_memory, uint64_t reserve_memory_size, - bool compress_intermediates) + bool compress_intermediates, + bool allow_puts) // Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread. { FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC) @@ -560,10 +567,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB. } bl->compress_intermediates = compress_intermediates; - if (0) { // debug - fprintf(stderr, "%s Reserved memory=%" PRId64 "\n", __FUNCTION__, bl->reserved_memory); - } - + bl->allow_puts = allow_puts; bl->src_db = src_db; bl->N = N; bl->load_lsn = load_lsn; @@ -628,7 +632,6 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, { int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; } } - //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); { ft_loader_lock_init(bl); } @@ -650,34 +653,38 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, TOKUTXN txn, bool reserve_memory, uint64_t reserve_memory_size, - bool compress_intermediates) -/* Effect: called by DB_ENV->create_loader to create an ft loader. - * Arguments: - * blp Return the ft loader here. - * g The function for generating a row - * src_db The source database. Needed by g. May be NULL if that's ok with g. - * N The number of dbs to create. - * dbs An array of open databases. Used by g. The data will be put in these database. - * new_fnames The file names (these strings are owned by the caller: we make a copy for our own purposes). - * temp_file_template A template suitable for mkstemp() - * Return value: 0 on success, an error number otherwise. - */ -{ + bool compress_intermediates, + bool allow_puts) { +// Effect: called by DB_ENV->create_loader to create a brt loader. +// Arguments: +// blp Return the brt loader here. +// g The function for generating a row +// src_db The source database. Needed by g. May be NULL if that's ok with g. +// N The number of dbs to create. +// dbs An array of open databases. Used by g. The data will be put in these database. +// new_fnames The file names (these strings are owned by the caller: we make a copy for our own purposes). +// temp_file_template A template suitable for mkstemp() +// reserve_memory Cause the loader to reserve memory for its use from the cache table. +// compress_intermediates Cause the loader to compress intermediate loader files. +// allow_puts Prepare the loader for rows to insert. When puts are disabled, the loader does not run the +// extractor or the fractal tree writer threads. +// Return value: 0 on success, an error number otherwise. int result = 0; { int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db, - N, fts, dbs, - new_fnames_in_env, - bt_compare_functions, - temp_file_template, - load_lsn, - txn, - reserve_memory, - reserve_memory_size, - compress_intermediates); + N, fts, dbs, + new_fnames_in_env, + bt_compare_functions, + temp_file_template, + load_lsn, + txn, + reserve_memory, + reserve_memory_size, + compress_intermediates, + allow_puts); if (r!=0) result = r; } - if (result==0) { + if (result==0 && allow_puts) { FTLOADER bl = *blp; int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r==0) { @@ -1213,6 +1220,7 @@ finish_extractor (FTLOADER bl) { { int r = queue_destroy(bl->primary_rowset_queue); invariant(r==0); + bl->primary_rowset_queue = nullptr; } rval = ft_loader_fi_close_all(&bl->file_infos); @@ -1374,10 +1382,9 @@ int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val) * Return value: 0 on success, an error number otherwise. */ { - if (ft_loader_get_error(&bl->error_callback)) + if (!bl->allow_puts || ft_loader_get_error(&bl->error_callback)) return EINVAL; // previous panic bl->n_rows++; -// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl); return loader_do_put(bl, key, val); } @@ -2425,6 +2432,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, if (r) { result = r; drain_writer_q(q); + r = toku_os_close(fd); + assert_zero(r); return result; } FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file); @@ -2714,12 +2723,7 @@ static int loader_do_i (FTLOADER bl, struct rowset *rows = &(bl->rows[which_db]); invariant(rows->data==NULL); // the rows should be all cleaned up already - // a better allocation would be to figure out roughly how many merge passes we'll need. - int allocation_for_merge = (2*progress_allocation)/3; - progress_allocation -= allocation_for_merge; - - int r; - r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH); + int r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH); if (r) goto error; { @@ -2740,49 +2744,62 @@ static int loader_do_i (FTLOADER bl, r = dest_db->get_fanout(dest_db, &target_fanout); invariant_zero(r); - // This structure must stay live until the join below. - struct fractal_thread_args fta = { bl, - descriptor, - fd, - progress_allocation, - bl->fractal_queues[which_db], - bl->extracted_datasizes[which_db], - 0, - which_db, - target_nodesize, - target_basementnodesize, - target_compression_method, - target_fanout - }; - - r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta); - if (r) { - int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]); - // ignore r2, since we already have an error - goto error; - } - invariant(bl->fractal_threads_live[which_db]==false); - bl->fractal_threads_live[which_db] = true; + if (bl->allow_puts) { + // a better allocation would be to figure out roughly how many merge passes we'll need. + int allocation_for_merge = (2*progress_allocation)/3; + progress_allocation -= allocation_for_merge; + + // This structure must stay live until the join below. + struct fractal_thread_args fta = { + bl, + descriptor, + fd, + progress_allocation, + bl->fractal_queues[which_db], + bl->extracted_datasizes[which_db], + 0, + which_db, + target_nodesize, + target_basementnodesize, + target_compression_method, + target_fanout + }; + + r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta); + if (r) { + int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]); + // ignore r2, since we already have an error + bl->fractal_queues[which_db] = nullptr; + goto error; + } + invariant(bl->fractal_threads_live[which_db]==false); + bl->fractal_threads_live[which_db] = true; - r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]); + r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]); - { - void *toku_pthread_retval; - int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval); - invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug but that struct into a C block statement. - resource_assert_zero(r2); - invariant(toku_pthread_retval==NULL); - invariant(bl->fractal_threads_live[which_db]); - bl->fractal_threads_live[which_db] = false; - if (r == 0) r = fta.errno_result; + { + void *toku_pthread_retval; + int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval); + invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug put that struct into a C block statement. + resource_assert_zero(r2); + invariant(toku_pthread_retval==NULL); + invariant(bl->fractal_threads_live[which_db]); + bl->fractal_threads_live[which_db] = false; + if (r == 0) r = fta.errno_result; + } + } else { + queue_eof(bl->fractal_queues[which_db]); + r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation, + bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db, + target_nodesize, target_basementnodesize, target_compression_method, target_fanout); } } error: // this is the cleanup code. Even if r==0 (no error) we fall through to here. - { + if (bl->fractal_queues[which_db]) { int r2 = queue_destroy(bl->fractal_queues[which_db]); invariant(r2==0); - bl->fractal_queues[which_db]=NULL; + bl->fractal_queues[which_db] = nullptr; } // if we get here we need to free up the merge_fileset and the rowset, as well as the keys @@ -2851,6 +2868,10 @@ int toku_ft_loader_close (FTLOADER bl, if (r) result = r; invariant(!bl->extractor_live); + } else { + r = finish_primary_rows(bl); + if (r) + result = r; } // check for an error during extraction |