/*-
 * Copyright (c) 2014-2015 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

static int __lsm_merge_span(
    WT_SESSION_IMPL *, WT_LSM_TREE *, u_int, u_int *, u_int *, uint64_t *);

/*
 * __wt_lsm_merge_update_tree --
 *	Merge a set of chunks and populate a new one.
 *	Must be called with the LSM lock held.
 */
int
__wt_lsm_merge_update_tree(WT_SESSION_IMPL *session,
    WT_LSM_TREE *lsm_tree, u_int start_chunk, u_int nchunks,
    WT_LSM_CHUNK *chunk)
{
	size_t chunks_after_merge;

	WT_RET(__wt_lsm_tree_retire_chunks(
	    session, lsm_tree, start_chunk, nchunks));

	/* Update the current chunk list. */
	chunks_after_merge = lsm_tree->nchunks - (nchunks + start_chunk);
	memmove(lsm_tree->chunk + start_chunk + 1,
	    lsm_tree->chunk + start_chunk + nchunks,
	    chunks_after_merge * sizeof(*lsm_tree->chunk));
	lsm_tree->nchunks -= nchunks - 1;
	memset(lsm_tree->chunk + lsm_tree->nchunks, 0,
	    (nchunks - 1) * sizeof(*lsm_tree->chunk));
	lsm_tree->chunk[start_chunk] = chunk;
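	/*
	 * A worked example of the update above (the numbers are
	 * illustrative only, not from any particular workload): merging
	 * 3 chunks starting at slot 2 of a 6-chunk tree gives
	 * chunks_after_merge == 6 - (3 + 2) == 1, so the single trailing
	 * chunk slides from slot 5 down to slot 3, the tree's chunk count
	 * drops to 4, the now-stale slots 4 and 5 are zeroed, and the
	 * newly merged chunk is installed in slot 2.
	 */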
	return (0);
}

/*
 * __lsm_merge_span --
 *	Figure out the best span of chunks to merge.  Return WT_NOTFOUND
 *	if there is no merge to do.  Called with the LSM tree locked.
 */
static int
__lsm_merge_span(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree,
    u_int id, u_int *start, u_int *end, uint64_t *records)
{
	WT_LSM_CHUNK *chunk, *previous, *youngest;
	uint32_t aggressive, max_gap, max_gen, max_level;
	uint64_t record_count, chunk_size;
	u_int end_chunk, i, merge_max, merge_min, nchunks, start_chunk;

	chunk_size = 0;
	nchunks = 0;
	record_count = 0;
	chunk = youngest = NULL;

	/* Clear the return parameters. */
	*start = 0;
	*end = 0;
	*records = 0;

	/*
	 * If the tree is open read-only or we are compacting, be very
	 * aggressive.  Otherwise, we can spend a long time waiting for
	 * merges to start in read-only applications.
	 */
	if (!lsm_tree->modified ||
	    F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING))
		lsm_tree->merge_aggressiveness = 10;

	aggressive = lsm_tree->merge_aggressiveness;
	merge_max = (aggressive > WT_LSM_AGGRESSIVE_THRESHOLD) ?
	    100 : lsm_tree->merge_max;
	merge_min = (aggressive > WT_LSM_AGGRESSIVE_THRESHOLD) ?
	    2 : lsm_tree->merge_min;
	max_gap = (aggressive + 4) / 5;
	max_level = (lsm_tree->merge_throttle > 0) ? 0 : id + aggressive;
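	/*
	 * To make the knob arithmetic above concrete (illustrative values,
	 * not configuration defaults): when merge_aggressiveness has been
	 * forced to 10, as in the read-only/compacting case, that exceeds
	 * WT_LSM_AGGRESSIVE_THRESHOLD, so the window may span anywhere
	 * from 2 to 100 chunks, and max_gap is (10 + 4) / 5 == 2, meaning
	 * chunks within two generations of the youngest chunk can still
	 * join the merge.
	 */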
	/*
	 * If there aren't any chunks to merge, or some of the chunks aren't
	 * yet written, we're done.  A non-zero return indicates that the
	 * worker should assume there is no merge work to do: if there are
	 * unwritten chunks, the worker should write them immediately.
	 */
	if (lsm_tree->nchunks < merge_min)
		return (WT_NOTFOUND);

	/*
	 * Only include chunks that already have a Bloom filter or are the
	 * result of a merge and not involved in a merge.
	 */
	for (end_chunk = lsm_tree->nchunks - 1; end_chunk > 0; --end_chunk) {
		chunk = lsm_tree->chunk[end_chunk];
		WT_ASSERT(session, chunk != NULL);
		if (F_ISSET(chunk, WT_LSM_CHUNK_MERGING))
			continue;
		if (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) ||
		    chunk->generation > 0)
			break;
		else if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OFF) &&
		    F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
			break;
	}

	/*
	 * Give up immediately if there aren't enough on-disk chunks in the
	 * tree for a merge.
	 */
	if (end_chunk < merge_min - 1)
		return (WT_NOTFOUND);

	/*
	 * Look for the most efficient merge we can do.  We define efficiency
	 * as collapsing as many levels as possible while processing the
	 * smallest number of rows.
	 *
	 * We make a distinction between "major" and "minor" merges.  The
	 * difference is whether the oldest chunk is involved: if it is, we
	 * can discard tombstones, because there can be no older record
	 * marked deleted.
	 *
	 * Respect the configured limit on the number of chunks to merge:
	 * start with the most recent set of chunks and work backwards until
	 * going further becomes significantly less efficient.
	 */
	for (start_chunk = end_chunk + 1, record_count = 0;
	    start_chunk > 0; ) {
		chunk = lsm_tree->chunk[start_chunk - 1];
		youngest = lsm_tree->chunk[end_chunk];
		nchunks = (end_chunk + 1) - start_chunk;

		/*
		 * If the chunk is already involved in a merge or a Bloom
		 * filter is being built for it, stop.
		 */
		if (F_ISSET(chunk, WT_LSM_CHUNK_MERGING) || chunk->bloom_busy)
			break;

		/*
		 * Look for small merges before trying a big one: some
		 * threads should stay in low levels until we get more
		 * aggressive.
		 */
		if (chunk->generation > max_level)
			break;

		/*
		 * If the size of the chunks selected so far exceeds the
		 * configured maximum chunk size, stop.  Keep going if we can
		 * slide the window further into the tree: we don't want to
		 * leave small chunks in the middle.
		 */
		if ((chunk_size += chunk->size) > lsm_tree->chunk_max)
			if (nchunks < merge_min ||
			    (chunk->generation > youngest->generation &&
			    chunk_size - youngest->size > lsm_tree->chunk_max))
				break;

		/*
		 * If we have enough chunks for a merge and the next chunk is
		 * in too high a generation, stop.
		 */
		if (nchunks >= merge_min) {
			previous = lsm_tree->chunk[start_chunk];
			max_gen = youngest->generation + max_gap;
			if (previous->generation <= max_gen &&
			    chunk->generation > max_gen)
				break;
		}

		F_SET(chunk, WT_LSM_CHUNK_MERGING);
		record_count += chunk->count;
		--start_chunk;

		/*
		 * If we have a full window, or the merge would be too big,
		 * remove the youngest chunk.
		 */
		if (nchunks == merge_max ||
		    chunk_size > lsm_tree->chunk_max) {
			WT_ASSERT(session,
			    F_ISSET(youngest, WT_LSM_CHUNK_MERGING));
			F_CLR(youngest, WT_LSM_CHUNK_MERGING);
			record_count -= youngest->count;
			chunk_size -= youngest->size;
			--end_chunk;
		}
	}

	nchunks = (end_chunk + 1) - start_chunk;

	/* Be paranoid: check that we set up the merge properly. */
	WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks);
#ifdef HAVE_DIAGNOSTIC
	for (i = 0; i < nchunks; i++) {
		chunk = lsm_tree->chunk[start_chunk + i];
		WT_ASSERT(session, F_ISSET(chunk, WT_LSM_CHUNK_MERGING));
	}
#endif

	WT_ASSERT(session,
	    nchunks == 0 || (chunk != NULL && youngest != NULL));

	/*
	 * Don't do merges that are too small or that span too many
	 * generations.
	 */
	if (nchunks < merge_min ||
	    lsm_tree->chunk[end_chunk]->generation >
	    youngest->generation + max_gap) {
		for (i = 0; i < nchunks; i++) {
			chunk = lsm_tree->chunk[start_chunk + i];
			WT_ASSERT(session,
			    F_ISSET(chunk, WT_LSM_CHUNK_MERGING));
			F_CLR(chunk, WT_LSM_CHUNK_MERGING);
		}
		return (WT_NOTFOUND);
	}

	*records = record_count;
	*start = start_chunk;
	*end = end_chunk;
	return (0);
}

/*
 * __wt_lsm_merge --
 *	Merge a set of chunks of an LSM tree.
 */
int
__wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, u_int id)
{
	WT_BLOOM *bloom;
	WT_CURSOR *dest, *src;
	WT_DECL_RET;
	WT_ITEM key, value;
	WT_LSM_CHUNK *chunk;
	uint32_t generation;
	uint64_t insert_count, record_count;
	u_int dest_id, end_chunk, i, nchunks, start_chunk, start_id, verb;
	int tret;
	bool created_chunk, create_bloom, locked, in_sync;
	const char *cfg[3];
	const char *drop_cfg[] = {
	    WT_CONFIG_BASE(session, session_drop), "force", NULL };

	bloom = NULL;
	chunk = NULL;
	dest = src = NULL;
	start_id = 0;
	created_chunk = create_bloom = locked = in_sync = false;

	/* Fast path if it's obvious no merges could be done. */
	if (lsm_tree->nchunks < lsm_tree->merge_min &&
	    lsm_tree->merge_aggressiveness < WT_LSM_AGGRESSIVE_THRESHOLD)
		return (WT_NOTFOUND);

	/*
	 * Use the lsm_tree lock to read the chunks (so no switches occur),
	 * but avoid holding it while the merge is in progress: that may take
	 * a long time.
	 */
	WT_RET(__wt_lsm_tree_writelock(session, lsm_tree));
	locked = true;

	WT_ERR(__lsm_merge_span(session,
	    lsm_tree, id, &start_chunk, &end_chunk, &record_count));
	nchunks = (end_chunk + 1) - start_chunk;

	WT_ASSERT(session, nchunks > 0);
	start_id = lsm_tree->chunk[start_chunk]->id;

	/* Find the merge generation. */
	for (generation = 0, i = 0; i < nchunks; i++)
		generation = WT_MAX(generation,
		    lsm_tree->chunk[start_chunk + i]->generation + 1);

	WT_ERR(__wt_lsm_tree_writeunlock(session, lsm_tree));
	locked = false;

	/* Allocate an ID for the merge. */
	dest_id = __wt_atomic_add32(&lsm_tree->last, 1);

	/*
	 * We only want to do the chunk loop if we're running with verbose,
	 * so we wrap these statements in the conditional.  Avoid the loop
	 * in the normal path.
	 */
	if (WT_VERBOSE_ISSET(session, WT_VERB_LSM)) {
		WT_ERR(__wt_verbose(session, WT_VERB_LSM,
		    "Merging %s chunks %u-%u into %u (%" PRIu64 " records)"
		    ", generation %" PRIu32,
		    lsm_tree->name, start_chunk, end_chunk, dest_id,
		    record_count, generation));
		for (verb = start_chunk; verb <= end_chunk; verb++)
			WT_ERR(__wt_verbose(session, WT_VERB_LSM,
			    "%s: Chunk[%u] id %u", lsm_tree->name, verb,
			    lsm_tree->chunk[verb]->id));
	}

	WT_ERR(__wt_calloc_one(session, &chunk));
	created_chunk = true;
	chunk->id = dest_id;

	if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_MERGED) &&
	    (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OLDEST) ||
	    start_chunk > 0) && record_count > 0)
		create_bloom = true;

	/*
	 * Special setup for the merge cursor:
	 * first, reset to open the dependent cursors;
	 * then restrict the cursor to a specific number of chunks;
	 * then set MERGE so the cursor doesn't track updates to the tree.
	 */
	WT_ERR(__wt_open_cursor(session, lsm_tree->name, NULL, NULL, &src));
	F_SET(src, WT_CURSTD_RAW);
	WT_ERR(__wt_clsm_init_merge(src, start_chunk, start_id, nchunks));

	WT_WITH_SCHEMA_LOCK(session,
	    ret = __wt_lsm_tree_setup_chunk(session, lsm_tree, chunk));
	WT_ERR(ret);
	if (create_bloom) {
		WT_ERR(__wt_lsm_tree_setup_bloom(session, lsm_tree, chunk));

		WT_ERR(__wt_bloom_create(session, chunk->bloom_uri,
		    lsm_tree->bloom_config, record_count,
		    lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count,
		    &bloom));
	}
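	/*
	 * Note that record_count, the sum of the source chunks' counts, is
	 * only an upper bound on what the merge will produce: merging can
	 * collapse duplicate keys and discard obsolete records, so the
	 * Bloom filter may end up holding fewer items than it was sized
	 * for (the verbose message after the copy loop reports both
	 * figures).
	 */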
	/* Discard pages we read as soon as we're done with them. */
	F_SET(session, WT_SESSION_NO_CACHE);

	cfg[0] = WT_CONFIG_BASE(session, session_open_cursor);
	cfg[1] = "bulk,raw,skip_sort_check";
	cfg[2] = NULL;
	WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cfg, &dest));

#define	LSM_MERGE_CHECK_INTERVAL	1000
	for (insert_count = 0; (ret = src->next(src)) == 0; insert_count++) {
		if (insert_count % LSM_MERGE_CHECK_INTERVAL == 0) {
			if (!F_ISSET(lsm_tree, WT_LSM_TREE_ACTIVE))
				WT_ERR(EINTR);

			WT_STAT_FAST_CONN_INCRV(session,
			    lsm_rows_merged, LSM_MERGE_CHECK_INTERVAL);
			++lsm_tree->merge_progressing;
		}

		WT_ERR(src->get_key(src, &key));
		dest->set_key(dest, &key);
		WT_ERR(src->get_value(src, &value));
		dest->set_value(dest, &value);
		WT_ERR(dest->insert(dest));
		if (create_bloom)
			WT_ERR(__wt_bloom_insert(bloom, &key));
	}
	WT_ERR_NOTFOUND_OK(ret);

	WT_STAT_FAST_CONN_INCRV(session,
	    lsm_rows_merged, insert_count % LSM_MERGE_CHECK_INTERVAL);
	++lsm_tree->merge_progressing;
	WT_ERR(__wt_verbose(session, WT_VERB_LSM,
	    "Bloom size for %" PRIu64 " has %" PRIu64 " items inserted.",
	    record_count, insert_count));

	/*
	 * Closing and syncing the files can take a while.  Set the
	 * merge_syncing field so that compact knows it is still in
	 * progress.
	 */
	(void)__wt_atomic_add32(&lsm_tree->merge_syncing, 1);
	in_sync = true;

	/*
	 * We've successfully created the new chunk.  Now install it.  We
	 * need to ensure that the NO_CACHE flag is cleared and the Bloom
	 * filter is closed (even if a step fails), so track errors but
	 * don't return until we've cleaned up.
	 */
	WT_TRET(src->close(src));
	WT_TRET(dest->close(dest));
	src = dest = NULL;

	F_CLR(session, WT_SESSION_NO_CACHE);

	/*
	 * We're doing advisory reads to fault the new trees into cache.
	 * Don't block if the cache is full: our next unit of work may be to
	 * discard some trees to free space.
	 */
	F_SET(session, WT_SESSION_NO_CACHE_CHECK);

	if (create_bloom) {
		if (ret == 0)
			WT_TRET(__wt_bloom_finalize(bloom));

		/*
		 * Read in a key to make sure the Bloom filter's btree handle
		 * is open before it becomes visible to application threads.
		 * Otherwise application threads will stall while it is
		 * opened and internal pages are read into cache.
		 */
		if (ret == 0) {
			WT_CLEAR(key);
			WT_TRET_NOTFOUND_OK(__wt_bloom_get(bloom, &key));
		}

		WT_TRET(__wt_bloom_close(bloom));
		bloom = NULL;
	}
	WT_ERR(ret);

	/*
	 * Open a handle on the new chunk before application threads attempt
	 * to access it: opening it pre-loads internal pages into the file
	 * system cache.
	 */
	cfg[1] = "checkpoint=" WT_CHECKPOINT;
	WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cfg, &dest));
	WT_TRET(dest->close(dest));
	dest = NULL;
	++lsm_tree->merge_progressing;
	(void)__wt_atomic_sub32(&lsm_tree->merge_syncing, 1);
	in_sync = false;
	WT_ERR_NOTFOUND_OK(ret);

	WT_ERR(__wt_lsm_tree_set_chunk_size(session, chunk));
	WT_ERR(__wt_lsm_tree_writelock(session, lsm_tree));
	locked = true;

	/*
	 * Check whether we raced with another merge, and adjust the chunk
	 * array offset as necessary.
	 */
	if (start_chunk >= lsm_tree->nchunks ||
	    lsm_tree->chunk[start_chunk]->id != start_id)
		for (start_chunk = 0;
		    start_chunk < lsm_tree->nchunks;
		    start_chunk++)
			if (lsm_tree->chunk[start_chunk]->id == start_id)
				break;
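	/*
	 * The rescan above is keyed on the chunk's id rather than its array
	 * slot: the tree lock was dropped for the duration of the merge, so
	 * a concurrent merge may have shifted chunks to different slots,
	 * but an id is stable for the life of its chunk.
	 */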
	/*
	 * It is safe to error out here: the update can only fail before
	 * making any changes to the tree.
	 */
	WT_ERR(__wt_lsm_merge_update_tree(
	    session, lsm_tree, start_chunk, nchunks, chunk));

	if (create_bloom)
		F_SET(chunk, WT_LSM_CHUNK_BLOOM);
	chunk->count = insert_count;
	chunk->generation = generation;
	F_SET(chunk, WT_LSM_CHUNK_ONDISK);

	/*
	 * We have no current way of continuing if the metadata update fails,
	 * so we will panic in that case.  Put some effort into cleaning up
	 * after ourselves here, so things have a chance of shutting down.
	 *
	 * Any errors that happened after the tree was locked are fatal: we
	 * can't guarantee the state of the tree.
	 */
	if ((ret = __wt_lsm_meta_write(session, lsm_tree)) != 0)
		WT_PANIC_ERR(session, ret, "Failed finalizing LSM merge");

	lsm_tree->dsk_gen++;

	/* Update the throttling while holding the tree lock. */
	__wt_lsm_tree_throttle(session, lsm_tree, true);

	/* Schedule a pass to discard old chunks. */
	WT_ERR(__wt_lsm_manager_push_entry(
	    session, WT_LSM_WORK_DROP, 0, lsm_tree));

err:	if (locked)
		WT_TRET(__wt_lsm_tree_writeunlock(session, lsm_tree));
	if (in_sync)
		(void)__wt_atomic_sub32(&lsm_tree->merge_syncing, 1);
	if (src != NULL)
		WT_TRET(src->close(src));
	if (dest != NULL)
		WT_TRET(dest->close(dest));
	if (bloom != NULL)
		WT_TRET(__wt_bloom_close(bloom));
	if (ret != 0 && created_chunk) {
		/* Drop the newly-created files on error. */
		if (chunk->uri != NULL) {
			WT_WITH_SCHEMA_LOCK(session, tret =
			    __wt_schema_drop(session, chunk->uri, drop_cfg));
			WT_TRET(tret);
		}
		if (create_bloom && chunk->bloom_uri != NULL) {
			WT_WITH_SCHEMA_LOCK(session, tret = __wt_schema_drop(
			    session, chunk->bloom_uri, drop_cfg));
			WT_TRET(tret);
		}
		__wt_free(session, chunk->bloom_uri);
		__wt_free(session, chunk->uri);
		__wt_free(session, chunk);

		if (ret == EINTR)
			WT_TRET(__wt_verbose(session, WT_VERB_LSM,
			    "Merge aborted due to close"));
		else
			WT_TRET(__wt_verbose(session, WT_VERB_LSM,
			    "Merge failed with %s",
			    __wt_strerror(session, ret, NULL, 0)));
	}
	F_CLR(session, WT_SESSION_NO_CACHE | WT_SESSION_NO_CACHE_CHECK);
	return (ret);
}