summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Gorrod <alexg@wiredtiger.com>2013-12-06 16:40:56 +1100
committerAlex Gorrod <alexg@wiredtiger.com>2013-12-06 16:40:56 +1100
commit7b6b95615a006b57f065debb46977a62f9de1912 (patch)
tree61d5112856130890aed4ae5a37fbf83dfdc81747
parentecc3065d6673db50144d652adf61c78bc5a27365 (diff)
downloadmongo-7b6b95615a006b57f065debb46977a62f9de1912.tar.gz
Implement compact for LSM trees. Add a new timeout compact configuration.
-rw-r--r--dist/api_data.py8
-rw-r--r--src/config/config_def.c9
-rw-r--r--src/include/compact.h12
-rw-r--r--src/include/extern.h2
-rw-r--r--src/include/lsm.h2
-rw-r--r--src/include/session.h1
-rw-r--r--src/include/wiredtiger.in7
-rw-r--r--src/include/wt_internal.h3
-rw-r--r--src/lsm/lsm_merge.c9
-rw-r--r--src/lsm/lsm_tree.c37
-rw-r--r--src/lsm/lsm_worker.c6
-rw-r--r--src/schema/schema_worker.c5
-rw-r--r--src/session/session_api.c17
-rw-r--r--src/session/session_compact.c140
14 files changed, 211 insertions, 47 deletions
diff --git a/dist/api_data.py b/dist/api_data.py
index df360c17b8b..80e3f6c76a3 100644
--- a/dist/api_data.py
+++ b/dist/api_data.py
@@ -388,7 +388,13 @@ methods = {
'session.close' : Method([]),
-'session.compact' : Method([]),
+'session.compact' : Method([
+ Config('timeout', '1200', r'''
+ maximum amount of time to allow for compact in seconds. The
+ actual amount of time spent in compact may exceed the configured
+ value. A value of zero disables the timeout''',
+ type='int'),
+]),
'session.create' :
Method(table_only_meta + file_config + lsm_config + source_meta + [
diff --git a/src/config/config_def.c b/src/config/config_def.c
index 52ff532e98f..b7820d7d2ec 100644
--- a/src/config/config_def.c
+++ b/src/config/config_def.c
@@ -115,6 +115,11 @@ static const WT_CONFIG_CHECK confchk_session_checkpoint[] = {
{ NULL, NULL, NULL, NULL }
};
+static const WT_CONFIG_CHECK confchk_session_compact[] = {
+ { "timeout", "int", NULL, NULL},
+ { NULL, NULL, NULL, NULL }
+};
+
static const WT_CONFIG_CHECK confchk_lsm_subconfigs[] = {
{ "auto_throttle", "boolean", NULL, NULL },
{ "bloom", "boolean", NULL, NULL },
@@ -355,8 +360,8 @@ static const WT_CONFIG_ENTRY config_entries[] = {
NULL
},
{ "session.compact",
- "",
- NULL
+ "timeout=1200",
+ confchk_session_compact
},
{ "session.create",
"allocation_size=4KB,block_allocation=best,block_compressor=,"
diff --git a/src/include/compact.h b/src/include/compact.h
new file mode 100644
index 00000000000..f799b75ebca
--- /dev/null
+++ b/src/include/compact.h
@@ -0,0 +1,12 @@
+/*-
+ * Copyright (c) 2008-2013 WiredTiger, Inc.
+ * All rights reserved.
+ *
+ * See the file LICENSE for redistribution information.
+ */
+
+struct __wt_compact {
+ uint32_t lsm_count; /* Number of LSM trees seen */
+ uint32_t file_count; /* Number of files seen */
+ uint64_t max_time; /* Configured timeout */
+};
diff --git a/src/include/extern.h b/src/include/extern.h
index 0f9a098d2d2..66042de0955 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -899,6 +899,7 @@ extern int __wt_lsm_tree_lock( WT_SESSION_IMPL *session,
int exclusive);
extern int __wt_lsm_tree_unlock( WT_SESSION_IMPL *session,
WT_LSM_TREE *lsm_tree);
+extern int __wt_lsm_compact(WT_SESSION_IMPL *session, const char *name);
extern int __wt_lsm_tree_worker(WT_SESSION_IMPL *session,
const char *uri,
int (*file_func)(WT_SESSION_IMPL *,
@@ -1279,6 +1280,7 @@ extern int __wt_open_session(WT_CONNECTION_IMPL *conn,
WT_EVENT_HANDLER *event_handler,
const char *config,
WT_SESSION_IMPL **sessionp);
+extern int __wt_compact_uri_analyze(WT_SESSION_IMPL *session, const char *uri);
extern int __wt_session_compact( WT_SESSION *wt_session,
const char *uri,
const char *config);
diff --git a/src/include/lsm.h b/src/include/lsm.h
index f4c819d4eaf..b7aa1df1b24 100644
--- a/src/include/lsm.h
+++ b/src/include/lsm.h
@@ -90,6 +90,7 @@ struct __wt_lsm_tree {
long throttle_sleep; /* Rate limiting */
uint64_t chunk_fill_ms; /* Estimate of time to fill a chunk */
+ uint64_t merge_progressing; /* Bumped when merges are active */
/* Configuration parameters */
uint32_t bloom_bit_count;
@@ -125,6 +126,7 @@ struct __wt_lsm_tree {
#define WT_LSM_TREE_OPEN 0x02
#define WT_LSM_TREE_THROTTLE 0x04
#define WT_LSM_TREE_WORKING 0x08
+#define WT_LSM_TREE_COMPACTING 0x10
uint32_t flags;
};
diff --git a/src/include/session.h b/src/include/session.h
index 7ceb3fdda5f..b00f26e1345 100644
--- a/src/include/session.h
+++ b/src/include/session.h
@@ -65,6 +65,7 @@ struct __wt_session_impl {
TAILQ_HEAD(__cursors, __wt_cursor) cursors;
WT_CURSOR_BACKUP *bkp_cursor; /* Hot backup cursor */
+ WT_COMPACT *compact; /* Compact state */
WT_BTREE *metafile; /* Metadata file */
void *meta_track; /* Metadata operation tracking */
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index 6dbc980ea65..55a258923a2 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -901,7 +901,12 @@ struct __wt_session {
* @param session the session handle
* @param name the URI of the object to compact, such as
* \c "table:stock"
- * @configempty{session.compact, see dist/api_data.py}
+ * @configstart{session.compact, see dist/api_data.py}
+ * @config{timeout, maximum amount of time to allow for compact in
+ * seconds. The actual amount of time spent in compact may exceed the
+ * configured value. A value of zero disables the timeout., an integer;
+ * default \c 1200.}
+ * @configend
* @errors
*/
int __F(compact)(WT_SESSION *session,
diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h
index d36656be947..3377f4d721e 100644
--- a/src/include/wt_internal.h
+++ b/src/include/wt_internal.h
@@ -83,6 +83,8 @@ struct __wt_col_rle;
typedef struct __wt_col_rle WT_COL_RLE;
struct __wt_colgroup;
typedef struct __wt_colgroup WT_COLGROUP;
+struct __wt_compact;
+ typedef struct __wt_compact WT_COMPACT;
struct __wt_condvar;
typedef struct __wt_condvar WT_CONDVAR;
struct __wt_config;
@@ -237,6 +239,7 @@ struct __wt_update;
#include "btree.h"
#include "cache.h"
#include "config.h"
+#include "compact.h"
#include "cursor.h"
#include "dlh.h"
#include "error.h"
diff --git a/src/lsm/lsm_merge.c b/src/lsm/lsm_merge.c
index ee42fc3629e..c7c436ab801 100644
--- a/src/lsm/lsm_merge.c
+++ b/src/lsm/lsm_merge.c
@@ -61,6 +61,7 @@ __wt_lsm_merge(
uint32_t generation, start_id;
uint64_t insert_count, record_count, chunk_size;
u_int dest_id, end_chunk, i, merge_min, nchunks, start_chunk;
+ u_int max_generation_gap;
int create_bloom;
const char *cfg[3];
@@ -91,10 +92,11 @@ __wt_lsm_merge(
* can spend a long time waiting for merges to start in read-only
* applications.
*/
- if (F_ISSET(
+ if (F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING) || F_ISSET(
lsm_tree->chunk[lsm_tree->nchunks - 1], WT_LSM_CHUNK_ONDISK))
aggressive = 100;
merge_min = aggressive ? 2 : lsm_tree->merge_min;
+ max_generation_gap = aggressive > 10 ? 3 : 1;
/*
* Only include chunks that are stable on disk and not involved in a
@@ -188,7 +190,8 @@ __wt_lsm_merge(
* generations.
*/
if (nchunks < merge_min ||
- chunk->generation > youngest->generation + 1) {
+ chunk->generation >
+ youngest->generation + max_generation_gap) {
for (i = 0; i < nchunks; i++)
F_CLR(lsm_tree->chunk[start_chunk + i],
WT_LSM_CHUNK_MERGING);
@@ -262,6 +265,7 @@ __wt_lsm_merge(
WT_ERR(EINTR);
WT_STAT_FAST_CONN_INCRV(session,
lsm_rows_merged, LSM_MERGE_CHECK_INTERVAL);
+ ++lsm_tree->merge_progressing;
}
WT_ERR(src->get_key(src, &key));
@@ -276,6 +280,7 @@ __wt_lsm_merge(
WT_STAT_FAST_CONN_INCRV(session,
lsm_rows_merged, insert_count % LSM_MERGE_CHECK_INTERVAL);
+ ++lsm_tree->merge_progressing;
WT_VERBOSE_ERR(session, lsm,
"Bloom size for %" PRIu64 " has %" PRIu64 " items inserted.",
record_count, insert_count);
diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c
index 151d4d7ba95..0badd5841d7 100644
--- a/src/lsm/lsm_tree.c
+++ b/src/lsm/lsm_tree.c
@@ -909,6 +909,43 @@ __wt_lsm_tree_unlock(
}
/*
+ * __wt_lsm_compact --
+ * Compact an LSM tree called via __wt_schema_worker.
+ */
+int
+__wt_lsm_compact(WT_SESSION_IMPL *session, const char *name)
+{
+ WT_DECL_RET;
+ WT_LSM_TREE *lsm_tree;
+ struct timespec begin, end;
+ uint64_t last_merge_progressing;
+
+ /* Ignore non LSM names. */
+ if (!WT_PREFIX_MATCH(name, "lsm:"))
+ return (0);
+
+ WT_RET(__wt_lsm_tree_get(session, name, 0, &lsm_tree));
+
+ WT_RET(__wt_epoch(session, &begin));
+
+ F_SET(lsm_tree, WT_LSM_TREE_COMPACTING);
+ /* Wait for merge activity to stop. */
+ do {
+ last_merge_progressing = lsm_tree->merge_progressing;
+ __wt_sleep(10, 0);
+ WT_RET(__wt_epoch(session, &end));
+ if (session->compact->max_time > 0 &&
+ session->compact->max_time <
+ WT_TIMEDIFF(end, begin) / WT_BILLION)
+ WT_ERR(ETIMEDOUT);
+ } while (lsm_tree->merge_progressing != last_merge_progressing);
+
+err: F_CLR(lsm_tree, WT_LSM_TREE_COMPACTING);
+
+ return (ret);
+}
+
+/*
* __wt_lsm_tree_worker --
* Run a schema worker operation on each level of a LSM tree.
*/
diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c
index 2b288cad6d4..7fa9de38b5e 100644
--- a/src/lsm/lsm_worker.c
+++ b/src/lsm/lsm_worker.c
@@ -465,7 +465,11 @@ __lsm_bloom_create(
WT_RET(wt_session->drop(wt_session, chunk->bloom_uri, "force"));
bloom = NULL;
-
+ /*
+ * This is merge-like activity, and we don't want compacts to give up
+ * because we are creating a bunch of bloom filters before merging.
+ */
+ ++lsm_tree->merge_progressing;
WT_RET(__wt_bloom_create(session, chunk->bloom_uri,
lsm_tree->bloom_config, chunk->count,
lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count, &bloom));
diff --git a/src/schema/schema_worker.c b/src/schema/schema_worker.c
index 7c774e178d8..60143f97f77 100644
--- a/src/schema/schema_worker.c
+++ b/src/schema/schema_worker.c
@@ -52,9 +52,8 @@ __wt_schema_worker(WT_SESSION_IMPL *session,
WT_ERR(__wt_schema_worker(session, idx->source,
file_func, name_func, cfg, open_flags));
} else if (WT_PREFIX_MATCH(uri, "lsm:")) {
- if (file_func != __wt_compact)
- WT_ERR(__wt_lsm_tree_worker(session,
- uri, file_func, name_func, cfg, open_flags));
+ WT_ERR(__wt_lsm_tree_worker(session,
+ uri, file_func, name_func, cfg, open_flags));
} else if (WT_PREFIX_SKIP(tablename, "table:")) {
WT_ERR(__wt_schema_get_table(session,
tablename, strlen(tablename), 0, &table));
diff --git a/src/session/session_api.c b/src/session/session_api.c
index 133348f2a01..94fca3c731a 100644
--- a/src/session/session_api.c
+++ b/src/session/session_api.c
@@ -382,31 +382,19 @@ static int
__session_compact(WT_SESSION *wt_session, const char *uri, const char *config)
{
WT_SESSION_IMPL *session;
- WT_TXN *txn;
session = (WT_SESSION_IMPL *)wt_session;
- txn = &session->txn;
/* Disallow objects in the WiredTiger name space. */
WT_RET(__wt_schema_name_check(session, uri));
- /* Compaction makes no sense for LSM objects, ignore requests. */
- if (WT_PREFIX_MATCH(uri, "lsm:"))
- return (0);
if (!WT_PREFIX_MATCH(uri, "colgroup:") &&
!WT_PREFIX_MATCH(uri, "file:") &&
!WT_PREFIX_MATCH(uri, "index:") &&
+ !WT_PREFIX_MATCH(uri, "lsm:") &&
!WT_PREFIX_MATCH(uri, "table:"))
return (__wt_bad_object_type(session, uri));
- /*
- * Compaction requires checkpoints, which will fail in a transactional
- * context. Check now so the error message isn't confusing.
- */
- if (F_ISSET(txn, TXN_RUNNING))
- WT_RET_MSG(session, EINVAL,
- "Compaction not permitted in a transaction");
-
return (__wt_session_compact(wt_session, uri, config));
}
@@ -693,10 +681,11 @@ __session_checkpoint(WT_SESSION *wt_session, const char *config)
WT_TXN *txn;
session = (WT_SESSION_IMPL *)wt_session;
+ SESSION_API_CALL(session, checkpoint, config, cfg);
+
txn = &session->txn;
WT_STAT_FAST_CONN_INCR(session, txn_checkpoint);
- SESSION_API_CALL(session, checkpoint, config, cfg);
/*
* Checkpoints require a snapshot to write a transactionally consistent
diff --git a/src/session/session_compact.c b/src/session/session_compact.c
index 03d519004ec..76b67119a05 100644
--- a/src/session/session_compact.c
+++ b/src/session/session_compact.c
@@ -96,63 +96,157 @@
*/
/*
- * __session_compact_worker --
- * Worker function to do the actual compaction call.
+ * __wt_compact_uri_analyze --
+ * Extract information relevant to deciding what work compact needs to
+ * do from a URI that is part of a table schema.
+ * Called via the schema_worker function.
*/
-static int
-__session_compact_worker(
- WT_SESSION *wt_session, const char *uri, const char *config)
+int
+__wt_compact_uri_analyze(WT_SESSION_IMPL *session, const char *uri)
{
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
+ /*
+ * Add references to schema URI objects to the list of objects to be
+ * compacted.
+ */
+ if (WT_PREFIX_MATCH(uri, "lsm:"))
+ session->compact->lsm_count++;
+ else if (WT_PREFIX_MATCH(uri, "file:"))
+ session->compact->file_count++;
- session = (WT_SESSION_IMPL *)wt_session;
- SESSION_API_CALL(session, compact, config, cfg);
+ return (0);
+}
- WT_WITH_SCHEMA_LOCK(session,
- ret = __wt_schema_worker(session, uri, __wt_compact, NULL, cfg, 0));
+/*
+ * __session_compact_check_timeout --
+ *
+ */
+static int
+__session_compact_check_timeout(
+ WT_SESSION_IMPL *session, struct timespec begin)
+{
+ struct timespec end;
-err: API_END_NOTFOUND_MAP(session, ret);
+ if (session->compact->max_time == 0)
+ return (0);
+
+ WT_RET(__wt_epoch(session, &end));
+ if (session->compact->max_time <
+ WT_TIMEDIFF(end, begin) / WT_BILLION)
+ WT_RET(ETIMEDOUT);
+ return (0);
}
/*
- * __wt_session_compact --
- * Function to alternate between checkpoints and compaction calls.
+ * __compact_file --
+ *
*/
-int
-__wt_session_compact(
- WT_SESSION *wt_session, const char *uri, const char *config)
+static int
+__compact_file(WT_SESSION_IMPL *session, const char *uri, const char *cfg[])
{
- WT_DECL_ITEM(t);
WT_DECL_RET;
- WT_SESSION_IMPL *session;
+ WT_DECL_ITEM(t);
+ WT_SESSION *wt_session;
+ WT_TXN *txn;
int i;
+ struct timespec start_time;
- session = (WT_SESSION_IMPL *)wt_session;
+ wt_session = (WT_SESSION *)session;
+ txn = &session->txn;
+
+ /*
+ * File compaction requires checkpoints, which will fail in a
+ * transactional context. Check now so the error message isn't
+ * confusing.
+ */
+ if (session->compact->file_count != 0 && F_ISSET(txn, TXN_RUNNING))
+ WT_ERR_MSG(session, EINVAL,
+ " File compaction not permitted in a transaction");
/*
* Force the checkpoint: we don't want to skip it because the work we
* need to have done is done in the underlying block manager.
*/
- WT_RET(__wt_scr_alloc(session, 128, &t));
+ WT_ERR(__wt_scr_alloc(session, 128, &t));
WT_ERR(__wt_buf_fmt(session, t, "target=(\"%s\"),force=1", uri));
+ WT_ERR(__wt_epoch(session, &start_time));
+
/*
* We compact 10% of the file on each pass, try 10 times (which is
- * probably overkill), and quit if we make no progress.
+ * probably overkill), and quit if we make no progress. Check for a
+ * timeout between each operation, to be as responsive to the user
+ * as is practical.
*/
for (i = 0; i < 10; ++i) {
WT_ERR(wt_session->checkpoint(wt_session, t->data));
+ WT_ERR(__session_compact_check_timeout(session, start_time));
session->compaction = 0;
- WT_ERR(__session_compact_worker(wt_session, uri, config));
+ WT_WITH_SCHEMA_LOCK(session,
+ ret = __wt_schema_worker(
+ session, uri, __wt_compact, NULL, cfg, 0));
+ WT_ERR(ret);
if (!session->compaction)
break;
+ WT_ERR(__session_compact_check_timeout(session, start_time));
WT_ERR(wt_session->checkpoint(wt_session, t->data));
+ WT_ERR(__session_compact_check_timeout(session, start_time));
WT_ERR(wt_session->checkpoint(wt_session, t->data));
+ WT_ERR(__session_compact_check_timeout(session, start_time));
}
err: __wt_scr_free(&t);
return (ret);
}
+
+/*
+ * __wt_session_compact --
+ * Function to alternate between checkpoints and compaction calls.
+ */
+int
+__wt_session_compact(
+ WT_SESSION *wt_session, const char *uri, const char *config)
+{
+ WT_COMPACT compact;
+ WT_CONFIG_ITEM cval;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ session = (WT_SESSION_IMPL *)wt_session;
+
+ SESSION_API_CALL(session, compact, config, cfg);
+
+ /* Setup the structure in the session handle */
+ memset(&compact, 0, sizeof(WT_COMPACT));
+ session->compact = &compact;
+
+ /*
+ * Find what types of data sources are being compacted.
+ */
+ WT_ERR(__wt_schema_worker(
+ session, uri, NULL, __wt_compact_uri_analyze, cfg, 0));
+
+ /* We are done if there aren't any files we can compact. */
+ if (session->compact->lsm_count == 0 &&
+ session->compact->file_count == 0)
+ goto err;
+
+ WT_ERR(__wt_config_gets(session, cfg, "timeout", &cval));
+ session->compact->max_time = (uint64_t)cval.val;
+
+ /*
+ * TODO: We can't hold the schema lock here - LSM acquires the
+ * schema lock when completing merges. We probably do want to stop
+ * "external" schema changes while we are compacting though.
+ */
+ if (session->compact->lsm_count != 0)
+ WT_ERR(__wt_schema_worker(
+ session, uri, NULL, __wt_lsm_compact, cfg, 0));
+ if (session->compact->file_count != 0)
+ WT_ERR(__compact_file(session, uri, cfg));
+
+err: session->compact = NULL;
+ API_END_NOTFOUND_MAP(session, ret);
+ return (ret);
+}