diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2014-09-12 09:45:37 +1000 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2014-09-12 09:45:37 +1000 |
commit | 72815f6745cdb63bf06dd5b777397acceb539e05 (patch) | |
tree | b19c78676471b799d97e06234eaa217dcf6a8b5b | |
parent | 34222b005ee0f5eab62e4e05dc108d4351e0dc78 (diff) | |
parent | 7179a227e579b49d91608e53c2df4a0d2a0a3030 (diff) | |
download | mongo-72815f6745cdb63bf06dd5b777397acceb539e05.tar.gz |
Merge pull request #1213 from wiredtiger/force-all
LSM compact wasn't working properly, leading to performance degradation. Refs #1200
-rw-r--r-- | src/btree/rec_track.c | 2 | ||||
-rw-r--r-- | src/include/extern.h | 1 | ||||
-rw-r--r-- | src/include/lsm.h | 11 | ||||
-rw-r--r-- | src/include/txn.i | 21 | ||||
-rw-r--r-- | src/lsm/lsm_manager.c | 8 | ||||
-rw-r--r-- | src/lsm/lsm_merge.c | 32 | ||||
-rw-r--r-- | src/lsm/lsm_tree.c | 115 | ||||
-rw-r--r-- | src/lsm/lsm_work_unit.c | 39 | ||||
-rw-r--r-- | src/lsm/lsm_worker.c | 22 |
9 files changed, 186 insertions, 65 deletions
diff --git a/src/btree/rec_track.c b/src/btree/rec_track.c index a4ef0aaa100..165df9d61e5 100644 --- a/src/btree/rec_track.c +++ b/src/btree/rec_track.c @@ -807,7 +807,7 @@ __wt_ovfl_txnc_add(WT_SESSION_IMPL *session, WT_PAGE *page, txnc->value_offset = WT_PTRDIFF32(p, txnc); txnc->value_size = WT_STORE_SIZE(value_size); memcpy(p, value, value_size); - txnc->current = __wt_txn_current_id(session); + txnc->current = __wt_txn_new_id(session); __wt_cache_page_inmem_incr(session, page, WT_OVFL_SIZE(WT_OVFL_TXNC) + addr_size + value_size); diff --git a/src/include/extern.h b/src/include/extern.h index c5e6a49fdf8..9783de0a7a6 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -1015,6 +1015,7 @@ extern int __wt_lsm_tree_worker(WT_SESSION_IMPL *session, extern int __wt_lsm_get_chunk_to_flush(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int force, + int *last, WT_LSM_CHUNK **chunkp); extern int __wt_lsm_work_switch( WT_SESSION_IMPL *session, WT_LSM_WORK_UNIT **entryp, diff --git a/src/include/lsm.h b/src/include/lsm.h index b4581b2b094..48984399acd 100644 --- a/src/include/lsm.h +++ b/src/include/lsm.h @@ -192,11 +192,12 @@ struct __wt_lsm_tree { int freeing_old_chunks; /* Whether chunks are being freed */ uint32_t merge_aggressiveness; /* Increase amount of work per merge */ -#define WT_LSM_TREE_ACTIVE 0x01 /* Workers are active */ -#define WT_LSM_TREE_COMPACTING 0x02 /* Tree is being compacted */ -#define WT_LSM_TREE_NEED_SWITCH 0x04 /* A new chunk should be created */ -#define WT_LSM_TREE_OPEN 0x08 /* The tree is open */ -#define WT_LSM_TREE_THROTTLE 0x10 /* Throttle updates */ +#define WT_LSM_TREE_ACTIVE 0x01 /* Workers are active */ +#define WT_LSM_TREE_COMPACT_FLUSH 0x02 /* Flushed for compact */ +#define WT_LSM_TREE_COMPACTING 0x04 /* Tree being compacted */ +#define WT_LSM_TREE_NEED_SWITCH 0x08 /* New chunk needs creating */ +#define WT_LSM_TREE_OPEN 0x10 /* The tree is open */ +#define WT_LSM_TREE_THROTTLE 0x20 /* Throttle updates */ uint32_t 
flags; #define WT_LSM_TREE_EXCLUSIVE 0x01 /* Tree is opened exclusively */ diff --git a/src/include/txn.i b/src/include/txn.i index 3854429f8e4..81559bfe490 100644 --- a/src/include/txn.i +++ b/src/include/txn.i @@ -179,7 +179,7 @@ __wt_txn_read(WT_SESSION_IMPL *session, WT_UPDATE *upd) /* * __wt_txn_autocommit_check -- - * If an auto-commit transaction is required, start one. + * If an auto-commit transaction is required, start one. */ static inline int __wt_txn_autocommit_check(WT_SESSION_IMPL *session) @@ -195,23 +195,20 @@ __wt_txn_autocommit_check(WT_SESSION_IMPL *session) } /* - * __wt_txn_current_id -- - * Get the current transaction ID. - */ -static inline uint64_t -__wt_txn_current_id(WT_SESSION_IMPL *session) -{ - return (S2C(session)->txn_global.current); -} - -/* * __wt_txn_new_id -- * Allocate a new transaction ID. */ static inline uint64_t __wt_txn_new_id(WT_SESSION_IMPL *session) { - return WT_ATOMIC_ADD(S2C(session)->txn_global.current, 1); + /* + * We want the global value to lead the allocated values, so that any + * allocated transaction ID eventually becomes globally visible. When + * there are no transactions running, the oldest_id will reach the + * global current ID, so we want post-increment semantics. Our atomic + * add primitive does pre-increment, so adjust the result here. 
+ */ + return WT_ATOMIC_ADD(S2C(session)->txn_global.current, 1) - 1; } /* diff --git a/src/lsm/lsm_manager.c b/src/lsm/lsm_manager.c index 2ac20b9b92d..91affe53ef3 100644 --- a/src/lsm/lsm_manager.c +++ b/src/lsm/lsm_manager.c @@ -204,10 +204,10 @@ __lsm_manager_aggressive_update(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) if (lsm_tree->merge_aggressiveness > old_aggressive) WT_RET(__wt_verbose(session, WT_VERB_LSM, - "LSM merge got aggressive (%u), " - "%u / %" PRIu64, - lsm_tree->merge_aggressiveness, stallms, - lsm_tree->chunk_fill_ms)); + "LSM merge %s got aggressive (%u), " + "%u / %" PRIu64, + lsm_tree->name, lsm_tree->merge_aggressiveness, stallms, + lsm_tree->chunk_fill_ms)); return (0); } diff --git a/src/lsm/lsm_merge.c b/src/lsm/lsm_merge.c index bf758abd6b1..363fe77b93e 100644 --- a/src/lsm/lsm_merge.c +++ b/src/lsm/lsm_merge.c @@ -61,6 +61,7 @@ __wt_lsm_merge( uint32_t aggressive, generation, max_gap, max_gen, max_level, start_id; uint64_t insert_count, record_count, chunk_size; u_int dest_id, end_chunk, i, merge_max, merge_min, nchunks, start_chunk; + u_int verb; int create_bloom, locked, tret; const char *cfg[3]; const char *drop_cfg[] = @@ -72,16 +73,17 @@ __wt_lsm_merge( dest = src = NULL; locked = 0; start_id = 0; - aggressive = lsm_tree->merge_aggressiveness; /* - * If the tree is open read-only be very aggressive. Otherwise, we can - * spend a long time waiting for merges to start in read-only - * applications. + * If the tree is open read-only or we are compacting, be very + * aggressive. Otherwise, we can spend a long time waiting for merges + * to start in read-only applications. */ - if (!lsm_tree->modified) + if (!lsm_tree->modified || + F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) lsm_tree->merge_aggressiveness = 10; + aggressive = lsm_tree->merge_aggressiveness; merge_max = (aggressive > 5) ? 100 : lsm_tree->merge_min; merge_min = (aggressive > 5) ? 
2 : lsm_tree->merge_min; max_gap = (aggressive + 4) / 5; @@ -249,10 +251,22 @@ __wt_lsm_merge( /* Allocate an ID for the merge. */ dest_id = WT_ATOMIC_ADD(lsm_tree->last, 1); - WT_RET(__wt_verbose(session, WT_VERB_LSM, - "Merging chunks %u-%u into %u (%" PRIu64 " records)" - ", generation %" PRIu32, - start_chunk, end_chunk, dest_id, record_count, generation)); + /* + * We only want to do the chunk loop if we're running with verbose, + * so we wrap these statements in the conditional. Avoid the loop + * in the normal path. + */ + if (WT_VERBOSE_ISSET(session, WT_VERB_LSM)) { + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "Merging %s chunks %u-%u into %u (%" PRIu64 " records)" + ", generation %" PRIu32, + lsm_tree->name, + start_chunk, end_chunk, dest_id, record_count, generation)); + for (verb = start_chunk; verb <= end_chunk; verb++) + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "%s: Chunk[%u] id %u", + lsm_tree->name, verb, lsm_tree->chunk[verb]->id)); + } WT_RET(__wt_calloc_def(session, 1, &chunk)); chunk->id = dest_id; diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c index f8a7083efac..4eec6a9b559 100644 --- a/src/lsm/lsm_tree.c +++ b/src/lsm/lsm_tree.c @@ -733,7 +733,7 @@ __wt_lsm_tree_switch(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) /* Set the switch transaction in the previous chunk, if necessary. */ if (chunk != NULL && chunk->switch_txn == WT_TXN_NONE) - chunk->switch_txn = __wt_txn_current_id(session); + chunk->switch_txn = __wt_txn_new_id(session); /* Update the throttle time. 
*/ __wt_lsm_tree_throttle(session, lsm_tree, 0); @@ -744,8 +744,8 @@ __wt_lsm_tree_switch(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) nchunks + 1, &lsm_tree->chunk)); WT_ERR(__wt_verbose(session, WT_VERB_LSM, - "Tree switch to: %" PRIu32 ", checkpoint throttle %ld, " - "merge throttle %ld", + "Tree %s switch to: %" PRIu32 ", checkpoint throttle %ld, " + "merge throttle %ld", lsm_tree->name, new_id, lsm_tree->ckpt_throttle, lsm_tree->merge_throttle)); WT_ERR(__wt_calloc_def(session, 1, &chunk)); @@ -1007,9 +1007,9 @@ __wt_lsm_compact(WT_SESSION_IMPL *session, const char *name, int *skip) WT_LSM_CHUNK *chunk; WT_LSM_TREE *lsm_tree; time_t begin, end; - int i, compacting, locked; + int i, compacting, flushing, locked, ref; - compacting = locked = 0; + compacting = flushing = locked = ref = 0; /* * This function is applied to all matching sources: ignore anything * that is not an LSM tree. @@ -1028,6 +1028,19 @@ __wt_lsm_compact(WT_SESSION_IMPL *session, const char *name, int *skip) WT_ERR(__wt_seconds(session, &begin)); + /* + * Compacting has two distinct phases. + * 1. All in-memory chunks up to and including the current + * chunk must be flushed. Normally, the flush code + * does not flush the last, in-use chunk, so we set a force + * flag to include that last chunk. We monitor the state of the + * last chunk and periodically push another forced flush work + * unit until it is complete. + * 2. After all flushing is done, we move onto the merging + * phase for compaction. Again, we monitor the state and + * continue to push merge work units until all merging is done. + */ + /* Lock the tree: single-thread compaction. */ WT_ERR(__wt_lsm_tree_lock(session, lsm_tree, 1)); locked = 1; @@ -1036,39 +1049,83 @@ __wt_lsm_compact(WT_SESSION_IMPL *session, const char *name, int *skip) lsm_tree->merge_throttle = 0; lsm_tree->merge_aggressiveness = 0; - /* If another thread started compacting this tree, we're done. 
*/ - if (F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) + /* If another thread started a compact on this tree, we're done. */ + if (F_ISSET(lsm_tree, + WT_LSM_TREE_COMPACT_FLUSH | WT_LSM_TREE_COMPACTING)) goto err; - compacting = 1; - F_SET(lsm_tree, WT_LSM_TREE_COMPACTING); + flushing = 1; + F_SET(lsm_tree, WT_LSM_TREE_COMPACT_FLUSH); /* * Set the switch transaction on the current chunk, if it * hasn't been set before. This prevents further writes, so it * can be flushed by the checkpoint worker. */ + chunk = NULL; if (lsm_tree->nchunks > 0 && - (chunk = lsm_tree->chunk[lsm_tree->nchunks - 1]) != NULL && - chunk->switch_txn == WT_TXN_NONE) - chunk->switch_txn = __wt_txn_current_id(session); + (chunk = lsm_tree->chunk[lsm_tree->nchunks - 1]) != NULL) { + if (chunk->switch_txn == WT_TXN_NONE) + chunk->switch_txn = __wt_txn_new_id(session); + /* + * If we have a chunk, we want to look for it to be on-disk. + * So we need to add a reference to keep it available. + */ + (void)WT_ATOMIC_ADD(chunk->refcnt, 1); + ref = 1; + } locked = 0; WT_ERR(__wt_lsm_tree_unlock(session, lsm_tree)); - /* Make sure the in-memory chunk gets flushed but not switched. */ - WT_ERR(__wt_lsm_manager_push_entry( - session, WT_LSM_WORK_FLUSH | WT_LSM_WORK_FORCE, lsm_tree)); + if (chunk != NULL) + WT_ERR(__wt_verbose(session, WT_VERB_LSM, + "Compact force flush %s flags 0x%" PRIx32 + " chunk %u flags 0x%" + PRIx32, name, lsm_tree->flags, chunk->id, chunk->flags)); + /* Make sure the in-memory chunk gets flushed but not switched. */ + WT_ERR(__wt_lsm_manager_push_entry(session, + WT_LSM_WORK_FLUSH | WT_LSM_WORK_FORCE, lsm_tree)); /* Wait for the work unit queues to drain. */ while (F_ISSET(lsm_tree, WT_LSM_TREE_ACTIVE)) { /* + * The flush flag is cleared when the chunk has been + * flushed. Continue to push forced flushes until the + * chunk is on disk. Once it is on disk move to the compacting + * phase. 
+ */ + if (flushing && !F_ISSET(lsm_tree, WT_LSM_TREE_COMPACT_FLUSH)) { + if (chunk != NULL && + !F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) { + WT_ERR(__wt_verbose(session, WT_VERB_LSM, + "Compact flush retry %s chunk %u", + name, chunk->id)); + F_SET(lsm_tree, WT_LSM_TREE_COMPACT_FLUSH); + WT_ERR(__wt_lsm_manager_push_entry(session, + WT_LSM_WORK_FLUSH | WT_LSM_WORK_FORCE, + lsm_tree)); + } else { + if (ref) { + WT_ASSERT(session, chunk != NULL); + WT_ERR(__wt_verbose(session, + WT_VERB_LSM, + "Compact flush done %s chunk %u", + name, chunk->id)); + (void)WT_ATOMIC_SUB(chunk->refcnt, 1); + } + flushing = ref = 0; + compacting = 1; + F_SET(lsm_tree, WT_LSM_TREE_COMPACTING); + } + } + /* * The compacting flag is cleared when no merges can be done. * Ensure that we push through some aggressive merges before * stopping otherwise we might not do merges that would * span chunks with different generations. */ - if (!F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) { + if (compacting && !F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) { if (lsm_tree->merge_aggressiveness < 10) { F_SET(lsm_tree, WT_LSM_TREE_COMPACTING); lsm_tree->merge_aggressiveness = 10; @@ -1086,21 +1143,29 @@ __wt_lsm_compact(WT_SESSION_IMPL *session, const char *name, int *skip) * done. If we are pushing merges, make sure they are * aggressive, to avoid duplicating effort. */ + if (compacting) #define COMPACT_PARALLEL_MERGES 5 - for (i = lsm_tree->queue_ref; - i < COMPACT_PARALLEL_MERGES; i++) { - lsm_tree->merge_aggressiveness = 10; - WT_ERR(__wt_lsm_manager_push_entry( - session, WT_LSM_WORK_MERGE, lsm_tree)); - } + for (i = lsm_tree->queue_ref; + i < COMPACT_PARALLEL_MERGES; i++) { + lsm_tree->merge_aggressiveness = 10; + WT_ERR(__wt_lsm_manager_push_entry( + session, WT_LSM_WORK_MERGE, lsm_tree)); + } } -err: if (locked) - WT_ERR(__wt_lsm_tree_unlock(session, lsm_tree)); - /* Ensure the compacting flag is cleared if we set it. */ +err: + /* Ensure anything we set is cleared. 
*/ + if (flushing) + F_CLR(lsm_tree, WT_LSM_TREE_COMPACT_FLUSH); + if (ref) + (void)WT_ATOMIC_SUB(chunk->refcnt, 1); if (compacting) { F_CLR(lsm_tree, WT_LSM_TREE_COMPACTING); lsm_tree->merge_aggressiveness = 0; + if (locked) + WT_ERR(__wt_lsm_tree_unlock(session, lsm_tree)); } + WT_ERR(__wt_verbose(session, WT_VERB_LSM, + "Compact %s complete, return %d", name, ret)); __wt_lsm_tree_release(session, lsm_tree); return (ret); diff --git a/src/lsm/lsm_work_unit.c b/src/lsm/lsm_work_unit.c index e0b4a6a808b..eb791f98f5f 100644 --- a/src/lsm/lsm_work_unit.c +++ b/src/lsm/lsm_work_unit.c @@ -67,22 +67,38 @@ err: WT_TRET(__wt_lsm_tree_unlock(session, lsm_tree)); */ int __wt_lsm_get_chunk_to_flush(WT_SESSION_IMPL *session, - WT_LSM_TREE *lsm_tree, int force, WT_LSM_CHUNK **chunkp) + WT_LSM_TREE *lsm_tree, int force, int *last, WT_LSM_CHUNK **chunkp) { u_int i, end; *chunkp = NULL; + *last = 0; WT_ASSERT(session, lsm_tree->queue_ref > 0); WT_RET(__wt_lsm_tree_lock(session, lsm_tree, 0)); if (!F_ISSET(lsm_tree, WT_LSM_TREE_ACTIVE)) return (__wt_lsm_tree_unlock(session, lsm_tree)); + /* + * Normally we don't want to force out the last chunk. But if we're + * doing a forced flush, likely from a compact call, then we want + * to include the final chunk. + */ end = force ? lsm_tree->nchunks : lsm_tree->nchunks - 1; for (i = 0; i < end; i++) { if (!F_ISSET(lsm_tree->chunk[i], WT_LSM_CHUNK_ONDISK)) { (void)WT_ATOMIC_ADD(lsm_tree->chunk[i]->refcnt, 1); + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "Flush%s: return chunk %u of %u: %s", + force ? " w/ force" : "", i, end - 1, + lsm_tree->chunk[i]->uri)); *chunkp = lsm_tree->chunk[i]; + /* + * Let the caller know if this is the last chunk we + * could have selected or an earlier one. 
+ */ + if (i == end - 1) + *last = 1; break; } } @@ -217,16 +233,25 @@ __wt_lsm_checkpoint_chunk(WT_SESSION_IMPL *session, else WT_RET_MSG(session, ret, "discard handle"); } - if (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) + if (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) { + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "LSM worker %s already on disk", + chunk->uri)); return (0); + } /* Stop if a running transaction needs the chunk. */ __wt_txn_update_oldest(session); if (chunk->switch_txn == WT_TXN_NONE || - !__wt_txn_visible_all(session, chunk->switch_txn)) + !__wt_txn_visible_all(session, chunk->switch_txn)) { + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "LSM worker %s: running transaction, return", + chunk->uri)); return (0); + } - WT_RET(__wt_verbose(session, WT_VERB_LSM, "LSM worker flushing")); + WT_RET(__wt_verbose(session, WT_VERB_LSM, "LSM worker flushing %s", + chunk->uri)); /* * Flush the file before checkpointing: this is the expensive part in @@ -249,7 +274,8 @@ __wt_lsm_checkpoint_chunk(WT_SESSION_IMPL *session, } WT_RET(ret); - WT_RET(__wt_verbose(session, WT_VERB_LSM, "LSM worker checkpointing")); + WT_RET(__wt_verbose(session, WT_VERB_LSM, "LSM worker checkpointing %s", + chunk->uri)); WT_WITH_SCHEMA_LOCK(session, ret = __wt_schema_worker(session, chunk->uri, @@ -290,7 +316,8 @@ __wt_lsm_checkpoint_chunk(WT_SESSION_IMPL *session, /* Make sure we aren't pinning a transaction ID. 
*/ __wt_txn_release_snapshot(session); - WT_RET(__wt_verbose(session, WT_VERB_LSM, "LSM worker checkpointed")); + WT_RET(__wt_verbose(session, WT_VERB_LSM, "LSM worker checkpointed %s", + chunk->uri)); /* * Schedule a bloom filter create for our newly flushed chunk */ if (!FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OFF)) diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c index 4aab508896c..1f2b76ba720 100644 --- a/src/lsm/lsm_worker.c +++ b/src/lsm/lsm_worker.c @@ -32,7 +32,7 @@ __lsm_worker_general_op( WT_DECL_RET; WT_LSM_CHUNK *chunk; WT_LSM_WORK_UNIT *entry; - int force; + int force, last; *completed = 0; if (!F_ISSET(cookie, WT_LSM_WORK_FLUSH) && @@ -47,15 +47,31 @@ __lsm_worker_general_op( if ((entry->flags & WT_LSM_WORK_MASK) == WT_LSM_WORK_FLUSH) { force = F_ISSET(entry, WT_LSM_WORK_FORCE); F_CLR(entry, WT_LSM_WORK_FORCE); - WT_ERR(__wt_lsm_get_chunk_to_flush( - session, entry->lsm_tree, force, &chunk)); + last = 0; + WT_ERR(__wt_lsm_get_chunk_to_flush(session, + entry->lsm_tree, force, &last, &chunk)); + /* + * If we got a chunk to flush, checkpoint it. + */ if (chunk != NULL) { + WT_ERR(__wt_verbose(session, WT_VERB_LSM, + "Flush%s%s chunk %d %s", + force ? " w/ force" : "", + last ? " last" : "", + chunk->id, chunk->uri)); ret = __wt_lsm_checkpoint_chunk( session, entry->lsm_tree, chunk); WT_ASSERT(session, chunk->refcnt > 0); (void)WT_ATOMIC_SUB(chunk->refcnt, 1); WT_ERR(ret); } + /* + * If we flushed the last chunk for a compact, clear the + * flag so compact knows that is complete. + */ + if (last && force && + F_ISSET(entry->lsm_tree, WT_LSM_TREE_COMPACT_FLUSH)) + F_CLR(entry->lsm_tree, WT_LSM_TREE_COMPACT_FLUSH); } else if (entry->flags == WT_LSM_WORK_DROP) WT_ERR(__wt_lsm_free_chunks(session, entry->lsm_tree)); else if (entry->flags == WT_LSM_WORK_BLOOM) { |