diff options
author | Michael Cahill <michael.cahill@wiredtiger.com> | 2012-11-01 16:25:51 +1100 |
---|---|---|
committer | Michael Cahill <michael.cahill@wiredtiger.com> | 2012-11-01 16:25:51 +1100 |
commit | a31f708a1c2604df704b65f05859c18d6ceb092b (patch) | |
tree | 0e2c6a70802e8721d108d233ff3b58764ff1bd62 | |
parent | e1189da47952a0efa148c0254dcfda4cdd4478c0 (diff) | |
download | mongo-a31f708a1c2604df704b65f05859c18d6ceb092b.tar.gz |
Support multiple LSM merge threads with the "lsm_merge_threads" config key. Use chunk IDs rather than array indexes to mark the start chunk in a merge, in case we race with another thread.
-rw-r--r-- | dist/api_data.py | 3 | ||||
-rw-r--r-- | dist/s_string.ok | 1 | ||||
-rw-r--r-- | src/config/config_def.c | 11 | ||||
-rw-r--r-- | src/include/extern.h | 13 | ||||
-rw-r--r-- | src/include/lsm.h | 56 | ||||
-rw-r--r-- | src/include/wiredtiger.in | 2 | ||||
-rw-r--r-- | src/include/wt_internal.h | 2 | ||||
-rw-r--r-- | src/lsm/lsm_cursor.c | 32 | ||||
-rw-r--r-- | src/lsm/lsm_merge.c | 83 | ||||
-rw-r--r-- | src/lsm/lsm_meta.c | 64 | ||||
-rw-r--r-- | src/lsm/lsm_tree.c | 126 | ||||
-rw-r--r-- | src/lsm/lsm_worker.c | 39 |
12 files changed, 280 insertions, 152 deletions
diff --git a/dist/api_data.py b/dist/api_data.py index 7a7506d1e25..33c0a02a66b 100644 --- a/dist/api_data.py +++ b/dist/api_data.py @@ -113,6 +113,9 @@ lsm_config = [ Config('lsm_merge_max', '15', r''' the maximum number of chunks to include in a merge operation''', min='2', max='100'), + Config('lsm_merge_threads', '1', r''' + the number of thread to perform merge operations''', + min='1', max='10'), # !!! max must match WT_LSM_MAX_WORKERS ] # Per-file configuration diff --git a/dist/s_string.ok b/dist/s_string.ok index eb9ddc3d77f..3cc9659a210 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -9,6 +9,7 @@ ADDR AJZ API APIs +ARGS Alakuijala Alloc Athlon diff --git a/src/config/config_def.c b/src/config/config_def.c index 50f3156b29e..6280b21c3ad 100644 --- a/src/config/config_def.c +++ b/src/config/config_def.c @@ -115,8 +115,9 @@ __wt_confdfl_file_meta = "internal_page_max=2KB,key_format=u,key_gap=10,leaf_item_max=0," "leaf_page_max=1MB,lsm_bloom=,lsm_bloom_bit_count=8,lsm_bloom_config=" ",lsm_bloom_hash_count=4,lsm_bloom_newest=0,lsm_bloom_oldest=0," - "lsm_chunk_size=2MB,lsm_merge_max=15,prefix_compression=,split_pct=75" - ",value_format=u,version=(major=0,minor=0)"; + "lsm_chunk_size=2MB,lsm_merge_max=15,lsm_merge_threads=1," + "prefix_compression=,split_pct=75,value_format=u,version=(major=0," + "minor=0)"; WT_CONFIG_CHECK __wt_confchk_file_meta[] = { @@ -146,6 +147,7 @@ __wt_confchk_file_meta[] = { { "lsm_bloom_oldest", "boolean", NULL }, { "lsm_chunk_size", "int", "min=512K,max=500MB" }, { "lsm_merge_max", "int", "min=2,max=100" }, + { "lsm_merge_threads", "int", "min=1,max=10" }, { "prefix_compression", "boolean", NULL }, { "split_pct", "int", "min=25,max=100" }, { "value_format", "format", NULL }, @@ -232,8 +234,8 @@ __wt_confdfl_session_create = "key_format=u,key_gap=10,leaf_item_max=0,leaf_page_max=1MB,lsm_bloom=" ",lsm_bloom_bit_count=8,lsm_bloom_config=,lsm_bloom_hash_count=4," "lsm_bloom_newest=0,lsm_bloom_oldest=0,lsm_chunk_size=2MB," - 
"lsm_merge_max=15,prefix_compression=,source=,split_pct=75,type=file," - "value_format=u,value_format=u"; + "lsm_merge_max=15,lsm_merge_threads=1,prefix_compression=,source=," + "split_pct=75,type=file,value_format=u,value_format=u"; WT_CONFIG_CHECK __wt_confchk_session_create[] = { @@ -266,6 +268,7 @@ __wt_confchk_session_create[] = { { "lsm_bloom_oldest", "boolean", NULL }, { "lsm_chunk_size", "int", "min=512K,max=500MB" }, { "lsm_merge_max", "int", "min=2,max=100" }, + { "lsm_merge_threads", "int", "min=1,max=10" }, { "prefix_compression", "boolean", NULL }, { "source", "string", NULL }, { "split_pct", "int", "min=25,max=100" }, diff --git a/src/include/extern.h b/src/include/extern.h index 3e03b03c47f..4b6c1065f18 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -704,8 +704,9 @@ extern int __wt_log_printf(WT_SESSION_IMPL *session, 2, 3))); extern WT_LOGREC_DESC __wt_logdesc_debug; -extern int __wt_clsm_init_merge(WT_CURSOR *cursor, +extern int __wt_clsm_init_merge( WT_CURSOR *cursor, int start_chunk, + uint32_t start_id, int nchunks); extern int __wt_clsm_open(WT_SESSION_IMPL *session, const char *uri, @@ -719,8 +720,9 @@ extern int __wt_lsm_merge_update_tree(WT_SESSION_IMPL *session, int start_chunk, int nchunks, WT_LSM_CHUNK *chunk); -extern int __wt_lsm_merge(WT_SESSION_IMPL *session, +extern int __wt_lsm_merge( WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, + uint32_t id, int stalls); extern int __wt_lsm_meta_read(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree); extern int __wt_lsm_meta_write(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree); @@ -730,15 +732,14 @@ extern int __wt_lsm_stat_init( WT_SESSION_IMPL *session, extern int __wt_lsm_tree_close_all(WT_SESSION_IMPL *session); extern int __wt_lsm_tree_bloom_name( WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, - int i, + uint32_t id, WT_ITEM *buf); extern int __wt_lsm_tree_chunk_name( WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, - int i, + uint32_t id, WT_ITEM *buf); extern int 
__wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, - int i, WT_LSM_CHUNK *chunk, int create_bloom); extern int __wt_lsm_tree_create(WT_SESSION_IMPL *session, @@ -769,7 +770,7 @@ extern int __wt_lsm_tree_worker(WT_SESSION_IMPL *session, const char *[]), const char *cfg[], uint32_t open_flags); -extern void *__wt_lsm_worker(void *arg); +extern void *__wt_lsm_worker(void *vargs); extern void *__wt_lsm_checkpoint_worker(void *arg); extern int __wt_lsm_copy_chunks(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, diff --git a/src/include/lsm.h b/src/include/lsm.h index 0d64c275ba1..e33fe6da3c6 100644 --- a/src/include/lsm.h +++ b/src/include/lsm.h @@ -20,12 +20,12 @@ struct __wt_cursor_lsm { WT_CURSOR **cursors; WT_CURSOR *current; /* The current cursor for iteration */ - WT_LSM_CHUNK *primary_chunk; /* The current primary chunk. */ + WT_LSM_CHUNK *primary_chunk; /* The current primary chunk */ #define WT_CLSM_ITERATE_NEXT 0x01 /* Forward iteration */ #define WT_CLSM_ITERATE_PREV 0x02 /* Backward iteration */ -#define WT_CLSM_MERGE 0x04 /* Merge cursor, don't update. */ -#define WT_CLSM_MINOR_MERGE 0x08 /* Minor merge, include tombstones. */ +#define WT_CLSM_MERGE 0x04 /* Merge cursor, don't update */ +#define WT_CLSM_MINOR_MERGE 0x08 /* Minor merge, include tombstones */ #define WT_CLSM_MULTIPLE 0x10 /* Multiple cursors have values for the current key */ #define WT_CLSM_UPDATED 0x20 /* Cursor has done updates */ @@ -37,14 +37,16 @@ struct __wt_cursor_lsm { * A single chunk (file) in an LSM tree. */ struct __wt_lsm_chunk { - const char *uri; /* Data source for this chunk. */ - const char *bloom_uri; /* URI of Bloom filter, if any. */ - uint64_t count; /* Approximate count of records. */ - uint32_t generation; /* Merge generation. */ - - uint32_t ncursor; /* Cursors with the chunk as primary. 
*/ -#define WT_LSM_CHUNK_ONDISK 0x01 -#define WT_LSM_CHUNK_BLOOM 0x02 + uint32_t id; /* ID used to generate URIs */ + uint32_t generation; /* Merge generation */ + const char *uri; /* Data source for this chunk */ + const char *bloom_uri; /* URI of Bloom filter, if any */ + uint64_t count; /* Approximate count of records */ + + uint32_t ncursor; /* Cursors with the chunk as primary */ +#define WT_LSM_CHUNK_BLOOM 0x01 +#define WT_LSM_CHUNK_MERGING 0x02 +#define WT_LSM_CHUNK_ONDISK 0x04 uint32_t flags; }; @@ -59,7 +61,7 @@ struct __wt_lsm_tree { WT_COLLATOR *collator; - int refcnt; /* Number of users of the tree. */ + int refcnt; /* Number of users of the tree */ WT_RWLOCK *rwlock; TAILQ_ENTRY(__wt_lsm_tree) q; @@ -74,21 +76,26 @@ struct __wt_lsm_tree { uint32_t bloom_hash_count; uint32_t chunk_size; uint32_t merge_max; + uint32_t merge_threads; + #define WT_LSM_BLOOM_MERGED 0x00000001 #define WT_LSM_BLOOM_NEWEST 0x00000002 #define WT_LSM_BLOOM_OFF 0x00000004 #define WT_LSM_BLOOM_OLDEST 0x00000008 - uint32_t bloom; /* Bloom creation policy. */ + uint32_t bloom; /* Bloom creation policy */ - WT_SESSION_IMPL *worker_session;/* Passed to thread_create */ - pthread_t worker_tid; /* LSM worker thread */ +#define WT_LSM_MAX_WORKERS 10 + /* Passed to thread_create */ + WT_SESSION_IMPL *worker_sessions[WT_LSM_MAX_WORKERS]; + /* LSM worker thread(s) */ + pthread_t worker_tids[WT_LSM_MAX_WORKERS]; WT_SESSION_IMPL *ckpt_session; /* For checkpoint worker */ pthread_t ckpt_tid; /* LSM checkpoint worker thread */ - int nchunks; /* Number of active chunks */ - int last; /* Last allocated ID. */ WT_LSM_CHUNK **chunk; /* Array of active LSM chunks */ size_t chunk_alloc; /* Space allocated for chunks */ + int nchunks; /* Number of active chunks */ + uint32_t last; /* Last allocated ID */ WT_LSM_CHUNK **old_chunks; /* Array of old LSM chunks */ size_t old_alloc; /* Space allocated for old chunks */ @@ -115,7 +122,16 @@ struct __wt_lsm_data_source { * State for an LSM worker thread. 
*/ struct __wt_lsm_worker_cookie { - WT_LSM_CHUNK **chunk_array; - size_t chunk_alloc; - int nchunks; + WT_LSM_CHUNK **chunk_array; + size_t chunk_alloc; + int nchunks; +}; + +/* + * WT_LSM_WORKER_ARGS -- + * State for an LSM worker thread. + */ +struct __wt_lsm_worker_args { + WT_LSM_TREE *lsm_tree; + int id; }; diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index dbeb6cdf0b1..b9691676801 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -685,6 +685,8 @@ struct __wt_session { * LSM tree.,an integer between 512K and 500MB; default \c 2MB.} * @config{lsm_merge_max, the maximum number of chunks to include in a * merge operation.,an integer between 2 and 100; default \c 15.} + * @config{lsm_merge_threads, the number of thread to perform merge + * operations.,an integer between 1 and 10; default \c 1.} * @config{prefix_compression, configure row-store format key prefix * compression.,a boolean flag; default \c true.} * @config{source, override the default data source URI derived from the diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h index bcb5754bcbc..90a7d282291 100644 --- a/src/include/wt_internal.h +++ b/src/include/wt_internal.h @@ -137,6 +137,8 @@ struct __wt_lsm_stats; typedef struct __wt_lsm_stats WT_LSM_STATS; struct __wt_lsm_tree; typedef struct __wt_lsm_tree WT_LSM_TREE; +struct __wt_lsm_worker_args; + typedef struct __wt_lsm_worker_args WT_LSM_WORKER_ARGS; struct __wt_lsm_worker_cookie; typedef struct __wt_lsm_worker_cookie WT_LSM_WORKER_COOKIE; struct __wt_named_collator; diff --git a/src/lsm/lsm_cursor.c b/src/lsm/lsm_cursor.c index 480c095244c..7cb4191bae7 100644 --- a/src/lsm/lsm_cursor.c +++ b/src/lsm/lsm_cursor.c @@ -33,7 +33,7 @@ CURSOR_UPDATE_API_CALL(cursor, session, n, NULL); \ WT_ERR(__clsm_enter(clsm)) -static int __clsm_open_cursors(WT_CURSOR_LSM *, int); +static int __clsm_open_cursors(WT_CURSOR_LSM *, int, uint32_t); static int __clsm_search(WT_CURSOR *); static inline int @@ 
-41,7 +41,7 @@ __clsm_enter(WT_CURSOR_LSM *clsm) { if (!F_ISSET(clsm, WT_CLSM_MERGE) && clsm->dsk_gen != clsm->lsm_tree->dsk_gen) - WT_RET(__clsm_open_cursors(clsm, 0)); + WT_RET(__clsm_open_cursors(clsm, 0, 0)); return (0); } @@ -106,7 +106,7 @@ __clsm_close_cursors(WT_CURSOR_LSM *clsm) * Open cursors for the current set of files. */ static int -__clsm_open_cursors(WT_CURSOR_LSM *clsm, int start_chunk) +__clsm_open_cursors(WT_CURSOR_LSM *clsm, int start_chunk, uint32_t start_id) { WT_CURSOR *c, **cp; WT_DECL_RET; @@ -135,10 +135,27 @@ __clsm_open_cursors(WT_CURSOR_LSM *clsm, int start_chunk) WT_RET(__clsm_close_cursors(clsm)); __wt_spin_lock(session, &lsm_tree->lock); + /* Merge cursors have already figured out how many chunks they need. */ - if (F_ISSET(clsm, WT_CLSM_MERGE)) + if (F_ISSET(clsm, WT_CLSM_MERGE)) { nchunks = clsm->nchunks; - else + + /* + * We may have raced with another merge completing. Check that + * we're starting at the right offset in the chunk array. + */ + if (start_chunk >= lsm_tree->nchunks || + lsm_tree->chunk[start_chunk]->id != start_id) + for (start_chunk = 0; + start_chunk < lsm_tree->nchunks; + start_chunk++) { + chunk = lsm_tree->chunk[start_chunk]; + if (chunk->id == start_id) + break; + } + + WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks); + } else nchunks = lsm_tree->nchunks; if (clsm->cursors == NULL || nchunks > clsm->nchunks) { @@ -201,7 +218,8 @@ err: __wt_spin_unlock(session, &lsm_tree->lock); * Initialize an LSM cursor for a merge. 
*/ int -__wt_clsm_init_merge(WT_CURSOR *cursor, int start_chunk, int nchunks) +__wt_clsm_init_merge( + WT_CURSOR *cursor, int start_chunk, uint32_t start_id, int nchunks) { WT_CURSOR_LSM *clsm; @@ -211,7 +229,7 @@ __wt_clsm_init_merge(WT_CURSOR *cursor, int start_chunk, int nchunks) F_SET(clsm, WT_CLSM_MINOR_MERGE); clsm->nchunks = nchunks; - return (__clsm_open_cursors(clsm, start_chunk)); + return (__clsm_open_cursors(clsm, start_chunk, start_id)); } /* diff --git a/src/lsm/lsm_merge.c b/src/lsm/lsm_merge.c index a54e798c8b2..320cf07bfd2 100644 --- a/src/lsm/lsm_merge.c +++ b/src/lsm/lsm_merge.c @@ -19,6 +19,8 @@ __wt_lsm_merge_update_tree(WT_SESSION_IMPL *session, size_t chunk_sz, chunks_after_merge; int i, j; + WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks); + /* Setup the array of obsolete chunks. */ if (nchunks > lsm_tree->old_avail) { chunk_sz = sizeof(*lsm_tree->old_chunks); @@ -61,7 +63,8 @@ __wt_lsm_merge_update_tree(WT_SESSION_IMPL *session, * Merge a set of chunks of an LSM tree. */ int -__wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) +__wt_lsm_merge( + WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, uint32_t id, int stalls) { WT_BLOOM *bloom; WT_CURSOR *src, *dest; @@ -71,7 +74,7 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) WT_LSM_CHUNK *chunk; const char *cur_cfg[] = API_CONF_DEFAULTS(session, open_cursor, "bulk,raw"); - uint32_t generation; + uint32_t generation, start_id; uint64_t insert_count, record_count; int create_bloom, dest_id, end_chunk, i; int max_chunks, nchunks, start_chunk; @@ -82,19 +85,12 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) create_bloom = 0; /* - * Take a copy of the latest chunk id. This value needs to be atomically - * read. We need a copy, since other threads may alter the chunk count - * while we are doing a merge. 
- */ - nchunks = lsm_tree->nchunks; - - /* * If there aren't any chunks to merge, or some of the chunks aren't * yet written, we're done. A non-zero error indicates that the worker * should assume there is no work to do: if there are unwritten chunks, * the worker should write them immediately. */ - if (nchunks <= 1) + if (lsm_tree->nchunks <= 1) return (WT_NOTFOUND); /* @@ -104,10 +100,14 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) */ __wt_spin_lock(session, &lsm_tree->lock); - /* Only include chunks that are stable on disk. */ - end_chunk = nchunks - 1; + /* + * Only include chunks that are stable on disk and not involved in a + * merge. + */ + end_chunk = lsm_tree->nchunks - 1; while (end_chunk > 0 && - !F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_ONDISK)) + (!F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_ONDISK) || + F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_MERGING))) --end_chunk; /* @@ -129,6 +129,10 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) chunk = lsm_tree->chunk[start_chunk - 1]; nchunks = end_chunk - start_chunk + 1; + /* If the chunk is already involved in a merge, stop. */ + if (F_ISSET(chunk, WT_LSM_CHUNK_MERGING)) + break; + /* * If the next chunk is more than double the average size of * the chunks we have so far, stop. @@ -136,24 +140,44 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) if (nchunks > 2 && chunk->count > 2 * record_count / nchunks) break; - /* Don't do any big merges until we have waited for 10s. */ - if (nchunks > 0 && stalls < 10 && - chunk->count > lsm_tree->chunk[end_chunk]->count * 2) + /* + * Never do big merges in the first thread if there are + * multiple threads. If there is a single thread, wait for 10 + * seconds looking for small merges before trying a big one. 
+ */ + if (id == 0 && nchunks > 0 && + chunk->count > lsm_tree->chunk[end_chunk]->count * 2 && + (lsm_tree->merge_threads > 1 || stalls < 10)) break; + F_SET(chunk, WT_LSM_CHUNK_MERGING); record_count += chunk->count; --start_chunk; if (nchunks == max_chunks) record_count -= lsm_tree->chunk[end_chunk--]->count; } - __wt_spin_unlock(session, &lsm_tree->lock); nchunks = end_chunk - start_chunk + 1; WT_ASSERT(session, nchunks <= max_chunks); /* Don't do small merges unless we have waited for 2s. */ - if (nchunks <= 1 || (stalls < 2 && nchunks < max_chunks / 2)) + if (nchunks <= 1 || + (id == 0 && stalls < 2 && nchunks < max_chunks / 2)) { + for (i = start_chunk; i <= end_chunk; i++) + F_CLR(lsm_tree->chunk[i], WT_LSM_CHUNK_MERGING); + nchunks = 0; + } + + /* Find the merge generation. */ + for (generation = 0, i = 0; i < nchunks; i++) + if (lsm_tree->chunk[start_chunk + i]->generation > generation) + generation = lsm_tree->chunk[i]->generation; + + start_id = lsm_tree->chunk[start_chunk]->id; + __wt_spin_unlock(session, &lsm_tree->lock); + + if (nchunks == 0) return (WT_NOTFOUND); /* Allocate an ID for the merge. */ @@ -164,17 +188,13 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) start_chunk, end_chunk, dest_id, record_count); WT_RET(__wt_calloc_def(session, 1, &chunk)); + chunk->id = dest_id; if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_MERGED) && (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OLDEST) || start_chunk > 0) && record_count > 0) create_bloom = 1; - /* Find the merge generation. 
*/ - for (generation = 0, i = start_chunk; i <= end_chunk; i++) - if (lsm_tree->chunk[i]->generation > generation) - generation = lsm_tree->chunk[i]->generation; - /* * Special setup for the merge cursor: * first, reset to open the dependent cursors; @@ -183,10 +203,10 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) */ WT_ERR(__wt_open_cursor(session, lsm_tree->name, NULL, NULL, &src)); F_SET(src, WT_CURSTD_RAW); - WT_ERR(__wt_clsm_init_merge(src, start_chunk, nchunks)); + WT_ERR(__wt_clsm_init_merge(src, start_chunk, start_id, nchunks)); WT_WITH_SCHEMA_LOCK(session, ret = __wt_lsm_tree_setup_chunk( - session, lsm_tree, dest_id, chunk, create_bloom)); + session, lsm_tree, chunk, create_bloom)); WT_ERR(ret); if (create_bloom) WT_ERR(__wt_bloom_create(session, chunk->bloom_uri, @@ -227,6 +247,19 @@ __wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int stalls) WT_ERR(ret); __wt_spin_lock(session, &lsm_tree->lock); + + /* + * Check whether we raced with another merge, and adjust the chunk + * array offset as necessary. 
+ */ + if (start_chunk >= lsm_tree->nchunks || + lsm_tree->chunk[start_chunk]->id != start_id) + for (start_chunk = 0; + start_chunk < lsm_tree->nchunks; + start_chunk++) + if (lsm_tree->chunk[start_chunk]->id == start_id) + break; + ret = __wt_lsm_merge_update_tree( session, lsm_tree, start_chunk, nchunks, chunk); diff --git a/src/lsm/lsm_meta.c b/src/lsm/lsm_meta.c index 926f6bf4f5b..093e5f592e0 100644 --- a/src/lsm/lsm_meta.c +++ b/src/lsm/lsm_meta.c @@ -17,11 +17,15 @@ __wt_lsm_meta_read(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) WT_CONFIG cparser, lparser; WT_CONFIG_ITEM ck, cv, lk, lv; WT_DECL_RET; + WT_ITEM buf; WT_LSM_CHUNK *chunk; const char *config; int nchunks; size_t chunk_sz, alloc; + WT_CLEAR(buf); + chunk_sz = sizeof(WT_LSM_CHUNK); + WT_RET(__wt_metadata_read(session, lsm_tree->name, &config)); WT_ERR(__wt_config_init(session, &cparser, config)); while ((ret = __wt_config_next(&cparser, &ck, &cv)) == 0) { @@ -55,46 +59,54 @@ __wt_lsm_meta_read(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) lsm_tree->chunk_size = (uint32_t)cv.val; else if (WT_STRING_MATCH("lsm_merge_max", ck.str, ck.len)) lsm_tree->merge_max = (uint32_t)cv.val; + else if (WT_STRING_MATCH("lsm_merge_threads", ck.str, ck.len)) + lsm_tree->merge_threads = (uint32_t)cv.val; else if (WT_STRING_MATCH("last", ck.str, ck.len)) lsm_tree->last = (int)cv.val; else if (WT_STRING_MATCH("chunks", ck.str, ck.len)) { WT_ERR(__wt_config_subinit(session, &lparser, &cv)); - chunk_sz = sizeof(*lsm_tree->chunk); for (nchunks = 0; (ret = __wt_config_next(&lparser, &lk, &lv)) == 0; ) { - if (WT_STRING_MATCH("bloom", lk.str, lk.len)) { - WT_ERR(__wt_strndup(session, - lv.str, lv.len, &chunk->bloom_uri)); + if (WT_STRING_MATCH("id", lk.str, lk.len)) { + if ((nchunks + 1) * chunk_sz > + lsm_tree->chunk_alloc) + WT_ERR(__wt_realloc(session, + &lsm_tree->chunk_alloc, + WT_MAX(10 * chunk_sz, + 2 * lsm_tree->chunk_alloc), + &lsm_tree->chunk)); + WT_ERR(__wt_calloc_def( + session, 1, &chunk)); + 
lsm_tree->chunk[nchunks++] = chunk; + chunk->id = (uint32_t)lv.val; + WT_ERR(__wt_lsm_tree_chunk_name(session, + lsm_tree, chunk->id, &buf)); + chunk->uri = + __wt_buf_steal(session, &buf, NULL); + F_SET(chunk, WT_LSM_CHUNK_ONDISK); + + } else if (WT_STRING_MATCH( + "bloom", lk.str, lk.len)) { + WT_ERR(__wt_lsm_tree_bloom_name(session, + lsm_tree, chunk->id, &buf)); + chunk->bloom_uri = + __wt_buf_steal(session, &buf, NULL); F_SET(chunk, WT_LSM_CHUNK_BLOOM); continue; - } - if (WT_STRING_MATCH("count", lk.str, lk.len)) { + } else if (WT_STRING_MATCH( + "count", lk.str, lk.len)) { chunk->count = lv.val; continue; - } - if (WT_STRING_MATCH( + } else if (WT_STRING_MATCH( "generation", lk.str, lk.len)) { chunk->generation = (uint32_t)lv.val; continue; } - if ((nchunks + 1) * chunk_sz > - lsm_tree->chunk_alloc) - WT_ERR(__wt_realloc(session, - &lsm_tree->chunk_alloc, - WT_MAX(10 * chunk_sz, - 2 * lsm_tree->chunk_alloc), - &lsm_tree->chunk)); - WT_ERR(__wt_calloc_def(session, 1, &chunk)); - lsm_tree->chunk[nchunks++] = chunk; - WT_ERR(__wt_strndup(session, - lk.str, lk.len, &chunk->uri)); - F_SET(chunk, WT_LSM_CHUNK_ONDISK); } WT_ERR_NOTFOUND_OK(ret); lsm_tree->nchunks = nchunks; } else if (WT_STRING_MATCH("old_chunks", ck.str, ck.len)) { WT_ERR(__wt_config_subinit(session, &lparser, &cv)); - chunk_sz = sizeof(*lsm_tree->old_chunks); for (nchunks = 0; (ret = __wt_config_next(&lparser, &lk, &lv)) == 0; ) { if (WT_STRING_MATCH("bloom", lk.str, lk.len)) { @@ -157,20 +169,20 @@ __wt_lsm_meta_write(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) lsm_tree->key_format, lsm_tree->value_format)); WT_ERR(__wt_buf_catfmt(session, buf, ",last=%" PRIu32 ",lsm_chunk_size=%" PRIu64 - ",lsm_merge_max=%" PRIu32 ",lsm_bloom=%" PRIu32 + ",lsm_merge_max=%" PRIu32 ",lsm_merge_threads=%" PRIu32 + ",lsm_bloom=%" PRIu32 ",lsm_bloom_bit_count=%" PRIu32 ",lsm_bloom_hash_count=%" PRIu32, lsm_tree->last, (uint64_t)lsm_tree->chunk_size, - lsm_tree->merge_max, lsm_tree->bloom, + lsm_tree->merge_max, 
lsm_tree->merge_threads, lsm_tree->bloom, lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count)); WT_ERR(__wt_buf_catfmt(session, buf, ",chunks=[")); for (i = 0; i < lsm_tree->nchunks; i++) { chunk = lsm_tree->chunk[i]; if (i > 0) WT_ERR(__wt_buf_catfmt(session, buf, ",")); - WT_ERR(__wt_buf_catfmt(session, buf, "\"%s\"", chunk->uri)); + WT_ERR(__wt_buf_catfmt(session, buf, "id=%" PRIu32, chunk->id)); if (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM)) - WT_ERR(__wt_buf_catfmt( - session, buf, ",bloom=\"%s\"", chunk->bloom_uri)); + WT_ERR(__wt_buf_catfmt(session, buf, ",bloom")); if (chunk->count != 0) WT_ERR(__wt_buf_catfmt( session, buf, ",count=%" PRIu64, chunk->count)); diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c index d80901a4229..efdadbee498 100644 --- a/src/lsm/lsm_tree.c +++ b/src/lsm/lsm_tree.c @@ -68,11 +68,15 @@ __lsm_tree_close(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) { WT_DECL_RET; WT_SESSION *wt_session; + WT_SESSION_IMPL *s; + uint32_t i; if (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) { F_CLR(lsm_tree, WT_LSM_TREE_WORKING); if (F_ISSET(S2C(session), WT_CONN_LSM_MERGE)) - WT_TRET(__wt_thread_join(lsm_tree->worker_tid)); + for (i = 0; i < lsm_tree->merge_threads; i++) + WT_TRET(__wt_thread_join( + lsm_tree->worker_tids[i])); WT_TRET(__wt_thread_join(lsm_tree->ckpt_tid)); } @@ -83,18 +87,19 @@ __lsm_tree_close(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) * * Do this in the main thread to avoid deadlocks. 
*/ - if (lsm_tree->worker_session != NULL) { - F_SET(lsm_tree->worker_session, - F_ISSET(session, WT_SESSION_SCHEMA_LOCKED)); - - wt_session = &lsm_tree->worker_session->iface; + for (i = 0; i < lsm_tree->merge_threads; i++) { + if ((s = lsm_tree->worker_sessions[i]) == NULL) + continue; + lsm_tree->worker_sessions[i] = NULL; + F_SET(s, F_ISSET(session, WT_SESSION_SCHEMA_LOCKED)); + wt_session = &s->iface; WT_TRET(wt_session->close(wt_session, NULL)); /* * This is safe after the close because session handles are * not freed, but are managed by the connection. */ - __wt_free(NULL, lsm_tree->worker_session->hazard); + __wt_free(NULL, s->hazard); } if (lsm_tree->ckpt_session != NULL) { F_SET(lsm_tree->ckpt_session, @@ -137,10 +142,10 @@ __wt_lsm_tree_close_all(WT_SESSION_IMPL *session) */ int __wt_lsm_tree_bloom_name( - WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int i, WT_ITEM *buf) + WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, uint32_t id, WT_ITEM *buf) { - WT_RET(__wt_buf_fmt(session, buf, "file:%s-%06d.bf", - lsm_tree->filename, i)); + WT_RET(__wt_buf_fmt(session, buf, "file:%s-%06" PRIu32 ".bf", + lsm_tree->filename, id)); return (0); } @@ -150,10 +155,10 @@ __wt_lsm_tree_bloom_name( */ int __wt_lsm_tree_chunk_name( - WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, int i, WT_ITEM *buf) + WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, uint32_t id, WT_ITEM *buf) { - WT_RET(__wt_buf_fmt(session, buf, "file:%s-%06d.lsm", - lsm_tree->filename, i)); + WT_RET(__wt_buf_fmt(session, buf, "file:%s-%06" PRIu32 ".lsm", + lsm_tree->filename, id)); return (0); } @@ -163,15 +168,16 @@ __wt_lsm_tree_chunk_name( */ int __wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session, - WT_LSM_TREE *lsm_tree, int i, WT_LSM_CHUNK *chunk, int create_bloom) + WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk, int create_bloom) { - WT_DECL_ITEM(buf); - WT_DECL_ITEM(bbuf); WT_DECL_RET; + WT_ITEM buf; const char *cfg[] = API_CONF_DEFAULTS(session, drop, "force"); - 
WT_RET(__wt_scr_alloc(session, 0, &buf)); - WT_ERR(__wt_lsm_tree_chunk_name(session, lsm_tree, i, buf)); + WT_CLEAR(buf); + WT_RET(__wt_lsm_tree_chunk_name(session, lsm_tree, chunk->id, &buf)); + chunk->uri = __wt_buf_steal(session, &buf, NULL); + /* * Drop the chunk first - there may be some content hanging over from * an aborted merge. @@ -181,20 +187,16 @@ __wt_lsm_tree_setup_chunk(WT_SESSION_IMPL *session, * things with handle locks and metadata tracking. It can never have * been the result of an interrupted merge, anyway. */ - if (i > 1) - WT_ERR(__wt_schema_drop(session, buf->data, cfg)); - WT_ERR(__wt_schema_create(session, buf->data, lsm_tree->file_config)); - chunk->uri = __wt_buf_steal(session, buf, NULL); + if (chunk->id > 1) + WT_ERR(__wt_schema_drop(session, chunk->uri, cfg)); + WT_ERR(__wt_schema_create(session, chunk->uri, lsm_tree->file_config)); if (create_bloom) { - WT_ERR(__wt_scr_alloc(session, 0, &bbuf)); WT_ERR(__wt_lsm_tree_bloom_name( - session, lsm_tree, i, bbuf)); - chunk->bloom_uri = __wt_buf_steal(session, bbuf, NULL); + session, lsm_tree, chunk->id, &buf)); + chunk->bloom_uri = __wt_buf_steal(session, &buf, NULL); } -err: __wt_scr_free(&buf); - __wt_scr_free(&bbuf); - return (ret); +err: return (ret); } /* @@ -205,12 +207,12 @@ static int __lsm_tree_start_worker(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) { WT_CONNECTION *wt_conn; + WT_LSM_WORKER_ARGS *wargs; WT_SESSION *wt_session; + WT_SESSION_IMPL *s; + uint32_t i; wt_conn = &S2C(session)->iface; - WT_RET(wt_conn->open_session(wt_conn, NULL, NULL, &wt_session)); - lsm_tree->worker_session = (WT_SESSION_IMPL *)wt_session; - F_SET(lsm_tree->worker_session, WT_SESSION_INTERNAL); WT_RET(wt_conn->open_session(wt_conn, NULL, NULL, &wt_session)); lsm_tree->ckpt_session = (WT_SESSION_IMPL *)wt_session; @@ -220,8 +222,19 @@ __lsm_tree_start_worker(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) /* The new thread will rely on the WORKING value being visible. 
*/ WT_FULL_BARRIER(); if (F_ISSET(S2C(session), WT_CONN_LSM_MERGE)) - WT_RET(__wt_thread_create( - &lsm_tree->worker_tid, __wt_lsm_worker, lsm_tree)); + for (i = 0; i < lsm_tree->merge_threads; i++) { + WT_RET(wt_conn->open_session( + wt_conn, NULL, NULL, &wt_session)); + s = (WT_SESSION_IMPL *)wt_session; + F_SET(s, WT_SESSION_INTERNAL); + lsm_tree->worker_sessions[i] = s; + + WT_RET(__wt_calloc_def(session, 1, &wargs)); + wargs->lsm_tree = lsm_tree; + wargs->id = i; + WT_RET(__wt_thread_create( + &lsm_tree->worker_tids[i], __wt_lsm_worker, wargs)); + } WT_RET(__wt_thread_create( &lsm_tree->ckpt_tid, __wt_lsm_checkpoint_worker, lsm_tree)); @@ -306,6 +319,10 @@ __wt_lsm_tree_create(WT_SESSION_IMPL *session, lsm_tree->chunk_size = (uint32_t)cval.val; WT_ERR(__wt_config_gets(session, cfg, "lsm_merge_max", &cval)); lsm_tree->merge_max = (uint32_t)cval.val; + WT_ERR(__wt_config_gets(session, cfg, "lsm_merge_threads", &cval)); + lsm_tree->merge_threads = (uint32_t)cval.val; + /* Sanity check that api_data.py is in sync with lsm.h */ + WT_ASSERT(session, lsm_tree->merge_threads <= WT_LSM_MAX_WORKERS); WT_ERR(__wt_scr_alloc(session, 0, &buf)); WT_ERR(__wt_buf_fmt(session, buf, @@ -346,20 +363,21 @@ __lsm_tree_open_check( WT_CONFIG_ITEM cval; const char *cfg[] = API_CONF_DEFAULTS( session, create, lsm_tree->file_config); + uint64_t required; uint32_t maxleafpage; - uint64_t req; WT_RET(__wt_config_gets( session, cfg, "leaf_page_max", &cval)); maxleafpage = (uint32_t)cval.val; /* Three chunks, plus one page for each participant in a merge. */ - req = 3 * lsm_tree->chunk_size + (lsm_tree->merge_max * maxleafpage); - if (S2C(session)->cache_size < req) + required = 3 * lsm_tree->chunk_size + + lsm_tree->merge_threads * (lsm_tree->merge_max * maxleafpage); + if (S2C(session)->cache_size < required) WT_RET_MSG(session, EINVAL, "The LSM configuration requires a cache size of at least %" PRIu64 ". 
Configured size is %" PRIu64, - req, S2C(session)->cache_size); + required, S2C(session)->cache_size); return (0); } @@ -466,7 +484,7 @@ __wt_lsm_tree_switch( { WT_DECL_RET; WT_LSM_CHUNK *chunk; - int new_id; + uint32_t new_id; new_id = WT_ATOMIC_ADD(lsm_tree->last, 1); @@ -486,9 +504,9 @@ __wt_lsm_tree_switch( &lsm_tree->chunk)); WT_ERR(__wt_calloc_def(session, 1, &chunk)); + chunk->id = new_id; lsm_tree->chunk[lsm_tree->nchunks++] = chunk; - WT_ERR(__wt_lsm_tree_setup_chunk( - session, lsm_tree, new_id, chunk, + WT_ERR(__wt_lsm_tree_setup_chunk(session, lsm_tree, chunk, FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_NEWEST) ? 1 : 0)); ++lsm_tree->dsk_gen; @@ -556,14 +574,15 @@ int __wt_lsm_tree_rename(WT_SESSION_IMPL *session, const char *oldname, const char *newname, const char *cfg[]) { - WT_DECL_ITEM(buf); WT_DECL_RET; + WT_ITEM buf; WT_LSM_CHUNK *chunk; WT_LSM_TREE *lsm_tree; const char *old; int i; old = NULL; + WT_CLEAR(buf); /* Get the LSM tree. */ WT_RET(__wt_lsm_tree_get(session, oldname, 1, &lsm_tree)); @@ -574,8 +593,6 @@ __wt_lsm_tree_rename(WT_SESSION_IMPL *session, /* Prevent any new opens. */ WT_RET(__wt_spin_trylock(session, &lsm_tree->lock)); - WT_ERR(__wt_scr_alloc(session, 0, &buf)); - /* Set the new name. 
*/ __wt_free(session, lsm_tree->name); WT_ERR(__wt_strdup(session, newname, &lsm_tree->name)); @@ -586,16 +603,18 @@ __wt_lsm_tree_rename(WT_SESSION_IMPL *session, old = chunk->uri; chunk->uri = NULL; - WT_ERR(__wt_lsm_tree_chunk_name(session, lsm_tree, i, buf)); - chunk->uri = __wt_buf_steal(session, buf, NULL); + WT_ERR(__wt_lsm_tree_chunk_name( + session, lsm_tree, (uint32_t)i, &buf)); + chunk->uri = __wt_buf_steal(session, &buf, NULL); WT_ERR(__wt_schema_rename(session, old, chunk->uri, cfg)); __wt_free(session, old); - if ((old = chunk->bloom_uri) != NULL) { + if (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM)) { + old = chunk->bloom_uri; chunk->bloom_uri = NULL; WT_ERR(__wt_lsm_tree_bloom_name( - session, lsm_tree, i, buf)); - chunk->bloom_uri = __wt_buf_steal(session, buf, NULL); + session, lsm_tree, (uint32_t)i, &buf)); + chunk->bloom_uri = __wt_buf_steal(session, &buf, NULL); F_SET(chunk, WT_LSM_CHUNK_BLOOM); WT_ERR(__wt_schema_rename( session, old, chunk->uri, cfg)); @@ -610,7 +629,6 @@ __wt_lsm_tree_rename(WT_SESSION_IMPL *session, if (0) { err: __wt_spin_unlock(session, &lsm_tree->lock); } - __wt_scr_free(&buf); if (old != NULL) __wt_free(session, old); __lsm_tree_discard(session, lsm_tree); @@ -640,15 +658,15 @@ __wt_lsm_tree_truncate( /* Prevent any new opens. */ WT_RET(__wt_spin_trylock(session, &lsm_tree->lock)); - /* Mark all chunks old. */ + /* Create the new chunk. */ WT_ERR(__wt_calloc_def(session, 1, &chunk)); + chunk->id = WT_ATOMIC_ADD(lsm_tree->last, 1); + WT_ERR(__wt_lsm_tree_setup_chunk(session, lsm_tree, chunk, 0)); + + /* Mark all chunks old. */ WT_ERR(__wt_lsm_merge_update_tree( session, lsm_tree, 0, lsm_tree->nchunks, chunk)); - /* Create the new chunk. 
*/ - WT_ERR(__wt_lsm_tree_setup_chunk( - session, lsm_tree, WT_ATOMIC_ADD(lsm_tree->last, 1), chunk, 0)); - WT_ERR(__wt_lsm_meta_write(session, lsm_tree)); WT_ERR(__lsm_tree_start_worker(session, lsm_tree)); diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c index dd16432e635..7bde98a11eb 100644 --- a/src/lsm/lsm_worker.c +++ b/src/lsm/lsm_worker.c @@ -17,14 +17,18 @@ static int __lsm_free_chunks(WT_SESSION_IMPL *, WT_LSM_TREE *); * trees to disk and merging on-disk trees. */ void * -__wt_lsm_worker(void *arg) +__wt_lsm_worker(void *vargs) { + WT_LSM_WORKER_ARGS *args; WT_LSM_TREE *lsm_tree; WT_SESSION_IMPL *session; - int progress, stalls; + int id, progress, stalls; - lsm_tree = arg; - session = lsm_tree->worker_session; + args = vargs; + lsm_tree = args->lsm_tree; + id = args->id; + session = lsm_tree->worker_sessions[id]; + __wt_free(session, args); stalls = 0; while (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) { @@ -34,13 +38,18 @@ __wt_lsm_worker(void *arg) session->btree = NULL; /* Report stalls to merge in seconds. */ - if (__wt_lsm_merge(session, lsm_tree, stalls / 1000) == 0) + if (__wt_lsm_merge(session, lsm_tree, id, stalls / 1000) == 0) progress = 1; /* Clear any state from previous worker thread iterations. */ session->btree = NULL; - if (lsm_tree->nold_chunks != lsm_tree->old_avail && + /* + * Only have one thread freeing old chunks, and only if there + * are chunks to free. 
+ */ + if (id == 0 && + lsm_tree->nold_chunks != lsm_tree->old_avail && __lsm_free_chunks(session, lsm_tree) == 0) progress = 1; @@ -177,7 +186,7 @@ __lsm_bloom_create(WT_SESSION_IMPL *session, WT_BLOOM *bloom; WT_CURSOR *src; WT_DECL_RET; - WT_ITEM key; + WT_ITEM buf, key; const char *cur_cfg[] = API_CONF_DEFAULTS(session, open_cursor, "raw"); uint64_t insert_count; @@ -185,11 +194,21 @@ __lsm_bloom_create(WT_SESSION_IMPL *session, chunk->count == 0) return (0); - WT_ASSERT(session, chunk->bloom_uri != NULL); + /* + * Normally, the Bloom URI is populated when the chunk struct is + * allocated. After an open, however, it may not have been. + * Deal with that here. + */ + if (chunk->bloom_uri == NULL) { + WT_CLEAR(buf); + WT_RET(__wt_lsm_tree_bloom_name( + session, lsm_tree, chunk->id, &buf)); + chunk->bloom_uri = __wt_buf_steal(session, &buf, NULL); + } bloom = NULL; - WT_ERR(__wt_bloom_create(session, chunk->bloom_uri, + WT_RET(__wt_bloom_create(session, chunk->bloom_uri, lsm_tree->bloom_config, chunk->count, lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count, &bloom)); @@ -262,7 +281,7 @@ __lsm_free_chunks(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) "LSM worker drop busy: %s.", chunk->uri); continue; - } else + } else WT_ERR(ret); } |