From 24202a6321035b2714e74a1065e4a01e41f142a0 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Fri, 5 Oct 2012 17:48:50 +1000 Subject: Add a connection-wide flag to disable LSM merges. --- dist/api_data.py | 6 +++++- src/block/block_ckpt.c | 2 +- src/config/config_def.c | 5 +++-- src/conn/conn_api.c | 7 +++++-- src/include/api.h | 3 ++- src/include/lsm.h | 23 ++++++++++++++++++++--- src/include/wiredtiger.in | 2 ++ src/lsm/lsm_tree.c | 11 ++++++----- src/lsm/lsm_worker.c | 27 ++++++++++----------------- 9 files changed, 54 insertions(+), 32 deletions(-) diff --git a/dist/api_data.py b/dist/api_data.py index 84d75031386..691eafc96a4 100644 --- a/dist/api_data.py +++ b/dist/api_data.py @@ -425,6 +425,9 @@ methods = { Config('logging', 'false', r''' enable logging''', type='boolean'), + Config('lsm_merge', 'true', r''' + merge LSM chunks where possible''', + type='boolean'), Config('multiprocess', 'false', r''' permit sharing between processes (will automatically start an RPC server for primary processes and use RPC for secondary @@ -477,7 +480,8 @@ flags = { # Structure flag declarations ################################################### 'conn' : [ - 'CONN_NOSYNC', + 'CONN_LSM_MERGE', + 'CONN_SYNC', 'CONN_TRANSACTIONAL', 'SERVER_RUN' ], diff --git a/src/block/block_ckpt.c b/src/block/block_ckpt.c index b496a7def81..69d5501e44d 100644 --- a/src/block/block_ckpt.c +++ b/src/block/block_ckpt.c @@ -219,7 +219,7 @@ __wt_block_checkpoint(WT_SESSION_IMPL *session, * lazy checkpoints, but we don't support them yet). Regardless, we're * not holding any locks, other writers can proceed while we wait. */ - if (!F_ISSET(S2C(session), WT_CONN_NOSYNC)) + if (F_ISSET(S2C(session), WT_CONN_SYNC)) WT_RET(__wt_fsync(session, block->fh)); return (0); diff --git a/src/config/config_def.c b/src/config/config_def.c index 0a976dfecff..9f69b8f87c7 100644 --- a/src/config/config_def.c +++ b/src/config/config_def.c @@ -387,8 +387,8 @@ const char * __wt_confdfl_wiredtiger_open = "buffer_alignment=-1,cache_size=100MB,create=0,direct_io=," "error_prefix=,eviction_target=80,eviction_trigger=95,extensions=," - "hazard_max=1000,logging=0,multiprocess=0,session_max=50,sync=," - "transactional=,use_environment_priv=0,verbose="; + "hazard_max=1000,logging=0,lsm_merge=,multiprocess=0,session_max=50," + "sync=,transactional=,use_environment_priv=0,verbose="; WT_CONFIG_CHECK __wt_confchk_wiredtiger_open[] = { @@ -402,6 +402,7 @@ __wt_confchk_wiredtiger_open[] = { { "extensions", "list", NULL }, { "hazard_max", "int", "min=15" }, { "logging", "boolean", NULL }, + { "lsm_merge", "boolean", NULL }, { "multiprocess", "boolean", NULL }, { "session_max", "int", "min=1" }, { "sync", "boolean", NULL }, diff --git a/src/conn/conn_api.c b/src/conn/conn_api.c index 1ce5b2d5ca5..5ee5f8cb93d 100644 --- a/src/conn/conn_api.c +++ b/src/conn/conn_api.c @@ -848,9 +848,12 @@ wiredtiger_open(const char *home, WT_EVENT_HANDLER *event_handler, conn->hazard_max = (uint32_t)cval.val; WT_ERR(__wt_config_gets(session, cfg, "session_max", &cval)); conn->session_size = (uint32_t)cval.val + WT_NUM_INTERNAL_SESSIONS; + WT_ERR(__wt_config_gets(session, cfg, "lsm_merge", &cval)); + if (cval.val) + F_SET(conn, WT_CONN_LSM_MERGE); WT_ERR(__wt_config_gets(session, cfg, "sync", &cval)); - if (!cval.val) - F_SET(conn, WT_CONN_NOSYNC); + if (cval.val) + F_SET(conn, WT_CONN_SYNC); WT_ERR(__wt_config_gets(session, cfg, "transactional", &cval)); if (cval.val) F_SET(conn, WT_CONN_TRANSACTIONAL); diff --git a/src/include/api.h b/src/include/api.h index ebc658fa327..4a1512c8e9b 100644 --- a/src/include/api.h +++ b/src/include/api.h @@ -386,7 +386,8 @@ extern WT_PROCESS __wt_process; * DO NOT EDIT: automatically built by dist/api_flags.py. * API flags section: BEGIN */ -#define WT_CONN_NOSYNC 0x00000004 +#define WT_CONN_LSM_MERGE 0x00000008 +#define WT_CONN_SYNC 0x00000004 #define WT_CONN_TRANSACTIONAL 0x00000002 #define WT_DIRECTIO_DATA 0x00000002 #define WT_DIRECTIO_LOG 0x00000001 diff --git a/src/include/lsm.h b/src/include/lsm.h index 31514744701..0dc7759fde7 100644 --- a/src/include/lsm.h +++ b/src/include/lsm.h @@ -5,6 +5,10 @@ * See the file LICENSE for redistribution information. */ +/* + * WT_CURSOR_LSM -- + * An LSM cursor. + */ struct __wt_cursor_lsm { WT_CURSOR iface; @@ -28,6 +32,10 @@ struct __wt_cursor_lsm { uint32_t flags; }; +/* + * WT_LSM_CHUNK -- + * A single chunk (file) in an LSM tree. + */ struct __wt_lsm_chunk { const char *uri; /* Data source for this chunk. */ const char *bloom_uri; /* URI of Bloom filter, if any. */ @@ -38,6 +46,10 @@ struct __wt_lsm_chunk { uint32_t flags; }; +/* + * WT_LSM_TREE -- + * An LSM tree. + */ struct __wt_lsm_tree { const char *name, *config, *filename; const char *key_format, *value_format, *file_config; @@ -78,17 +90,22 @@ struct __wt_lsm_tree { uint32_t flags; }; +/* + * WT_LSM_DATA_SOURCE -- + * Implementation of the WT_DATA_SOURCE interface for LSM. + */ struct __wt_lsm_data_source { WT_DATA_SOURCE iface; WT_RWLOCK *rwlock; }; +/* + * WT_LSM_WORKER_COOKIE -- + * State for an LSM worker thread. + */ struct __wt_lsm_worker_cookie { WT_LSM_CHUNK **chunk_array; size_t chunk_alloc; int nchunks; -#define WT_LSM_WORKER_MERGE 0x01 -#define WT_LSM_WORKER_CHECKPOINT 0x02 - uint32_t flags; }; diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index 9c66c9c5460..a15e7a5001d 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -1152,6 +1152,8 @@ struct __wt_connection { * @config{hazard_max, maximum number of simultaneous hazard references per * session handle.,an integer greater than or equal to 15; default \c 1000.} * @config{logging, enable logging.,a boolean flag; default \c false.} + * @config{lsm_merge, merge LSM chunks where possible.,a boolean flag; default + * \c true.} * @config{multiprocess, permit sharing between processes (will automatically * start an RPC server for primary processes and use RPC for secondary * processes). Not yet supported in WiredTiger.,a boolean flag; default diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c index a49505cfbeb..5b0c82563d7 100644 --- a/src/lsm/lsm_tree.c +++ b/src/lsm/lsm_tree.c @@ -62,7 +62,8 @@ __lsm_tree_close(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) if (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) { F_CLR(lsm_tree, WT_LSM_TREE_WORKING); - WT_TRET(__wt_thread_join(lsm_tree->worker_tid)); + if (F_ISSET(S2C(session), WT_CONN_LSM_MERGE)) + WT_TRET(__wt_thread_join(lsm_tree->worker_tid)); WT_TRET(__wt_thread_join(lsm_tree->ckpt_tid)); } @@ -160,8 +161,7 @@ __wt_lsm_tree_create_chunk( WT_RET(__wt_scr_alloc(session, 0, &buf)); WT_ERR(__wt_lsm_tree_chunk_name(session, lsm_tree, i, buf)); - WT_ERR(__wt_schema_create(session, - buf->data, lsm_tree->file_config)); + WT_ERR(__wt_schema_create(session, buf->data, lsm_tree->file_config)); *urip = __wt_buf_steal(session, buf, NULL); err: __wt_scr_free(&buf); @@ -190,8 +190,9 @@ __lsm_tree_start_worker(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) F_SET(lsm_tree, WT_LSM_TREE_WORKING); /* The new thread will rely on the WORKING value being visible. */ WT_FULL_BARRIER(); - WT_RET(__wt_thread_create( - &lsm_tree->worker_tid, __wt_lsm_worker, lsm_tree)); + if (F_ISSET(S2C(session), WT_CONN_LSM_MERGE)) + WT_RET(__wt_thread_create( + &lsm_tree->worker_tid, __wt_lsm_worker, lsm_tree)); WT_RET(__wt_thread_create( &lsm_tree->ckpt_tid, __wt_lsm_checkpoint_worker, lsm_tree)); diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c index 5f65aa79a86..545b941d7ce 100644 --- a/src/lsm/lsm_worker.c +++ b/src/lsm/lsm_worker.c @@ -8,7 +8,8 @@ #include "wt_internal.h" static int __lsm_free_chunks(WT_SESSION_IMPL *, WT_LSM_TREE *); -static int __lsm_copy_chunks(WT_LSM_TREE *, WT_LSM_WORKER_COOKIE *); +static int __lsm_copy_chunks( + WT_SESSION_IMPL *, WT_LSM_TREE *, WT_LSM_WORKER_COOKIE *); /* * __wt_lsm_worker -- @@ -67,17 +68,19 @@ __wt_lsm_checkpoint_worker(void *arg) lsm_tree = arg; session = lsm_tree->ckpt_session; - memset(&cookie, 0, sizeof(cookie)); - F_SET(&cookie, WT_LSM_WORKER_CHECKPOINT); + WT_CLEAR(cookie); while (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) { - WT_ERR(__lsm_copy_chunks(lsm_tree, &cookie)); + WT_ERR(__lsm_copy_chunks(session, lsm_tree, &cookie)); /* Write checkpoints in all completed files. */ for (i = 0, j = 0; i < cookie.nchunks; i++) { chunk = cookie.chunk_array[i]; if (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) continue; + /* Stop if a thread is still active in the chunk. */ + if (chunk->ncursor != 0) + break; /* * NOTE: we pass a non-NULL config, because otherwise @@ -109,20 +112,15 @@ err: __wt_free(session, cookie.chunk_array); * the contents without holding the LSM tree handle lock long term. */ static int -__lsm_copy_chunks(WT_LSM_TREE *lsm_tree, WT_LSM_WORKER_COOKIE *cookie) +__lsm_copy_chunks(WT_SESSION_IMPL *session, + WT_LSM_TREE *lsm_tree, WT_LSM_WORKER_COOKIE *cookie) { WT_DECL_RET; - WT_SESSION_IMPL *session; int nchunks; /* Always return zero chunks on error. */ cookie->nchunks = 0; - if (F_ISSET(cookie, WT_LSM_WORKER_CHECKPOINT)) - session = lsm_tree->ckpt_session; - else - session = lsm_tree->worker_session; - __wt_spin_lock(session, &lsm_tree->lock); if (!F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) { __wt_spin_unlock(session, &lsm_tree->lock); @@ -135,12 +133,7 @@ __lsm_copy_chunks(WT_LSM_TREE *lsm_tree, WT_LSM_WORKER_COOKIE *cookie) * to merge operations. */ nchunks = lsm_tree->nchunks - 1; - /* Checkpoint doesn't care if there are active cursors, merge does. */ - if (F_ISSET(cookie, WT_LSM_WORKER_MERGE)) { - for (; nchunks > 0 && lsm_tree->chunk[nchunks - 1]->ncursor > 0; - --nchunks) - ; - } + /* * If the tree array of active chunks is larger than our current buffer, * increase the size of our current buffer to match. -- cgit v1.2.1