summaryrefslogtreecommitdiff
path: root/storage/tokudb/ft-index/ft/ftloader.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/ft-index/ft/ftloader.cc')
-rw-r--r--storage/tokudb/ft-index/ft/ftloader.cc165
1 files changed, 93 insertions, 72 deletions
diff --git a/storage/tokudb/ft-index/ft/ftloader.cc b/storage/tokudb/ft-index/ft/ftloader.cc
index 2df6d0a1cda..67b3cf9905e 100644
--- a/storage/tokudb/ft-index/ft/ftloader.cc
+++ b/storage/tokudb/ft-index/ft/ftloader.cc
@@ -356,6 +356,8 @@ int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx)
*/
{
int result = 0;
+ if (result) // debug hack
+ return result;
FILE *f = NULL;
int fd = -1;
char *fname = toku_strdup(bl->temp_file_template);
@@ -420,6 +422,10 @@ void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error) {
}
destroy_rowset(&bl->primary_rowset);
+ if (bl->primary_rowset_queue) {
+ queue_destroy(bl->primary_rowset_queue);
+ bl->primary_rowset_queue = nullptr;
+ }
for (int i=0; i<bl->N; i++) {
if ( bl->fractal_queues ) {
@@ -543,7 +549,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
TOKUTXN txn,
bool reserve_memory,
uint64_t reserve_memory_size,
- bool compress_intermediates)
+ bool compress_intermediates,
+ bool allow_puts)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
{
FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
@@ -560,10 +567,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
}
bl->compress_intermediates = compress_intermediates;
- if (0) { // debug
- fprintf(stderr, "%s Reserved memory=%" PRId64 "\n", __FUNCTION__, bl->reserved_memory);
- }
-
+ bl->allow_puts = allow_puts;
bl->src_db = src_db;
bl->N = N;
bl->load_lsn = load_lsn;
@@ -628,7 +632,6 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
{ int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
}
- //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{
ft_loader_lock_init(bl);
}
@@ -650,34 +653,38 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
TOKUTXN txn,
bool reserve_memory,
uint64_t reserve_memory_size,
- bool compress_intermediates)
-/* Effect: called by DB_ENV->create_loader to create an ft loader.
- * Arguments:
- * blp Return the ft loader here.
- * g The function for generating a row
- * src_db The source database. Needed by g. May be NULL if that's ok with g.
- * N The number of dbs to create.
- * dbs An array of open databases. Used by g. The data will be put in these database.
- * new_fnames The file names (these strings are owned by the caller: we make a copy for our own purposes).
- * temp_file_template A template suitable for mkstemp()
- * Return value: 0 on success, an error number otherwise.
- */
-{
+ bool compress_intermediates,
+ bool allow_puts) {
+// Effect: called by DB_ENV->create_loader to create a brt loader.
+// Arguments:
+// blp Return the brt loader here.
+// g The function for generating a row
+// src_db The source database. Needed by g. May be NULL if that's ok with g.
+// N The number of dbs to create.
+// dbs An array of open databases. Used by g. The data will be put in these database.
+// new_fnames The file names (these strings are owned by the caller: we make a copy for our own purposes).
+// temp_file_template A template suitable for mkstemp()
+// reserve_memory Cause the loader to reserve memory for its use from the cache table.
+// compress_intermediates Cause the loader to compress intermediate loader files.
+// allow_puts Prepare the loader for rows to insert. When puts are disabled, the loader does not run the
+// extractor or the fractal tree writer threads.
+// Return value: 0 on success, an error number otherwise.
int result = 0;
{
int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db,
- N, fts, dbs,
- new_fnames_in_env,
- bt_compare_functions,
- temp_file_template,
- load_lsn,
- txn,
- reserve_memory,
- reserve_memory_size,
- compress_intermediates);
+ N, fts, dbs,
+ new_fnames_in_env,
+ bt_compare_functions,
+ temp_file_template,
+ load_lsn,
+ txn,
+ reserve_memory,
+ reserve_memory_size,
+ compress_intermediates,
+ allow_puts);
if (r!=0) result = r;
}
- if (result==0) {
+ if (result==0 && allow_puts) {
FTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r==0) {
@@ -1213,6 +1220,7 @@ finish_extractor (FTLOADER bl) {
{
int r = queue_destroy(bl->primary_rowset_queue);
invariant(r==0);
+ bl->primary_rowset_queue = nullptr;
}
rval = ft_loader_fi_close_all(&bl->file_infos);
@@ -1374,10 +1382,9 @@ int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise.
*/
{
- if (ft_loader_get_error(&bl->error_callback))
+ if (!bl->allow_puts || ft_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic
bl->n_rows++;
-// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
return loader_do_put(bl, key, val);
}
@@ -2425,6 +2432,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
if (r) {
result = r;
drain_writer_q(q);
+ r = toku_os_close(fd);
+ assert_zero(r);
return result;
}
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
@@ -2714,12 +2723,7 @@ static int loader_do_i (FTLOADER bl,
struct rowset *rows = &(bl->rows[which_db]);
invariant(rows->data==NULL); // the rows should be all cleaned up already
- // a better allocation would be to figure out roughly how many merge passes we'll need.
- int allocation_for_merge = (2*progress_allocation)/3;
- progress_allocation -= allocation_for_merge;
-
- int r;
- r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
+ int r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
if (r) goto error;
{
@@ -2740,49 +2744,62 @@ static int loader_do_i (FTLOADER bl,
r = dest_db->get_fanout(dest_db, &target_fanout);
invariant_zero(r);
- // This structure must stay live until the join below.
- struct fractal_thread_args fta = { bl,
- descriptor,
- fd,
- progress_allocation,
- bl->fractal_queues[which_db],
- bl->extracted_datasizes[which_db],
- 0,
- which_db,
- target_nodesize,
- target_basementnodesize,
- target_compression_method,
- target_fanout
- };
-
- r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
- if (r) {
- int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]);
- // ignore r2, since we already have an error
- goto error;
- }
- invariant(bl->fractal_threads_live[which_db]==false);
- bl->fractal_threads_live[which_db] = true;
+ if (bl->allow_puts) {
+ // a better allocation would be to figure out roughly how many merge passes we'll need.
+ int allocation_for_merge = (2*progress_allocation)/3;
+ progress_allocation -= allocation_for_merge;
+
+ // This structure must stay live until the join below.
+ struct fractal_thread_args fta = {
+ bl,
+ descriptor,
+ fd,
+ progress_allocation,
+ bl->fractal_queues[which_db],
+ bl->extracted_datasizes[which_db],
+ 0,
+ which_db,
+ target_nodesize,
+ target_basementnodesize,
+ target_compression_method,
+ target_fanout
+ };
+
+ r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
+ if (r) {
+ int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]);
+ // ignore r2, since we already have an error
+ bl->fractal_queues[which_db] = nullptr;
+ goto error;
+ }
+ invariant(bl->fractal_threads_live[which_db]==false);
+ bl->fractal_threads_live[which_db] = true;
- r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
+ r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
- {
- void *toku_pthread_retval;
- int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
- invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug but that struct into a C block statement.
- resource_assert_zero(r2);
- invariant(toku_pthread_retval==NULL);
- invariant(bl->fractal_threads_live[which_db]);
- bl->fractal_threads_live[which_db] = false;
- if (r == 0) r = fta.errno_result;
+ {
+ void *toku_pthread_retval;
+ int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
+ invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug put that struct into a C block statement.
+ resource_assert_zero(r2);
+ invariant(toku_pthread_retval==NULL);
+ invariant(bl->fractal_threads_live[which_db]);
+ bl->fractal_threads_live[which_db] = false;
+ if (r == 0) r = fta.errno_result;
+ }
+ } else {
+ queue_eof(bl->fractal_queues[which_db]);
+ r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation,
+ bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db,
+ target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
}
}
error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
- {
+ if (bl->fractal_queues[which_db]) {
int r2 = queue_destroy(bl->fractal_queues[which_db]);
invariant(r2==0);
- bl->fractal_queues[which_db]=NULL;
+ bl->fractal_queues[which_db] = nullptr;
}
// if we get here we need to free up the merge_fileset and the rowset, as well as the keys
@@ -2851,6 +2868,10 @@ int toku_ft_loader_close (FTLOADER bl,
if (r)
result = r;
invariant(!bl->extractor_live);
+ } else {
+ r = finish_primary_rows(bl);
+ if (r)
+ result = r;
}
// check for an error during extraction