diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2013-12-06 16:40:56 +1100 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2013-12-06 16:40:56 +1100 |
commit | 7b6b95615a006b57f065debb46977a62f9de1912 (patch) | |
tree | 61d5112856130890aed4ae5a37fbf83dfdc81747 | |
parent | ecc3065d6673db50144d652adf61c78bc5a27365 (diff) | |
download | mongo-7b6b95615a006b57f065debb46977a62f9de1912.tar.gz |
Implement compact for LSM trees. Add a new timeout compact configuration.
-rw-r--r-- | dist/api_data.py | 8 | ||||
-rw-r--r-- | src/config/config_def.c | 9 | ||||
-rw-r--r-- | src/include/compact.h | 12 | ||||
-rw-r--r-- | src/include/extern.h | 2 | ||||
-rw-r--r-- | src/include/lsm.h | 2 | ||||
-rw-r--r-- | src/include/session.h | 1 | ||||
-rw-r--r-- | src/include/wiredtiger.in | 7 | ||||
-rw-r--r-- | src/include/wt_internal.h | 3 | ||||
-rw-r--r-- | src/lsm/lsm_merge.c | 9 | ||||
-rw-r--r-- | src/lsm/lsm_tree.c | 37 | ||||
-rw-r--r-- | src/lsm/lsm_worker.c | 6 | ||||
-rw-r--r-- | src/schema/schema_worker.c | 5 | ||||
-rw-r--r-- | src/session/session_api.c | 17 | ||||
-rw-r--r-- | src/session/session_compact.c | 140 |
14 files changed, 211 insertions, 47 deletions
diff --git a/dist/api_data.py b/dist/api_data.py index df360c17b8b..80e3f6c76a3 100644 --- a/dist/api_data.py +++ b/dist/api_data.py @@ -388,7 +388,13 @@ methods = { 'session.close' : Method([]), -'session.compact' : Method([]), +'session.compact' : Method([ + Config('timeout', '1200', r''' + maximum amount of time to allow for compact in seconds. The + actual amount of time spent in compact may exceed the configured + value. A value of zero disables the timeout''', + type='int'), +]), 'session.create' : Method(table_only_meta + file_config + lsm_config + source_meta + [ diff --git a/src/config/config_def.c b/src/config/config_def.c index 52ff532e98f..b7820d7d2ec 100644 --- a/src/config/config_def.c +++ b/src/config/config_def.c @@ -115,6 +115,11 @@ static const WT_CONFIG_CHECK confchk_session_checkpoint[] = { { NULL, NULL, NULL, NULL } }; +static const WT_CONFIG_CHECK confchk_session_compact[] = { + { "timeout", "int", NULL, NULL}, + { NULL, NULL, NULL, NULL } +}; + static const WT_CONFIG_CHECK confchk_lsm_subconfigs[] = { { "auto_throttle", "boolean", NULL, NULL }, { "bloom", "boolean", NULL, NULL }, @@ -355,8 +360,8 @@ static const WT_CONFIG_ENTRY config_entries[] = { NULL }, { "session.compact", - "", - NULL + "timeout=1200", + confchk_session_compact }, { "session.create", "allocation_size=4KB,block_allocation=best,block_compressor=," diff --git a/src/include/compact.h b/src/include/compact.h new file mode 100644 index 00000000000..f799b75ebca --- /dev/null +++ b/src/include/compact.h @@ -0,0 +1,12 @@ +/*- + * Copyright (c) 2008-2013 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +struct __wt_compact { + uint32_t lsm_count; /* Number of LSM trees seen */ + uint32_t file_count; /* Number of files seen */ + uint64_t max_time; /* Configured timeout */ +}; diff --git a/src/include/extern.h b/src/include/extern.h index 0f9a098d2d2..66042de0955 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -899,6 +899,7 @@ extern int __wt_lsm_tree_lock( WT_SESSION_IMPL *session, int exclusive); extern int __wt_lsm_tree_unlock( WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree); +extern int __wt_lsm_compact(WT_SESSION_IMPL *session, const char *name); extern int __wt_lsm_tree_worker(WT_SESSION_IMPL *session, const char *uri, int (*file_func)(WT_SESSION_IMPL *, @@ -1279,6 +1280,7 @@ extern int __wt_open_session(WT_CONNECTION_IMPL *conn, WT_EVENT_HANDLER *event_handler, const char *config, WT_SESSION_IMPL **sessionp); +extern int __wt_compact_uri_analyze(WT_SESSION_IMPL *session, const char *uri); extern int __wt_session_compact( WT_SESSION *wt_session, const char *uri, const char *config); diff --git a/src/include/lsm.h b/src/include/lsm.h index f4c819d4eaf..b7aa1df1b24 100644 --- a/src/include/lsm.h +++ b/src/include/lsm.h @@ -90,6 +90,7 @@ struct __wt_lsm_tree { long throttle_sleep; /* Rate limiting */ uint64_t chunk_fill_ms; /* Estimate of time to fill a chunk */ + uint64_t merge_progressing; /* Bumped when merges are active */ /* Configuration parameters */ uint32_t bloom_bit_count; @@ -125,6 +126,7 @@ struct __wt_lsm_tree { #define WT_LSM_TREE_OPEN 0x02 #define WT_LSM_TREE_THROTTLE 0x04 #define WT_LSM_TREE_WORKING 0x08 +#define WT_LSM_TREE_COMPACTING 0x10 uint32_t flags; }; diff --git a/src/include/session.h b/src/include/session.h index 7ceb3fdda5f..b00f26e1345 100644 --- a/src/include/session.h +++ b/src/include/session.h @@ -65,6 +65,7 @@ struct __wt_session_impl { TAILQ_HEAD(__cursors, __wt_cursor) cursors; WT_CURSOR_BACKUP *bkp_cursor; /* Hot backup cursor */ + WT_COMPACT *compact; /* Compact state */ WT_BTREE *metafile; /* Metadata file */ void *meta_track; /* Metadata operation tracking */ diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index 6dbc980ea65..55a258923a2 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -901,7 +901,12 @@ struct __wt_session { * @param session the session handle * @param name the URI of the object to compact, such as * \c "table:stock" - * @configempty{session.compact, see dist/api_data.py} + * @configstart{session.compact, see dist/api_data.py} + * @config{timeout, maximum amount of time to allow for compact in + * seconds. The actual amount of time spent in compact may exceed the + * configured value. A value of zero disables the timeout., an integer; + * default \c 1200.} + * @configend * @errors */ int __F(compact)(WT_SESSION *session, diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h index d36656be947..3377f4d721e 100644 --- a/src/include/wt_internal.h +++ b/src/include/wt_internal.h @@ -83,6 +83,8 @@ struct __wt_col_rle; typedef struct __wt_col_rle WT_COL_RLE; struct __wt_colgroup; typedef struct __wt_colgroup WT_COLGROUP; +struct __wt_compact; + typedef struct __wt_compact WT_COMPACT; struct __wt_condvar; typedef struct __wt_condvar WT_CONDVAR; struct __wt_config; @@ -237,6 +239,7 @@ struct __wt_update; #include "btree.h" #include "cache.h" #include "config.h" +#include "compact.h" #include "cursor.h" #include "dlh.h" #include "error.h" diff --git a/src/lsm/lsm_merge.c b/src/lsm/lsm_merge.c index ee42fc3629e..c7c436ab801 100644 --- a/src/lsm/lsm_merge.c +++ b/src/lsm/lsm_merge.c @@ -61,6 +61,7 @@ __wt_lsm_merge( uint32_t generation, start_id; uint64_t insert_count, record_count, chunk_size; u_int dest_id, end_chunk, i, merge_min, nchunks, start_chunk; + u_int max_generation_gap; int create_bloom; const char *cfg[3]; @@ -91,10 +92,11 @@ __wt_lsm_merge( * can spend a long time waiting for merges to start in read-only * applications. */ - if (F_ISSET( + if (F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING) || F_ISSET( lsm_tree->chunk[lsm_tree->nchunks - 1], WT_LSM_CHUNK_ONDISK)) aggressive = 100; merge_min = aggressive ? 2 : lsm_tree->merge_min; + max_generation_gap = aggressive > 10 ? 3 : 1; /* * Only include chunks that are stable on disk and not involved in a @@ -188,7 +190,8 @@ __wt_lsm_merge( * generations. */ if (nchunks < merge_min || - chunk->generation > youngest->generation + 1) { + chunk->generation > + youngest->generation + max_generation_gap) { for (i = 0; i < nchunks; i++) F_CLR(lsm_tree->chunk[start_chunk + i], WT_LSM_CHUNK_MERGING); @@ -262,6 +265,7 @@ __wt_lsm_merge( WT_ERR(EINTR); WT_STAT_FAST_CONN_INCRV(session, lsm_rows_merged, LSM_MERGE_CHECK_INTERVAL); + ++lsm_tree->merge_progressing; } WT_ERR(src->get_key(src, &key)); @@ -276,6 +280,7 @@ __wt_lsm_merge( WT_STAT_FAST_CONN_INCRV(session, lsm_rows_merged, insert_count % LSM_MERGE_CHECK_INTERVAL); + ++lsm_tree->merge_progressing; WT_VERBOSE_ERR(session, lsm, "Bloom size for %" PRIu64 " has %" PRIu64 " items inserted.", record_count, insert_count); diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c index 151d4d7ba95..0badd5841d7 100644 --- a/src/lsm/lsm_tree.c +++ b/src/lsm/lsm_tree.c @@ -909,6 +909,43 @@ __wt_lsm_tree_unlock( } /* + * __wt_lsm_compact -- + * Compact an LSM tree called via __wt_schema_worker. + */ +int +__wt_lsm_compact(WT_SESSION_IMPL *session, const char *name) +{ + WT_DECL_RET; + WT_LSM_TREE *lsm_tree; + struct timespec begin, end; + uint64_t last_merge_progressing; + + /* Ignore non LSM names. */ + if (!WT_PREFIX_MATCH(name, "lsm:")) + return (0); + + WT_RET(__wt_lsm_tree_get(session, name, 0, &lsm_tree)); + + WT_RET(__wt_epoch(session, &begin)); + + F_SET(lsm_tree, WT_LSM_TREE_COMPACTING); + /* Wait for merge activity to stop. */ + do { + last_merge_progressing = lsm_tree->merge_progressing; + __wt_sleep(10, 0); + WT_RET(__wt_epoch(session, &end)); + if (session->compact->max_time > 0 && + session->compact->max_time < + WT_TIMEDIFF(end, begin) / WT_BILLION) + WT_ERR(ETIMEDOUT); + } while (lsm_tree->merge_progressing != last_merge_progressing); + +err: F_CLR(lsm_tree, WT_LSM_TREE_COMPACTING); + + return (ret); +} + +/* * __wt_lsm_tree_worker -- * Run a schema worker operation on each level of a LSM tree. */ diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c index 2b288cad6d4..7fa9de38b5e 100644 --- a/src/lsm/lsm_worker.c +++ b/src/lsm/lsm_worker.c @@ -465,7 +465,11 @@ __lsm_bloom_create( WT_RET(wt_session->drop(wt_session, chunk->bloom_uri, "force")); bloom = NULL; - + /* + * This is merge-like activity, and we don't want compacts to give up + * because we are creating a bunch of bloom filters before merging. + */ + ++lsm_tree->merge_progressing; WT_RET(__wt_bloom_create(session, chunk->bloom_uri, lsm_tree->bloom_config, chunk->count, lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count, &bloom)); diff --git a/src/schema/schema_worker.c b/src/schema/schema_worker.c index 7c774e178d8..60143f97f77 100644 --- a/src/schema/schema_worker.c +++ b/src/schema/schema_worker.c @@ -52,9 +52,8 @@ __wt_schema_worker(WT_SESSION_IMPL *session, WT_ERR(__wt_schema_worker(session, idx->source, file_func, name_func, cfg, open_flags)); } else if (WT_PREFIX_MATCH(uri, "lsm:")) { - if (file_func != __wt_compact) - WT_ERR(__wt_lsm_tree_worker(session, - uri, file_func, name_func, cfg, open_flags)); + WT_ERR(__wt_lsm_tree_worker(session, + uri, file_func, name_func, cfg, open_flags)); } else if (WT_PREFIX_SKIP(tablename, "table:")) { WT_ERR(__wt_schema_get_table(session, tablename, strlen(tablename), 0, &table)); diff --git a/src/session/session_api.c b/src/session/session_api.c index 133348f2a01..94fca3c731a 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -382,31 +382,19 @@ static int __session_compact(WT_SESSION *wt_session, const char *uri, const char *config) { WT_SESSION_IMPL *session; - WT_TXN *txn; session = (WT_SESSION_IMPL *)wt_session; - txn = &session->txn; /* Disallow objects in the WiredTiger name space. */ WT_RET(__wt_schema_name_check(session, uri)); - /* Compaction makes no sense for LSM objects, ignore requests. */ - if (WT_PREFIX_MATCH(uri, "lsm:")) - return (0); if (!WT_PREFIX_MATCH(uri, "colgroup:") && !WT_PREFIX_MATCH(uri, "file:") && !WT_PREFIX_MATCH(uri, "index:") && + !WT_PREFIX_MATCH(uri, "lsm:") && !WT_PREFIX_MATCH(uri, "table:")) return (__wt_bad_object_type(session, uri)); - /* - * Compaction requires checkpoints, which will fail in a transactional - * context. Check now so the error message isn't confusing. - */ - if (F_ISSET(txn, TXN_RUNNING)) - WT_RET_MSG(session, EINVAL, - "Compaction not permitted in a transaction"); - return (__wt_session_compact(wt_session, uri, config)); } @@ -693,10 +681,11 @@ __session_checkpoint(WT_SESSION *wt_session, const char *config) WT_TXN *txn; session = (WT_SESSION_IMPL *)wt_session; + SESSION_API_CALL(session, checkpoint, config, cfg); + txn = &session->txn; WT_STAT_FAST_CONN_INCR(session, txn_checkpoint); - SESSION_API_CALL(session, checkpoint, config, cfg); /* * Checkpoints require a snapshot to write a transactionally consistent diff --git a/src/session/session_compact.c b/src/session/session_compact.c index 03d519004ec..76b67119a05 100644 --- a/src/session/session_compact.c +++ b/src/session/session_compact.c @@ -96,63 +96,157 @@ */ /* - * __session_compact_worker -- - * Worker function to do the actual compaction call. + * __wt_compact_uri_analyze -- + * Extract information relevant to deciding what work compact needs to + * do from a URI that is part of a table schema. + * Called via the schema_worker function. */ -static int -__session_compact_worker( - WT_SESSION *wt_session, const char *uri, const char *config) +int +__wt_compact_uri_analyze(WT_SESSION_IMPL *session, const char *uri) { - WT_DECL_RET; - WT_SESSION_IMPL *session; + /* + * Add references to schema URI objects to the list of objects to be + * compacted. + */ + if (WT_PREFIX_MATCH(uri, "lsm:")) + session->compact->lsm_count++; + else if (WT_PREFIX_MATCH(uri, "file:")) + session->compact->file_count++; - session = (WT_SESSION_IMPL *)wt_session; - SESSION_API_CALL(session, compact, config, cfg); + return (0); +} - WT_WITH_SCHEMA_LOCK(session, - ret = __wt_schema_worker(session, uri, __wt_compact, NULL, cfg, 0)); +/* + * __session_compact_check_timeout -- + * + */ +static int +__session_compact_check_timeout( + WT_SESSION_IMPL *session, struct timespec begin) +{ + struct timespec end; -err: API_END_NOTFOUND_MAP(session, ret); + if (session->compact->max_time == 0) + return (0); + + WT_RET(__wt_epoch(session, &end)); + if (session->compact->max_time < + WT_TIMEDIFF(end, begin) / WT_BILLION) + WT_RET(ETIMEDOUT); + return (0); } /* - * __wt_session_compact -- - * Function to alternate between checkpoints and compaction calls. + * __compact_file -- + * */ -int -__wt_session_compact( - WT_SESSION *wt_session, const char *uri, const char *config) +static int +__compact_file(WT_SESSION_IMPL *session, const char *uri, const char *cfg[]) { - WT_DECL_ITEM(t); WT_DECL_RET; - WT_SESSION_IMPL *session; + WT_DECL_ITEM(t); + WT_SESSION *wt_session; + WT_TXN *txn; int i; + struct timespec start_time; - session = (WT_SESSION_IMPL *)wt_session; + wt_session = (WT_SESSION *)session; + txn = &session->txn; + + /* + * File compaction requires checkpoints, which will fail in a + * transactional context. Check now so the error message isn't + * confusing. + */ + if (session->compact->file_count != 0 && F_ISSET(txn, TXN_RUNNING)) + WT_ERR_MSG(session, EINVAL, + " File compaction not permitted in a transaction"); /* * Force the checkpoint: we don't want to skip it because the work we * need to have done is done in the underlying block manager. */ - WT_RET(__wt_scr_alloc(session, 128, &t)); + WT_ERR(__wt_scr_alloc(session, 128, &t)); WT_ERR(__wt_buf_fmt(session, t, "target=(\"%s\"),force=1", uri)); + WT_ERR(__wt_epoch(session, &start_time)); + /* * We compact 10% of the file on each pass, try 10 times (which is - * probably overkill), and quit if we make no progress. + * probably overkill), and quit if we make no progress. Check for a + * timeout between each operation, to be as responsive to the user + * as is practical. */ for (i = 0; i < 10; ++i) { WT_ERR(wt_session->checkpoint(wt_session, t->data)); + WT_ERR(__session_compact_check_timeout(session, start_time)); session->compaction = 0; - WT_ERR(__session_compact_worker(wt_session, uri, config)); + WT_WITH_SCHEMA_LOCK(session, + ret = __wt_schema_worker( + session, uri, __wt_compact, NULL, cfg, 0)); + WT_ERR(ret); if (!session->compaction) break; + WT_ERR(__session_compact_check_timeout(session, start_time)); WT_ERR(wt_session->checkpoint(wt_session, t->data)); + WT_ERR(__session_compact_check_timeout(session, start_time)); WT_ERR(wt_session->checkpoint(wt_session, t->data)); + WT_ERR(__session_compact_check_timeout(session, start_time)); } err: __wt_scr_free(&t); return (ret); } + +/* + * __wt_session_compact -- + * Function to alternate between checkpoints and compaction calls. + */ +int +__wt_session_compact( + WT_SESSION *wt_session, const char *uri, const char *config) +{ + WT_COMPACT compact; + WT_CONFIG_ITEM cval; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + session = (WT_SESSION_IMPL *)wt_session; + + SESSION_API_CALL(session, compact, config, cfg); + + /* Setup the structure in the session handle */ + memset(&compact, 0, sizeof(WT_COMPACT)); + session->compact = &compact; + + /* + * Find what types of data sources are being compacted. + */ + WT_ERR(__wt_schema_worker( + session, uri, NULL, __wt_compact_uri_analyze, cfg, 0)); + + /* We are done if there aren't any files we can compact. */ + if (session->compact->lsm_count == 0 && + session->compact->file_count == 0) + goto err; + + WT_ERR(__wt_config_gets(session, cfg, "timeout", &cval)); + session->compact->max_time = (uint64_t)cval.val; + + /* + * TODO: We can't hold the schema lock here - LSM acquires the + * schema lock when completing merges. We probably do want to stop + * "external" schema changes while we are compacting though. + */ + if (session->compact->lsm_count != 0) + WT_ERR(__wt_schema_worker( + session, uri, NULL, __wt_lsm_compact, cfg, 0)); + if (session->compact->file_count != 0) + WT_ERR(__compact_file(session, uri, cfg)); + +err: session->compact = NULL; + API_END_NOTFOUND_MAP(session, ret); + return (ret); +} |