diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2013-12-06 16:47:36 +1100 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2013-12-06 16:47:36 +1100 |
commit | 6d4ca8b0e0b9dd4ed637b39868c227d0302ed96b (patch) | |
tree | 80d4812e4a5f66e8a456f3f2e9a0fc13fa8c11af | |
parent | 7b6b95615a006b57f065debb46977a62f9de1912 (diff) | |
parent | 92c17c89340b16eec9f88bc2e5d6fb3791b5b0ec (diff) | |
download | mongo-6d4ca8b0e0b9dd4ed637b39868c227d0302ed96b.tar.gz |
Merge branch 'develop' into compact-lsm
Conflicts:
src/lsm/lsm_merge.c
34 files changed, 871 insertions, 242 deletions
diff --git a/bench/wtperf/runners/evict-lsm.wtperf b/bench/wtperf/runners/evict-lsm.wtperf index da03c2aac64..a5541986668 100644 --- a/bench/wtperf/runners/evict-lsm.wtperf +++ b/bench/wtperf/runners/evict-lsm.wtperf @@ -1,6 +1,6 @@ # wtperf options file: evict lsm configuration conn_config="cache_size=50M" -table_config="lsm=(chunk_size=2MB),type=lsm" +table_config="lsm=(merge_threads=2),type=lsm" icount=10000000 report_interval=5 run_time=120 diff --git a/bench/wtperf/runners/insert-rmw.wtperf b/bench/wtperf/runners/insert-rmw.wtperf index 742374184b1..50db0baa0d1 100644 --- a/bench/wtperf/runners/insert-rmw.wtperf +++ b/bench/wtperf/runners/insert-rmw.wtperf @@ -1,6 +1,6 @@ # wtperf options file: Test the insert-rmw functionality conn_config="cache_size=500MB" -table_config="lsm=(chunk_size=5MB),type=lsm" +table_config="type=lsm" icount=500000 insert_rmw=true report_interval=5 diff --git a/bench/wtperf/runners/parallel-pop-lsm.wtperf b/bench/wtperf/runners/parallel-pop-lsm.wtperf index d91feb9aaf7..9cc16c8ea9f 100644 --- a/bench/wtperf/runners/parallel-pop-lsm.wtperf +++ b/bench/wtperf/runners/parallel-pop-lsm.wtperf @@ -1,7 +1,7 @@ # wtperf options file: Run populate thread multi-threaded and with groups # of operations in each transaction. conn_config="cache_size=200MB" -table_config="lsm=(chunk_size=1M),type=lsm" +table_config="lsm=(merge_threads=2),type=lsm" transaction_config="isolation=snapshot" icount=10000000 report_interval=5 diff --git a/dist/api_data.py b/dist/api_data.py index 80e3f6c76a3..e1c69d178df 100644 --- a/dist/api_data.py +++ b/dist/api_data.py @@ -119,11 +119,11 @@ lsm_config = [ type='boolean'), Config('chunk_max', '5GB', r''' the maximum size a single chunk can be. Chunks larger than this - size are not considered for further merges. This is a soft - limit, and chunks larger than this value can be created. Must - be larger than chunk_size''', + size are not considered for further merges. 
This is a soft + limit, and chunks larger than this value can be created. Must + be larger than chunk_size''', min='100MB', max='10TB'), - Config('chunk_size', '2MB', r''' + Config('chunk_size', '10MB', r''' the maximum size of the in-memory chunk of an LSM tree''', min='512K', max='500MB'), Config('merge_max', '15', r''' @@ -180,11 +180,11 @@ file_config = format_meta + [ Config('huffman_key', '', r''' configure Huffman encoding for keys. Permitted values are empty (off), \c "english", \c "utf8<file>" or \c - "utf16<file>". See @ref huffman for more information'''), + "utf16<file>". See @ref huffman for more information'''), Config('huffman_value', '', r''' - configure Huffman encoding for values. Permitted values + configure Huffman encoding for values. Permitted values are empty (off), \c "english", \c "utf8<file>" or \c - "utf16<file>". See @ref huffman for more information'''), + "utf16<file>". See @ref huffman for more information'''), Config('internal_key_truncate', 'true', r''' configure internal key truncation, discarding unnecessary trailing bytes on internal keys (ignored for custom @@ -238,7 +238,7 @@ file_config = format_meta + [ from this object are read or written into the buffer cache''', min=0), Config('os_cache_dirty_max', '0', r''' - maximum dirty system buffer cache usage, in bytes. If non-zero, + maximum dirty system buffer cache usage, in bytes. If non-zero, schedule writes for dirty blocks belonging to this object in the system buffer cache after that many bytes from this object are written into the buffer cache''', @@ -399,7 +399,7 @@ methods = { 'session.create' : Method(table_only_meta + file_config + lsm_config + source_meta + [ Config('exclusive', 'false', r''' - fail if the object exists. When false (the default), if the + fail if the object exists. 
When false (the default), if the object exists, check that its settings match the specified configuration''', type='boolean'), @@ -570,7 +570,7 @@ methods = { must match ::wiredtiger_extension_init'''), Config('terminate', 'wiredtiger_extension_terminate', r''' an optional function in the extension that is called before - the extension is unloaded during WT_CONNECTION::close. The + the extension is unloaded during WT_CONNECTION::close. The signature of the function must match ::wiredtiger_extension_terminate'''), ]), @@ -618,7 +618,7 @@ methods = { Config('file_extend', '', r''' file extension configuration. If set, extend files of the set type in allocations of the set size, instead of a block at a - time as each new block is written. For example, + time as each new block is written. For example, <code>file_extend=(data=16MB)</code>''', type='list', choices=['data', 'log']), Config('hazard_max', '1000', r''' @@ -659,7 +659,7 @@ methods = { min='1'), Config('statistics_log', '', r''' log any statistics the database is configured to maintain, - to a file. See @ref statistics for more information''', + to a file. See @ref statistics for more information''', type='category', subconfig=[ Config('path', '"WiredTigerStat.%H"', r''' the pathname to a file into which the log records are written, diff --git a/dist/s_string.ok b/dist/s_string.ok index a66518d57d5..6e9d23242da 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -866,6 +866,7 @@ typedef uB uid uint +uintmax unbare uncompressing uncompresssed diff --git a/src/btree/bt_cursor.c b/src/btree/bt_cursor.c index 0ba899215ee..4c5a921a8f7 100644 --- a/src/btree/bt_cursor.c +++ b/src/btree/bt_cursor.c @@ -49,7 +49,8 @@ __cursor_fix_implicit(WT_BTREE *btree, WT_CURSOR_BTREE *cbt) /* * __cursor_invalid -- * Return if the cursor references an invalid K/V pair (either the pair - * doesn't exist at all because the tree is empty, or the pair was deleted). 
+ * doesn't exist at all because the tree is empty, or the pair was + * deleted). */ static inline int __cursor_invalid(WT_CURSOR_BTREE *cbt) @@ -68,8 +69,23 @@ __cursor_invalid(WT_CURSOR_BTREE *cbt) session = (WT_SESSION_IMPL *)cbt->iface.session; /* If we found an insert list entry with a visible update, use it. */ - if (ins != NULL && (upd = __wt_txn_read(session, ins->upd)) != NULL) - return (WT_UPDATE_DELETED_ISSET(upd) ? 1 : 0); + if (ins != NULL) { + if ((upd = __wt_txn_read(session, ins->upd)) != NULL) + return (WT_UPDATE_DELETED_ISSET(upd) ? 1 : 0); + + /* Do we have a position on the page? */ + switch (btree->type) { + case BTREE_COL_FIX: + if (cbt->recno >= page->u.col_fix.recno + page->entries) + return (1); + break; + case BTREE_COL_VAR: + case BTREE_ROW: + if (cbt->slot > page->entries) + return (1); + break; + } + } /* The page may be empty, the search routine doesn't check. */ if (page->entries == 0) @@ -81,9 +97,8 @@ __cursor_invalid(WT_CURSOR_BTREE *cbt) break; case BTREE_COL_VAR: cip = &page->u.col_var.d[cbt->slot]; - if ((cell = WT_COL_PTR(page, cip)) == NULL) - return (WT_NOTFOUND); - if (__wt_cell_type(cell) == WT_CELL_DEL) + if ((cell = WT_COL_PTR(page, cip)) == NULL || + __wt_cell_type(cell) == WT_CELL_DEL) return (1); break; case BTREE_ROW: diff --git a/src/config/config_def.c b/src/config/config_def.c index b7820d7d2ec..056f2218af1 100644 --- a/src/config/config_def.c +++ b/src/config/config_def.c @@ -371,10 +371,10 @@ static const WT_CONFIG_ENTRY config_entries[] = { "internal_page_max=4KB,key_format=u,key_gap=10,leaf_item_max=0," "leaf_page_max=1MB,lsm=(auto_throttle=,bloom=,bloom_bit_count=16," "bloom_config=,bloom_hash_count=8,bloom_oldest=0,chunk_max=5GB," - "chunk_size=2MB,merge_max=15,merge_threads=1),memory_page_max=5MB" - ",os_cache_dirty_max=0,os_cache_max=0,prefix_compression=," - "prefix_compression_min=4,source=,split_pct=75,type=file," - "value_format=u", + "chunk_size=10MB,merge_max=15,merge_threads=1)," + 
"memory_page_max=5MB,os_cache_dirty_max=0,os_cache_max=0," + "prefix_compression=,prefix_compression_min=4,source=," + "split_pct=75,type=file,value_format=u", confchk_session_create }, { "session.drop", diff --git a/src/conn/conn_dhandle.c b/src/conn/conn_dhandle.c index 3e2f9bf55f1..a734ceb9a0a 100644 --- a/src/conn/conn_dhandle.c +++ b/src/conn/conn_dhandle.c @@ -103,7 +103,7 @@ __conn_dhandle_get(WT_SESSION_IMPL *session, strcmp(ckpt, dhandle->checkpoint) == 0))) { WT_RET(__conn_dhandle_open_lock( session, dhandle, flags)); - ++dhandle->refcnt; + ++dhandle->session_ref; session->dhandle = dhandle; return (0); } @@ -116,7 +116,7 @@ __conn_dhandle_get(WT_SESSION_IMPL *session, WT_RET(__wt_calloc_def(session, 1, &dhandle)); WT_ERR(__wt_rwlock_alloc(session, "btree handle", &dhandle->rwlock)); - dhandle->refcnt = 1; + dhandle->session_ref = 1; dhandle->name_hash = hash; WT_ERR(__wt_strdup(session, name, &dhandle->name)); @@ -379,7 +379,7 @@ __conn_dhandle_sweep(WT_SESSION_IMPL *session) while (dhandle != NULL) { dhandle_next = SLIST_NEXT(dhandle, l); if (!F_ISSET(dhandle, WT_DHANDLE_OPEN) && - dhandle->refcnt == 0) { + dhandle->session_ref == 0) { WT_STAT_FAST_CONN_INCR(session, dh_conn_handles); SLIST_REMOVE(&conn->dhlh, dhandle, __wt_data_handle, l); SLIST_INSERT_HEAD(&sweeplh, dhandle, l); @@ -416,8 +416,18 @@ __wt_conn_btree_get(WT_SESSION_IMPL *session, WT_DATA_HANDLE *dhandle; WT_DECL_RET; - if (S2C(session)->dhandle_dead >= WT_DHANDLE_SWEEP_TRIGGER) + /* + * If enough handles have been closed recently, sweep for dead handles. + * + * Don't do this if WT_DHANDLE_LOCK_ONLY is set: as well as avoiding + * sweeping in what should be a fast path, this also avoids sweeping + * during __wt_conn_dhandle_close_all, because it sets + * WT_DHANDLE_LOCK_ONLY. 
+ */ + if (!LF_ISSET(WT_DHANDLE_LOCK_ONLY) && + S2C(session)->dhandle_dead >= WT_DHANDLE_SWEEP_TRIGGER) WT_RET(__conn_dhandle_sweep(session)); + WT_RET(__conn_dhandle_get(session, name, ckpt, flags)); dhandle = session->dhandle; @@ -563,7 +573,7 @@ __wt_conn_btree_close(WT_SESSION_IMPL *session, int locked) /* * Decrement the reference count and return if still in use. */ - if (--dhandle->refcnt > 0) + if (--dhandle->session_ref > 0) return (0); /* @@ -610,20 +620,13 @@ int __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name) { WT_CONNECTION_IMPL *conn; - WT_DATA_HANDLE *dhandle, *saved_dhandle; + WT_DATA_HANDLE *dhandle; WT_DECL_RET; conn = S2C(session); WT_ASSERT(session, F_ISSET(session, WT_SESSION_SCHEMA_LOCKED)); - - /* - * Make sure the caller's handle is tracked, so it will be unlocked - * even if we failed to get all of the remaining handles we need. - */ - if ((saved_dhandle = session->dhandle) != NULL && - WT_META_TRACKING(session)) - WT_ERR(__wt_meta_track_handle_lock(session, 0)); + WT_ASSERT(session, session->dhandle == NULL); SLIST_FOREACH(dhandle, &conn->dhlh, l) { if (strcmp(dhandle->name, name) != 0) @@ -631,24 +634,16 @@ __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name) session->dhandle = dhandle; - /* - * The caller may have this tree locked to prevent - * concurrent schema operations. - */ - if (dhandle == saved_dhandle) - WT_ASSERT(session, - F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE)); - else { - WT_ERR(__wt_try_writelock(session, dhandle->rwlock)); - F_SET(dhandle, WT_DHANDLE_EXCLUSIVE); - if (WT_META_TRACKING(session)) - WT_ERR(__wt_meta_track_handle_lock(session, 0)); - } + /* Lock the handle exclusively. 
*/ + WT_ERR(__wt_session_get_btree(session, + dhandle->name, dhandle->checkpoint, + NULL, WT_DHANDLE_EXCLUSIVE | WT_DHANDLE_LOCK_ONLY)); + if (WT_META_TRACKING(session)) + WT_ERR(__wt_meta_track_handle_lock(session, 0)); /* - * We have an exclusive lock, which means there are no - * cursors open at this point. Close the handle, if - * necessary. + * We have an exclusive lock, which means there are no cursors + * open at this point. Close the handle, if necessary. */ if (F_ISSET(dhandle, WT_DHANDLE_OPEN)) { ret = __wt_meta_track_sub_on(session); @@ -656,10 +651,9 @@ __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name) ret = __wt_conn_btree_sync_and_close(session); /* - * If the close succeeded, drop any locks it - * acquired. If there was a failure, this - * function will fail and the whole transaction - * will be rolled back. + * If the close succeeded, drop any locks it acquired. + * If there was a failure, this function will fail and + * the whole transaction will be rolled back. */ if (ret == 0) ret = __wt_meta_track_sub_off(session); @@ -668,7 +662,6 @@ __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name) if (!WT_META_TRACKING(session)) WT_TRET(__wt_session_release_btree(session)); - session->dhandle = NULL; WT_ERR(ret); } diff --git a/src/include/dhandle.h b/src/include/dhandle.h index c620f5d3263..749e54c8f13 100644 --- a/src/include/dhandle.h +++ b/src/include/dhandle.h @@ -33,9 +33,17 @@ */ struct __wt_data_handle { WT_RWLOCK *rwlock; /* Lock for shared/exclusive ops */ - uint32_t refcnt; /* Sessions using this handle */ SLIST_ENTRY(__wt_data_handle) l;/* Linked list of handles */ + /* + * Sessions caching a connection's data handle will have a non-zero + * reference count; sessions using a connection's data handle will + * have a non-zero in-use count. 
+ */ + uint32_t session_ref; /* Sessions referencing this handle */ + int32_t session_inuse; /* Sessions using this handle */ + time_t timeofdeath; /* Use count went to 0 */ + uint64_t name_hash; /* Hash of name */ const char *name; /* Object name as a URI */ const char *checkpoint; /* Checkpoint name (or NULL) */ diff --git a/src/include/extern.h b/src/include/extern.h index 66042de0955..34cc0ae382b 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -1091,6 +1091,7 @@ extern int __wt_thread_create(WT_SESSION_IMPL *session, void *(*func)(void *), void *arg); extern int __wt_thread_join(WT_SESSION_IMPL *session, pthread_t tid); +extern int __wt_seconds(WT_SESSION_IMPL *session, time_t *timep); extern int __wt_epoch(WT_SESSION_IMPL *session, struct timespec *tsp); extern void __wt_yield(void); extern int __wt_ext_struct_pack(WT_EXTENSION_API *wt_api, @@ -1284,8 +1285,8 @@ extern int __wt_compact_uri_analyze(WT_SESSION_IMPL *session, const char *uri); extern int __wt_session_compact( WT_SESSION *wt_session, const char *uri, const char *config); -extern int __wt_session_add_btree( WT_SESSION_IMPL *session, - WT_DATA_HANDLE_CACHE **dhandle_cachep); +extern void __wt_session_dhandle_incr_use(WT_SESSION_IMPL *session); +extern int __wt_session_dhandle_decr_use(WT_SESSION_IMPL *session); extern int __wt_session_lock_btree(WT_SESSION_IMPL *session, uint32_t flags); extern int __wt_session_release_btree(WT_SESSION_IMPL *session); extern int __wt_session_get_btree_ckpt(WT_SESSION_IMPL *session, diff --git a/src/include/lsm.h b/src/include/lsm.h index b7aa1df1b24..0614b2d43a3 100644 --- a/src/include/lsm.h +++ b/src/include/lsm.h @@ -117,6 +117,7 @@ struct __wt_lsm_tree { size_t chunk_alloc; /* Space allocated for chunks */ u_int nchunks; /* Number of active chunks */ uint32_t last; /* Last allocated ID */ + int modified; /* Have there been updates? 
*/ WT_LSM_CHUNK **old_chunks; /* Array of old LSM chunks */ size_t old_alloc; /* Space allocated for old chunks */ diff --git a/src/include/session.h b/src/include/session.h index b00f26e1345..c24853be074 100644 --- a/src/include/session.h +++ b/src/include/session.h @@ -58,7 +58,12 @@ struct __wt_session_impl { WT_EVENT_HANDLER *event_handler;/* Application's event handlers */ WT_DATA_HANDLE *dhandle; /* Current data handle */ + + /* Session handle reference list */ SLIST_HEAD(__dhandles, __wt_data_handle_cache) dhandles; +#define WT_DHANDLE_SWEEP_WAIT 60 /* Wait before discarding */ +#define WT_DHANDLE_SWEEP_PERIOD 20 /* Only sweep every 20 seconds */ + time_t last_sweep; /* Last sweep for dead handles */ WT_CURSOR *cursor; /* Current cursor */ /* Cursors closed with the session */ diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index 55a258923a2..0d947bf5e91 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -843,7 +843,7 @@ struct __wt_session { * between 100MB and 10TB; default \c 5GB.} * @config{ chunk_size, the maximum size of the * in-memory chunk of an LSM tree., an integer between 512K and 500MB; - * default \c 2MB.} + * default \c 10MB.} * @config{ merge_max, the * maximum number of chunks to include in a merge operation., an integer * between 2 and 100; default \c 15.} diff --git a/src/lsm/lsm_cursor.c b/src/lsm/lsm_cursor.c index 075faa2ebc4..6c89204da6f 100644 --- a/src/lsm/lsm_cursor.c +++ b/src/lsm/lsm_cursor.c @@ -393,8 +393,8 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) { * chunk instead. */ if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) { - ret = __wt_open_cursor(session, chunk->uri, c, - NULL, cp); + ret = __wt_open_cursor( + session, chunk->uri, c, NULL, cp); if (ret == 0) F_SET(chunk, WT_LSM_CHUNK_EMPTY); } @@ -808,6 +808,7 @@ __clsm_lookup(WT_CURSOR_LSM *clsm) WT_FORALL_CURSORS(clsm, c, i) { /* If there is a Bloom filter, see if we can skip the read. 
*/ + bloom = NULL; if ((bloom = clsm->blooms[i]) != NULL) { if (!have_hash) { WT_ERR(__wt_bloom_hash( diff --git a/src/lsm/lsm_merge.c b/src/lsm/lsm_merge.c index c7c436ab801..e8f73ca9e61 100644 --- a/src/lsm/lsm_merge.c +++ b/src/lsm/lsm_merge.c @@ -57,7 +57,7 @@ __wt_lsm_merge( WT_DECL_ITEM(bbuf); WT_DECL_RET; WT_ITEM buf, key, value; - WT_LSM_CHUNK *chunk, *youngest; + WT_LSM_CHUNK *chunk, *previous, *youngest; uint32_t generation, start_id; uint64_t insert_count, record_count, chunk_size; u_int dest_id, end_chunk, i, merge_min, nchunks, start_chunk; @@ -66,18 +66,29 @@ __wt_lsm_merge( const char *cfg[3]; bloom = NULL; + chunk_size = 0; create_bloom = 0; dest = src = NULL; - chunk_size = 0; start_id = 0; /* + * If the tree is open read-only, be very aggressive. Otherwise, we + * can spend a long time waiting for merges to start in read-only + * applications. + */ + if (!lsm_tree->modified || + F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) + aggressive = 10; + merge_min = (aggressive > 5) ? 2 : lsm_tree->merge_min; + max_generation_gap = aggressive > 10 ? 3 : 1; + + /* * If there aren't any chunks to merge, or some of the chunks aren't * yet written, we're done. A non-zero error indicates that the worker * should assume there is no work to do: if there are unwritten chunks, * the worker should write them immediately. */ - if (lsm_tree->nchunks <= 1) + if (lsm_tree->nchunks < merge_min) return (WT_NOTFOUND); /* @@ -87,24 +98,14 @@ __wt_lsm_merge( */ WT_RET(__wt_lsm_tree_lock(session, lsm_tree, 1)); - /* - * If the tree is open read-only, be very aggressive. Otherwise, we - * can spend a long time waiting for merges to start in read-only - * applications. - */ - if (F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING) || F_ISSET( - lsm_tree->chunk[lsm_tree->nchunks - 1], WT_LSM_CHUNK_ONDISK)) - aggressive = 100; - merge_min = aggressive ? 2 : lsm_tree->merge_min; - max_generation_gap = aggressive > 10 ? 
3 : 1; /* - * Only include chunks that are stable on disk and not involved in a - * merge. + * Only include chunks that already have a Bloom filter and not + * involved in a merge. */ end_chunk = lsm_tree->nchunks - 1; while (end_chunk > 0 && - (!F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_ONDISK) || + (!F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_BLOOM) || F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_MERGING))) --end_chunk; @@ -159,11 +160,12 @@ __wt_lsm_merge( * If we have enough chunks for a merge and the next chunk is * in a different generation, stop. */ - if (nchunks >= merge_min && - chunk->generation > - lsm_tree->chunk[start_chunk]->generation && - chunk->generation <= youngest->generation + 1) - break; + if (nchunks >= merge_min) { + previous = lsm_tree->chunk[start_chunk]; + if (chunk->generation > previous->generation && + previous->generation <= youngest->generation + 1) + break; + } F_SET(chunk, WT_LSM_CHUNK_MERGING); record_count += chunk->count; @@ -286,7 +288,7 @@ __wt_lsm_merge( record_count, insert_count); /* - * We've successfully created the new chunk. Now install it. We need + * We've successfully created the new chunk. Now install it. We need * to ensure that the NO_CACHE flag is cleared and the bloom filter * is closed (even if a step fails), so track errors but don't return * until we've cleaned up. 
@@ -295,6 +297,8 @@ __wt_lsm_merge( WT_TRET(dest->close(dest)); src = dest = NULL; + F_CLR(session, WT_SESSION_NO_CACHE); + if (create_bloom) { if (ret == 0) WT_TRET(__wt_bloom_finalize(bloom)); @@ -313,7 +317,6 @@ __wt_lsm_merge( WT_TRET(__wt_bloom_close(bloom)); bloom = NULL; } - F_CLR(session, WT_SESSION_NO_CACHE); WT_ERR(ret); /* @@ -328,7 +331,6 @@ __wt_lsm_merge( WT_ERR_NOTFOUND_OK(ret); WT_ERR(__wt_lsm_tree_set_chunk_size(session, chunk)); - WT_ERR(__wt_lsm_tree_lock(session, lsm_tree, 1)); /* diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c index 0badd5841d7..3aa1b6419d7 100644 --- a/src/lsm/lsm_tree.c +++ b/src/lsm/lsm_tree.c @@ -562,8 +562,13 @@ __wt_lsm_tree_throttle(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) { WT_LSM_CHUNK *chunk, **cp, *prev_chunk; uint64_t cache_sz, cache_used, in_memory, record_count; + uint64_t oldtime, timediff; uint32_t i; + /* Never throttle in small trees. */ + if (lsm_tree->nchunks < 3) + return; + cache_sz = S2C(session)->cache_size; /* @@ -601,9 +606,9 @@ __wt_lsm_tree_throttle(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) } else { WT_ASSERT(session, WT_TIMECMP(chunk->create_ts, (*cp)->create_ts) >= 0); - lsm_tree->throttle_sleep = (long)((in_memory - 2) * - WT_TIMEDIFF(chunk->create_ts, (*cp)->create_ts) / - (20 * record_count)); + timediff = WT_TIMEDIFF(chunk->create_ts, (*cp)->create_ts); + lsm_tree->throttle_sleep = + (long)((in_memory - 2) * timediff / (20 * record_count)); /* * Get more aggressive as the number of in memory chunks @@ -622,16 +627,24 @@ __wt_lsm_tree_throttle(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) /* * Update our estimate of how long each in-memory chunk stays active. * Filter out some noise by keeping a weighted history of the - * calculated value. + * calculated value. Wait until we have enough chunks that we can + * check that the new value is sane: otherwise, after a long idle + * period, we can calculate a crazy value. 
*/ - prev_chunk = lsm_tree->chunk[lsm_tree->nchunks - 2]; - if (in_memory > 1) { + if (in_memory > 1 && + i != lsm_tree->nchunks && !F_ISSET(*cp, WT_LSM_CHUNK_STABLE)) { + prev_chunk = lsm_tree->chunk[lsm_tree->nchunks - 2]; WT_ASSERT(session, prev_chunk->generation == 0); - WT_ASSERT(session, WT_TIMECMP( - chunk->create_ts, prev_chunk->create_ts) >= 0); - lsm_tree->chunk_fill_ms = - (3 * lsm_tree->chunk_fill_ms + WT_TIMEDIFF( - chunk->create_ts, prev_chunk->create_ts) / 1000000) / 4; + WT_ASSERT(session, + WT_TIMECMP(chunk->create_ts, prev_chunk->create_ts) >= 0); + timediff = WT_TIMEDIFF(chunk->create_ts, prev_chunk->create_ts); + WT_ASSERT(session, + WT_TIMECMP(prev_chunk->create_ts, (*cp)->create_ts) >= 0); + oldtime = WT_TIMEDIFF(prev_chunk->create_ts, (*cp)->create_ts); + if (timediff < 10 * oldtime) + lsm_tree->chunk_fill_ms = + (3 * lsm_tree->chunk_fill_ms + + timediff / 1000000) / 4; } } @@ -657,10 +670,10 @@ __wt_lsm_tree_switch(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) !F_ISSET(lsm_tree, WT_LSM_TREE_NEED_SWITCH)) goto err; - new_id = WT_ATOMIC_ADD(lsm_tree->last, 1); + /* Update the throttle time. */ + __wt_lsm_tree_throttle(session, lsm_tree); - if (nchunks > 1) - __wt_lsm_tree_throttle(session, lsm_tree); + new_id = WT_ATOMIC_ADD(lsm_tree->last, 1); WT_ERR(__wt_realloc_def(session, &lsm_tree->chunk_alloc, nchunks + 1, &lsm_tree->chunk)); @@ -679,6 +692,8 @@ __wt_lsm_tree_switch(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) F_CLR(lsm_tree, WT_LSM_TREE_NEED_SWITCH); ++lsm_tree->dsk_gen; + lsm_tree->modified = 1; + err: /* TODO: mark lsm_tree bad on error(?) 
*/ WT_TRET(__wt_lsm_tree_unlock(session, lsm_tree)); return (ret); diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c index 7fa9de38b5e..3cfb4119b85 100644 --- a/src/lsm/lsm_worker.c +++ b/src/lsm/lsm_worker.c @@ -7,7 +7,8 @@ #include "wt_internal.h" -static int __lsm_bloom_create(WT_SESSION_IMPL *, WT_LSM_TREE *, WT_LSM_CHUNK *); +static int __lsm_bloom_create( + WT_SESSION_IMPL *, WT_LSM_TREE *, WT_LSM_CHUNK *, u_int); static int __lsm_bloom_work(WT_SESSION_IMPL *, WT_LSM_TREE *); static int __lsm_discard_handle(WT_SESSION_IMPL *, const char *, const char *); static int __lsm_free_chunks(WT_SESSION_IMPL *, WT_LSM_TREE *); @@ -94,9 +95,8 @@ __wt_lsm_merge_worker(void *vargs) WT_LSM_WORKER_ARGS *args; WT_LSM_TREE *lsm_tree; WT_SESSION_IMPL *session; - uint64_t saved_gen; u_int aggressive, chunk_wait, id, old_aggressive, stallms; - int progress, try_bloom; + int progress; args = vargs; lsm_tree = args->lsm_tree; @@ -104,9 +104,7 @@ __wt_lsm_merge_worker(void *vargs) session = lsm_tree->worker_sessions[id]; __wt_free(session, args); - saved_gen = lsm_tree->dsk_gen; aggressive = stallms = 0; - try_bloom = 0; while (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) { /* @@ -124,19 +122,18 @@ __wt_lsm_merge_worker(void *vargs) /* Clear any state from previous worker thread iterations. */ session->dhandle = NULL; - if (__wt_lsm_merge(session, lsm_tree, id, aggressive) == 0) { + /* Try to create a Bloom filter. */ + if (__lsm_bloom_work(session, lsm_tree) == 0) + progress = 1; + + /* If we didn't create a Bloom filter, try to merge. */ + if (progress == 0 && + __wt_lsm_merge(session, lsm_tree, id, aggressive) == 0) progress = 1; - try_bloom = 0; - } /* Clear any state from previous worker thread iterations. */ WT_CLEAR_BTREE_IN_SESSION(session); - /* Try to create a Bloom filter if no merge was possible. 
*/ - if (progress == 0 && try_bloom && - __lsm_bloom_work(session, lsm_tree) == 0) - progress = 1; - /* * Only have one thread freeing old chunks, and only if there * are chunks to free. @@ -145,42 +142,25 @@ __wt_lsm_merge_worker(void *vargs) __lsm_free_chunks(session, lsm_tree) == 0) progress = 1; - if (progress || saved_gen != lsm_tree->dsk_gen) { - aggressive = 0; + if (progress) stallms = 0; - saved_gen = lsm_tree->dsk_gen; - } else if (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING) && + else if (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING) && !F_ISSET(lsm_tree, WT_LSM_TREE_NEED_SWITCH)) { - /* - * The "main" thread polls 10 times per second, - * secondary threads once per second. - */ + /* Poll 10 times per second. */ WT_ERR_TIMEDOUT_OK(__wt_cond_wait( - session, lsm_tree->work_cond, - id == 0 ? 100000 : 1000000)); - stallms += (id == 0) ? 100 : 1000; - - /* - * Start creating Bloom filters once enough time has - * passed that we should have filled a chunk (or 1 - * second if we don't have an estimate). - */ - try_bloom = (stallms > (lsm_tree->chunk_fill_ms == 0 ? - 1000 : lsm_tree->chunk_fill_ms)); + session, lsm_tree->work_cond, 100000)); + stallms += 100; /* * Get aggressive if more than enough chunks for a * merge should have been created while we waited. - * Use 30 seconds as a default if we don't have an + * Use 10 seconds as a default if we don't have an * estimate. */ chunk_wait = stallms / (lsm_tree->chunk_fill_ms == 0 ? 
- 30000 : lsm_tree->chunk_fill_ms); + 10000 : lsm_tree->chunk_fill_ms); old_aggressive = aggressive; - for (aggressive = 0, chunk_wait /= lsm_tree->merge_min; - chunk_wait > 0; - ++aggressive, chunk_wait /= lsm_tree->merge_min) - ; + aggressive = chunk_wait / lsm_tree->merge_min; if (aggressive > old_aggressive) WT_VERBOSE_ERR(session, lsm, @@ -208,7 +188,7 @@ __lsm_bloom_work(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) WT_DECL_RET; WT_LSM_CHUNK *chunk; WT_LSM_WORKER_COOKIE cookie; - int i; + u_int i; WT_CLEAR(cookie); /* If no work is done, tell our caller by returning WT_NOTFOUND. */ @@ -217,7 +197,7 @@ __lsm_bloom_work(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) WT_RET(__lsm_copy_chunks(session, lsm_tree, &cookie, 0)); /* Create bloom filters in all checkpointed chunks. */ - for (i = (int)cookie.nchunks - 1; i >= 0; i--) { + for (i = 0; i < cookie.nchunks; i++) { chunk = cookie.chunk_array[i]; /* @@ -232,7 +212,8 @@ __lsm_bloom_work(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) /* See if we win the race to switch on the "busy" flag. */ if (WT_ATOMIC_CAS(chunk->bloom_busy, 0, 1)) { - ret = __lsm_bloom_create(session, lsm_tree, chunk); + ret = __lsm_bloom_create( + session, lsm_tree, chunk, (u_int)i); chunk->bloom_busy = 0; break; } @@ -431,15 +412,14 @@ err: __lsm_unpin_chunks(session, &cookie); * checkpointed but not yet been merged. 
*/ static int -__lsm_bloom_create( - WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk) +__lsm_bloom_create(WT_SESSION_IMPL *session, + WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk, u_int chunk_off) { WT_BLOOM *bloom; WT_CURSOR *src; WT_DECL_RET; WT_ITEM buf, key; WT_SESSION *wt_session; - const char *cur_cfg[3]; uint64_t insert_count; int exist; @@ -474,10 +454,10 @@ __lsm_bloom_create( lsm_tree->bloom_config, chunk->count, lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count, &bloom)); - cur_cfg[0] = WT_CONFIG_BASE(session, session_open_cursor); - cur_cfg[1] = "checkpoint=" WT_CHECKPOINT ",raw"; - cur_cfg[2] = NULL; - WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cur_cfg, &src)); + /* Open a special merge cursor just on this chunk. */ + WT_ERR(__wt_open_cursor(session, lsm_tree->name, NULL, NULL, &src)); + F_SET(src, WT_CURSTD_RAW); + WT_ERR(__wt_clsm_init_merge(src, chunk_off, chunk->id, 1)); F_SET(session, WT_SESSION_NO_CACHE); for (insert_count = 0; (ret = src->next(src)) == 0; insert_count++) { @@ -486,11 +466,16 @@ __lsm_bloom_create( } WT_ERR_NOTFOUND_OK(ret); WT_TRET(src->close(src)); - F_CLR(session, WT_SESSION_NO_CACHE); WT_TRET(__wt_bloom_finalize(bloom)); WT_ERR(ret); + F_CLR(session, WT_SESSION_NO_CACHE); + + /* Load the new Bloom filter into cache. */ + WT_CLEAR(key); + WT_ERR_NOTFOUND_OK(__wt_bloom_get(bloom, &key)); + WT_VERBOSE_ERR(session, lsm, "LSM worker created bloom filter %s. " "Expected %" PRIu64 " items, got %" PRIu64, @@ -498,8 +483,8 @@ __lsm_bloom_create( /* Ensure the bloom filter is in the metadata. 
*/ WT_ERR(__wt_lsm_tree_lock(session, lsm_tree, 1)); - ret = __wt_lsm_meta_write(session, lsm_tree); F_SET(chunk, WT_LSM_CHUNK_BLOOM); + ret = __wt_lsm_meta_write(session, lsm_tree); ++lsm_tree->dsk_gen; WT_TRET(__wt_lsm_tree_unlock(session, lsm_tree)); diff --git a/src/meta/meta_ckpt.c b/src/meta/meta_ckpt.c index d036f64b14d..8eba95c5c68 100644 --- a/src/meta/meta_ckpt.c +++ b/src/meta/meta_ckpt.c @@ -367,10 +367,10 @@ int __wt_meta_ckptlist_set(WT_SESSION_IMPL *session, const char *fname, WT_CKPT *ckptbase, WT_LSN *ckptlsn) { - struct timespec ts; WT_CKPT *ckpt; WT_DECL_ITEM(buf); WT_DECL_RET; + time_t secs; int64_t maxorder; const char *sep; @@ -416,8 +416,14 @@ __wt_meta_ckptlist_set(WT_SESSION_IMPL *session, if (F_ISSET(ckpt, WT_CKPT_ADD)) ckpt->order = ++maxorder; - WT_ERR(__wt_epoch(session, &ts)); - ckpt->sec = (uintmax_t)ts.tv_sec; + /* + * XXX + * Assumes a time_t fits into a uintmax_t, which isn't + * guaranteed, a time_t has to be an arithmetic type, + * but not an integral type. + */ + WT_ERR(__wt_seconds(session, &secs)); + ckpt->sec = (uintmax_t)secs; } if (strcmp(ckpt->name, WT_CHECKPOINT) == 0) WT_ERR(__wt_buf_catfmt(session, buf, diff --git a/src/meta/meta_table.c b/src/meta/meta_table.c index 575a72cd7e6..05723cc4aa5 100644 --- a/src/meta/meta_table.c +++ b/src/meta/meta_table.c @@ -65,7 +65,14 @@ __wt_metadata_cursor( WT_ERR(__wt_metadata_open(session)); WT_SET_BTREE_IN_SESSION(session, session->metafile); + + /* + * We use the metadata a lot, so we have a handle cached; lock it and + * increment the in-use counter. + */ WT_ERR(__wt_session_lock_btree(session, 0)); + __wt_session_dhandle_incr_use(session); + ret = __wt_curfile_create(session, NULL, cfg, 0, 0, cursorp); /* Restore the caller's btree. 
*/ diff --git a/src/os_posix/os_time.c b/src/os_posix/os_time.c index 8979d77f818..dd3dbd8512b 100644 --- a/src/os_posix/os_time.c +++ b/src/os_posix/os_time.c @@ -8,6 +8,22 @@ #include "wt_internal.h" /* + * __wt_seconds -- + * Return the seconds since the Epoch. + */ +int +__wt_seconds(WT_SESSION_IMPL *session, time_t *timep) +{ + struct timespec t; + + WT_RET(__wt_epoch(session, &t)); + + *timep = t.tv_sec; + + return (0); +} + +/* * __wt_epoch -- * Return the time since the Epoch. */ diff --git a/src/schema/schema_drop.c b/src/schema/schema_drop.c index 206d1c4056a..bfca3079707 100644 --- a/src/schema/schema_drop.c +++ b/src/schema/schema_drop.c @@ -27,14 +27,6 @@ __drop_file( if (!WT_PREFIX_SKIP(filename, "file:")) return (EINVAL); - if (session->dhandle == NULL && - (ret = __wt_session_get_btree(session, uri, NULL, cfg, - WT_DHANDLE_EXCLUSIVE | WT_DHANDLE_LOCK_ONLY)) != 0) { - if (ret == WT_NOTFOUND || ret == ENOENT) - ret = 0; - return (ret); - } - /* Close all btree handles associated with this file. */ WT_RET(__wt_conn_dhandle_close_all(session, uri)); diff --git a/src/schema/schema_truncate.c b/src/schema/schema_truncate.c index ff5920fdcee..27b4ffebde5 100644 --- a/src/schema/schema_truncate.c +++ b/src/schema/schema_truncate.c @@ -21,9 +21,15 @@ __truncate_file(WT_SESSION_IMPL *session, const char *name) if (!WT_PREFIX_SKIP(filename, "file:")) return (EINVAL); + /* Open and lock the file. */ + WT_RET(__wt_session_get_btree( + session, name, NULL, NULL, WT_DHANDLE_EXCLUSIVE)); + /* Get the allocation size. */ allocsize = S2BT(session)->allocsize; + WT_RET(__wt_session_release_btree(session)); + /* Close any btree handles in the file. 
*/ WT_RET(__wt_conn_dhandle_close_all(session, name)); @@ -97,12 +103,9 @@ __wt_schema_truncate( WT_DECL_RET; const char *tablename; - WT_UNUSED(cfg); tablename = uri; if (WT_PREFIX_MATCH(uri, "file:")) { - WT_RET(__wt_session_get_btree( - session, uri, NULL, NULL, WT_DHANDLE_EXCLUSIVE)); ret = __truncate_file(session, uri); } else if (WT_PREFIX_MATCH(uri, "lsm:")) ret = __wt_lsm_tree_truncate(session, uri, cfg); diff --git a/src/session/session_dhandle.c b/src/session/session_dhandle.c index 8d163fcf288..1a54c37fcef 100644 --- a/src/session/session_dhandle.c +++ b/src/session/session_dhandle.c @@ -8,11 +8,47 @@ #include "wt_internal.h" /* - * __wt_session_add_btree -- - * Add a handle to the session's cache. + * __wt_session_dhandle_incr_use -- + * Increment the session data source's in-use counter. + */ +void +__wt_session_dhandle_incr_use(WT_SESSION_IMPL *session) +{ + WT_DATA_HANDLE *dhandle; + + dhandle = session->dhandle; + + (void)WT_ATOMIC_ADD(dhandle->session_inuse, 1); +} + +/* + * __wt_session_dhandle_decr_use -- + * Decrement the session data source's in-use counter. */ int -__wt_session_add_btree( +__wt_session_dhandle_decr_use(WT_SESSION_IMPL *session) +{ + WT_DATA_HANDLE *dhandle; + WT_DECL_RET; + + dhandle = session->dhandle; + + /* + * Decrement the in-use count on the underlying data-source -- if we're + * the last reference, set the time-of-death timestamp. + */ + WT_ASSERT(session, dhandle->session_inuse > 0); + if (WT_ATOMIC_SUB(dhandle->session_inuse, 1) == 0) + WT_TRET(__wt_seconds(session, &dhandle->timeofdeath)); + return (0); +} + +/* + * __session_add_btree -- + * Add a handle to the session's cache. + */ +static int +__session_add_btree( WT_SESSION_IMPL *session, WT_DATA_HANDLE_CACHE **dhandle_cachep) { WT_DATA_HANDLE_CACHE *dhandle_cache; @@ -112,6 +148,9 @@ __wt_session_release_btree(WT_SESSION_IMPL *session) btree = S2BT(session); dhandle = session->dhandle; + /* Decrement the data-source's in-use counter. 
*/ + WT_ERR(__wt_session_dhandle_decr_use(session)); + if (F_ISSET(dhandle, WT_DHANDLE_DISCARD_CLOSE)) { /* * If configured to discard on last close, attempt to trade our @@ -215,11 +254,27 @@ retry: WT_RET(__wt_meta_checkpoint_last_name( * Discard any session dhandles that are not open. */ static int -__session_dhandle_sweep(WT_SESSION_IMPL *session) +__session_dhandle_sweep(WT_SESSION_IMPL *session, uint32_t flags) { WT_DATA_HANDLE *dhandle; WT_DATA_HANDLE_CACHE *dhandle_cache, *dhandle_cache_next; - WT_DECL_RET; + time_t now; + + /* + * Check the local flag WT_DHANDLE_LOCK_ONLY; a common caller with that + * flag is in the path to discard the handle, don't sweep in that case. + */ + if (LF_ISSET(WT_DHANDLE_LOCK_ONLY)) + return (0); + + /* + * Periodically sweep for dead handles; if we've swept recently, don't + * do it again. + */ + WT_RET(__wt_seconds(session, &now)); + if (now - session->last_sweep < WT_DHANDLE_SWEEP_PERIOD) + return (0); + session->last_sweep = now; WT_STAT_FAST_CONN_INCR(session, dh_session_sweeps); @@ -227,33 +282,30 @@ __session_dhandle_sweep(WT_SESSION_IMPL *session) while (dhandle_cache != NULL) { dhandle_cache_next = SLIST_NEXT(dhandle_cache, l); dhandle = dhandle_cache->dhandle; - if (!F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE|WT_DHANDLE_OPEN)) { + if (dhandle->session_inuse == 0 && + now - dhandle->timeofdeath > WT_DHANDLE_SWEEP_WAIT) { WT_STAT_FAST_CONN_INCR(session, dh_session_handles); - WT_TRET(__wt_session_discard_btree( - session, dhandle_cache)); + WT_RET( + __wt_session_discard_btree(session, dhandle_cache)); } dhandle_cache = dhandle_cache_next; } - return (ret); + return (0); } /* * __session_open_btree -- - * Wrapper function to first sweep the session and then get the btree. - * Sweeping is only called when a session notices it has dead dhandles on - * its session dhandle list. Must be called with schema lock. 
+ * Wrapper function to first sweep the session handles and then get the + * btree handle; must be called with schema lock. */ static int __session_open_btree(WT_SESSION_IMPL *session, - const char *name, const char *ckpt, const char *op_cfg[], - int dead, uint32_t flags) + const char *name, const char *ckpt, const char *op_cfg[], uint32_t flags) { - WT_DECL_RET; + WT_RET(__session_dhandle_sweep(session, flags)); + WT_RET(__wt_conn_btree_get(session, name, ckpt, op_cfg, flags)); - if (dead) - WT_TRET(__session_dhandle_sweep(session)); - WT_TRET(__wt_conn_btree_get(session, name, ckpt, op_cfg, flags)); - return (ret); + return (0); } /* @@ -268,23 +320,14 @@ __wt_session_get_btree(WT_SESSION_IMPL *session, WT_DATA_HANDLE_CACHE *dhandle_cache; WT_DECL_RET; uint64_t hash; - int candidate, dead; + int candidate; dhandle = NULL; - candidate = dead = 0; + candidate = 0; hash = __wt_hash_city64(uri, strlen(uri)); SLIST_FOREACH(dhandle_cache, &session->dhandles, l) { dhandle = dhandle_cache->dhandle; - /* - * We check the local flag WT_DHANDLE_LOCK_ONLY in addition - * to the dhandle flags. A common caller with the flag - * is from the path to discard the handle, so we ignore the - * optimization to sweep in that case. - */ - if (!LF_ISSET(WT_DHANDLE_LOCK_ONLY) && - !F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE|WT_DHANDLE_OPEN)) - dead = 1; if (hash != dhandle->name_hash || strcmp(uri, dhandle->name) != 0) continue; @@ -300,8 +343,8 @@ __wt_session_get_btree(WT_SESSION_IMPL *session, session->dhandle = dhandle; /* - * Try to lock the file; if we succeed, our "exclusive" - * state must match. + * Try to lock the file; if we succeed, our "exclusive" state + * must match. */ ret = __wt_session_lock_btree(session, flags); if (ret == WT_NOTFOUND) @@ -314,20 +357,21 @@ __wt_session_get_btree(WT_SESSION_IMPL *session, /* * If we don't already hold the schema lock, get it now so that * we can find and/or open the handle. 
We call a wrapper - * function that will optionally sweep the handle list to - * remove any dead handles. + * function to sweep the handle list to remove any dead handles. */ WT_WITH_SCHEMA_LOCK(session, ret = - __session_open_btree( - session, uri, checkpoint, cfg, dead, flags)); + __session_open_btree(session, uri, checkpoint, cfg, flags)); WT_RET(ret); if (!candidate) - WT_RET(__wt_session_add_btree(session, NULL)); + WT_RET(__session_add_btree(session, NULL)); WT_ASSERT(session, LF_ISSET(WT_DHANDLE_LOCK_ONLY) || F_ISSET(session->dhandle, WT_DHANDLE_OPEN)); } + /* Increment the data-source's in-use counter. */ + __wt_session_dhandle_incr_use(session); + WT_ASSERT(session, LF_ISSET(WT_DHANDLE_EXCLUSIVE) == F_ISSET(session->dhandle, WT_DHANDLE_EXCLUSIVE)); F_SET(session->dhandle, LF_ISSET(WT_DHANDLE_DISCARD_CLOSE)); diff --git a/src/txn/txn_ckpt.c b/src/txn/txn_ckpt.c index 7be84f3d0f8..04817958d97 100644 --- a/src/txn/txn_ckpt.c +++ b/src/txn/txn_ckpt.c @@ -476,7 +476,7 @@ __checkpoint_worker( */ if ((ret = __wt_meta_ckptlist_get( session, dhandle->name, &ckptbase)) == WT_NOTFOUND) { - WT_ASSERT(session, session->dhandle->refcnt == 0); + WT_ASSERT(session, session->dhandle->session_ref == 0); return (__wt_bt_cache_op( session, NULL, WT_SYNC_DISCARD_NOWRITE)); } diff --git a/test/format/config.h b/test/format/config.h index bcbaef13865..9f8ab37c7bd 100644 --- a/test/format/config.h +++ b/test/format/config.h @@ -72,8 +72,8 @@ typedef struct { static CONFIG c[] = { { "firstfit", - "if allocation is firstfit", /* 20% */ - 0x0, C_BOOL, 20, 0, 0, &g.c_firstfit, NULL }, + "if allocation is firstfit", /* 10% */ + 0x0, C_BOOL, 10, 0, 0, &g.c_firstfit, NULL }, { "bitcnt", "number of bits for fixed-length column-store files", @@ -83,6 +83,10 @@ static CONFIG c[] = { "size of the cache in MB", 0x0, 0x0, 1, 100, 1024, &g.c_cache, NULL }, + { "compaction", + "if compaction is running", /* 10% */ + 0x0, C_BOOL, 10, 0, 0, &g.c_compact, NULL }, + { "compression", "type of 
compression (none | bzip | lzo | raw | snappy)", 0x0, C_IGNORE|C_STRING, 1, 5, 5, NULL, &g.c_compression }, diff --git a/test/format/format.h b/test/format/format.h index df6fc87a17b..276c8a2a8bc 100644 --- a/test/format/format.h +++ b/test/format/format.h @@ -136,6 +136,7 @@ typedef struct { u_int c_bitcnt; /* Config values */ u_int c_cache; + u_int c_compact; char *c_compression; char *c_config_open; u_int c_data_extend; @@ -224,6 +225,7 @@ void wts_load(void); void wts_open(const char *, int, WT_CONNECTION **); void wts_ops(void); uint32_t wts_rand(void); +void wts_rand_init(void); void wts_read_scan(void); void wts_salvage(void); void wts_stats(void); diff --git a/test/format/ops.c b/test/format/ops.c index 6b43a5dd20b..d38568523d5 100644 --- a/test/format/ops.c +++ b/test/format/ops.c @@ -91,7 +91,7 @@ wts_ops(void) if ((ret = pthread_create(&backup_tid, NULL, hot_backup, NULL)) != 0) die(ret, "pthread_create"); - if ((ret = + if (g.c_compact && (ret = pthread_create(&compact_tid, NULL, compact, NULL)) != 0) die(ret, "pthread_create"); @@ -130,7 +130,8 @@ wts_ops(void) /* Wait for the backup, compaction thread. */ g.threads_finished = 1; (void)pthread_join(backup_tid, NULL); - (void)pthread_join(compact_tid, NULL); + if (g.c_compact) + (void)pthread_join(compact_tid, NULL); } if (g.logging != 0) { diff --git a/test/format/t.c b/test/format/t.c index 69b0a4c5c8d..7fdc25af3f7 100644 --- a/test/format/t.c +++ b/test/format/t.c @@ -125,6 +125,13 @@ main(int argc, char *argv[]) /* Clean up on signal. */ (void)signal(SIGINT, onint); + /* + * Initialize the random number generator (don't reinitialize on each + * new run, reinitializing won't be more random than continuing on from + * the current state). 
+ */ + wts_rand_init(); + printf("%s: process %" PRIdMAX "\n", g.progname, (intmax_t)getpid()); while (++g.run_cnt <= g.c_runs || g.c_runs == 0 ) { startup(); /* Start a run */ diff --git a/test/format/util.c b/test/format/util.c index e1f52b77b8e..39a105fb9fc 100644 --- a/test/format/util.c +++ b/test/format/util.c @@ -201,6 +201,18 @@ track(const char *tag, uint64_t cnt, TINFO *tinfo) } /* + * wts_rand_init -- + * Initialize the random number generator. + */ +void +wts_rand_init(void) +{ + /* Seed the random number generator. */ + if (!g.replay) + srand((u_int)(0xdeadbeef ^ (u_int)time(NULL))); +} + +/* * wts_rand -- * Return a random number. */ @@ -243,12 +255,6 @@ wts_rand(void) if ((g.rand_log = fopen("RUNDIR/rand", "w")) == NULL) die(errno, "fopen: RUNDIR/rand"); (void)setvbuf(g.rand_log, NULL, _IOLBF, 0); - - /* - * Seed the random number generator for each new run (we - * know it's a new run when we re-open the log file). - */ - srand((u_int)(0xdeadbeef ^ (u_int)time(NULL))); } r = (uint32_t)rand(); fprintf(g.rand_log, "%" PRIu32 "\n", r); diff --git a/test/suite/test_cursor03.py b/test/suite/test_cursor03.py index 61b2711a3c2..362b00ebf60 100644 --- a/test/suite/test_cursor03.py +++ b/test/suite/test_cursor03.py @@ -48,8 +48,8 @@ class test_cursor03(TestCursorTracker): ('col.val10k', dict(tablekind='col', keysize=None, valsize=[10, 10000], uri='table')), ('row.keyval10k', dict(tablekind='row', keysize=[10,10000], valsize=[10, 10000], uri='table')), ], [ - ('count1000', dict(tablecount=1000,cache_size=25*1024*1024)), - ('count10000', dict(tablecount=10000, cache_size=64*1024*1024)) + ('count1000', dict(tablecount=1000)), + ('count10000', dict(tablecount=10000)) ]) def create_session_and_cursor(self): @@ -68,12 +68,6 @@ class test_cursor03(TestCursorTracker): self.cur_initial_conditions(self.table_name1, self.tablecount, self.tablekind, self.keysize, self.valsize, self.uri) return self.session.open_cursor(tablearg, None, 'append') - def 
setUpConnectionOpen(self, dir): - wtopen_args = 'create,cache_size=' + str(self.cache_size) - conn = wiredtiger.wiredtiger_open(dir, wtopen_args) - self.pr(`conn`) - return conn - def test_multiple_remove(self): """ Test multiple deletes at the same place diff --git a/test/suite/test_txn02.py b/test/suite/test_txn02.py index bc024dfa531..96c1d52ff65 100644 --- a/test/suite/test_txn02.py +++ b/test/suite/test_txn02.py @@ -29,14 +29,19 @@ # Transactions: commits and rollbacks # -import os, shutil +import fnmatch, os, shutil +from suite_subprocess import suite_subprocess from wiredtiger import wiredtiger_open from wtscenario import multiply_scenarios, number_scenarios import wttest -class test_txn02(wttest.WiredTigerTestCase): +class test_txn02(wttest.WiredTigerTestCase, suite_subprocess): + logmax = "100K" tablename = 'test_txn02' uri = 'table:' + tablename + archive_list = ['true', 'false'] + conn_list = ['reopen', 'stay_open'] + sync_list = ['dsync', 'fsync', 'none'] types = [ ('row', dict(tabletype='row', @@ -72,15 +77,22 @@ class test_txn02(wttest.WiredTigerTestCase): txn4s = [('t4c', dict(txn4='commit')), ('t4r', dict(txn4='rollback'))] scenarios = number_scenarios(multiply_scenarios('.', types, - op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s)) - + op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s)) + # scenarios = number_scenarios(multiply_scenarios('.', types, + # op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s)) [:3] # Overrides WiredTigerTestCase def setUpConnectionOpen(self, dir): self.home = dir + # Cycle through the different transaction_sync values in a + # deterministic manner. 
+ self.txn_sync = self.sync_list[ + self.scenario_number % len(self.sync_list)] self.backup_dir = os.path.join(self.home, "WT_BACKUP") - conn = wiredtiger_open(dir, - 'create,log=(enabled,file_max=100K),' + - ('error_prefix="%s: ",' % self.shortid())) + conn_params = 'create,log=(enabled,file_max=%s),' % self.logmax + \ + 'error_prefix="%s: ",' % self.shortid() + \ + 'transaction_sync="%s",' % self.txn_sync + # print "Creating conn at '%s' with config '%s'" % (dir, conn_params) + conn = wiredtiger_open(dir, conn_params) self.pr(`conn`) self.session2 = conn.open_session() return conn @@ -111,51 +123,118 @@ class test_txn02(wttest.WiredTigerTestCase): self.check(self.session2, "isolation=read-committed", committed) self.check(self.session2, "isolation=read-uncommitted", current) - # Opening a clone of the database home directory should see the - # committed results. + # Opening a clone of the database home directory should run + # recovery and see the committed results. wttest.removeAll(self.backup_dir) shutil.copytree(self.home, self.backup_dir) - backup_conn = wiredtiger_open(self.backup_dir, 'log=(enabled)') + backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax + backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params) try: self.check(backup_conn.open_session(), None, committed) - #self.check(backup_conn.open_session(), None, {}) finally: backup_conn.close() + def check_log(self, committed): + wttest.removeAll(self.backup_dir) + shutil.copytree(self.home, self.backup_dir) + # + # Open and close the backup connection a few times to force + # repeated recovery and log archiving even if later recoveries + # are essentially no-ops. Confirm that the backup contains + # the committed operations after recovery. + # + # Cycle through the different archive values in a + # deterministic manner. 
+ self.archive = self.archive_list[ + self.scenario_number % len(self.archive_list)] + backup_conn_params = \ + 'log=(enabled,file_max=%s,archive=%s)' % (self.logmax, self.archive) + orig_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*") + endcount = 2 + count = 0 + while count < endcount: + backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params) + try: + self.check(backup_conn.open_session(), None, committed) + finally: + # Yield so that the archive thread gets a chance to run + # before we close the connection. + yield + backup_conn.close() + count += 1 + # + # Check logs after repeated openings. The first log should + # have been archived if configured. Subsequent openings would not + # archive because no checkpoint is written due to no modifications. + # + cur_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*") + for o in orig_logs: + if self.archive == 'true': + self.assertEqual(False, o in cur_logs) + else: + self.assertEqual(True, o in cur_logs) + # + # Run printlog and make sure it exits with zero status. + # + self.runWt(['-h', self.backup_dir, 'printlog'], outfilename='printlog.out') + def test_ops(self): # print "Creating %s with config '%s'" % (self.uri, self.create_params) self.session.create(self.uri, self.create_params) - # Set up the table with entries for 1 and 10 + # Set up the table with entries for 1, 2, 10 and 11. # We use the overwrite config so insert can update as needed. 
c = self.session.open_cursor(self.uri, None, 'overwrite') c.set_value(1) c.set_key(1) c.insert() + c.set_key(2) + c.insert() c.set_key(10) c.insert() - current = {1:1, 10:1} + c.set_key(11) + c.insert() + current = {1:1, 2:1, 10:1, 11:1} committed = current.copy() + reopen = self.conn_list[ + self.scenario_number % len(self.conn_list)] ops = (self.op1, self.op2, self.op3, self.op4) txns = (self.txn1, self.txn2, self.txn3, self.txn4) + # for ok, txn in zip(ops, txns): # print ', '.join('%s(%d)[%s]' % (ok[0], ok[1], txn) - # for ok, txn in zip(ops, txns)) for i, ot in enumerate(zip(ops, txns)): - self.session.begin_transaction() ok, txn = ot op, k = ok - # print '%s(%d)[%s]' % (ok[0], ok[1], txn) + + # Close and reopen the connection and cursor. + if reopen == 'reopen': + self.reopen_conn() + c = self.session.open_cursor(self.uri, None, 'overwrite') + + self.session.begin_transaction() + # Test multiple operations per transaction by always + # doing the same operation on key k + 1. + k1 = k + 1 + # print '%d: %s(%d)[%s]' % (i, ok[0], ok[1], txn) if op == 'insert' or op == 'update': - c.set_key(k) c.set_value(i + 2) + c.set_key(k) + c.insert() + c.set_key(k1) c.insert() current[k] = i + 2 + current[k1] = i + 2 elif op == 'remove': c.set_key(k) c.remove() + c.set_key(k1) + c.remove() if k in current: del current[k] + if k1 in current: + del current[k1] + # print current # Check the state after each operation. self.check_all(current, committed) @@ -169,5 +248,9 @@ class test_txn02(wttest.WiredTigerTestCase): # Check the state after each commit/rollback. self.check_all(current, committed) + # Check the log state after the entire op completes + # and run recovery. 
+ self.check_log(committed) + if __name__ == '__main__': wttest.run() diff --git a/test/suite/test_txn04.py b/test/suite/test_txn04.py new file mode 100644 index 00000000000..74c14926f07 --- /dev/null +++ b/test/suite/test_txn04.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python +# +# Public Domain 2008-2013 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
+# +# test_txn04.py +# Transactions: hot backup and recovery +# + +import os +from suite_subprocess import suite_subprocess +from wiredtiger import wiredtiger_open +from wtscenario import multiply_scenarios, number_scenarios +import wttest + +class test_txn04(wttest.WiredTigerTestCase, suite_subprocess): + logmax = "100K" + tablename = 'test_txn04' + uri = 'table:' + tablename + sync_list = ['dsync', 'fsync', 'none'] + + types = [ + ('row', dict(tabletype='row', + create_params = 'key_format=i,value_format=i')), + ('var', dict(tabletype='var', + create_params = 'key_format=r,value_format=i')), + ('fix', dict(tabletype='fix', + create_params = 'key_format=r,value_format=8t')), + ] + op1s = [ + ('insert', dict(op1=('insert', 6))), + ('update', dict(op1=('update', 2))), + ('remove', dict(op1=('remove', 2))), + ('trunc-stop', dict(op1=('stop', 2))), + ] + txn1s = [('t1c', dict(txn1='commit')), ('t1r', dict(txn1='rollback'))] + + scenarios = number_scenarios(multiply_scenarios('.', types, op1s, txn1s)) + # Overrides WiredTigerTestCase + def setUpConnectionOpen(self, dir): + self.home = dir + # Cycle through the different transaction_sync values in a + # deterministic manner. + self.txn_sync = self.sync_list[ + self.scenario_number % len(self.sync_list)] + self.backup_dir = os.path.join(self.home, "WT_BACKUP") + # Set archive false on the home directory. + conn_params = 'log=(archive=false,enabled,file_max=%s),' % self.logmax + \ + 'create,error_prefix="%s: ",' % self.shortid() + \ + 'transaction_sync="%s",' % self.txn_sync + # print "Creating conn at '%s' with config '%s'" % (dir, conn_params) + conn = wiredtiger_open(dir, conn_params) + self.pr(`conn`) + self.session2 = conn.open_session() + return conn + + # Check that a cursor (optionally started in a new transaction), sees the + # expected values. 
+ def check(self, session, txn_config, expected): + if txn_config: + session.begin_transaction(txn_config) + c = session.open_cursor(self.uri, None) + actual = dict((k, v) for k, v in c if v != 0) + # Search for the expected items as well as iterating + for k, v in expected.iteritems(): + self.assertEqual(c[k], v) + c.close() + if txn_config: + session.commit_transaction() + self.assertEqual(actual, expected) + + # Check the state of the system with respect to the current cursor and + # different isolation levels. + def check_all(self, current, committed): + # Transactions see their own changes. + # Read-uncommitted transactions see all changes. + # Snapshot and read-committed transactions should not see changes. + self.check(self.session, None, current) + self.check(self.session2, "isolation=snapshot", committed) + self.check(self.session2, "isolation=read-committed", committed) + self.check(self.session2, "isolation=read-uncommitted", current) + + def hot_backup(self, backup_uri, committed): + # If we are backing up a target, assume the directory exists. + # We just use the wt backup command. + # A future test extension could also use a cursor. + cmd = '-h ' + self.home + ' backup ' + if backup_uri != None: + cmd += '-t ' + backup_uri + ' ' + else: + wttest.removeAll(self.backup_dir) + os.mkdir(self.backup_dir) + + cmd += self.backup_dir + self.runWt(cmd.split()) + self.exception='false' + backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax + backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params) + try: + self.check(backup_conn.open_session(), None, committed) + except: + self.exception='true' + finally: + backup_conn.close() + + def test_ops(self): + self.session.create(self.uri, self.create_params) + c = self.session.open_cursor(self.uri, None, 'overwrite') + # Set up the table with entries for 1-5. + # We then truncate starting or ending in various places. + # We use the overwrite config so insert can update as needed. 
+ current = {1:1, 2:1, 3:1, 4:1, 5:1} + c.set_value(1) + for k in current: + c.set_key(k) + c.insert() + committed = current.copy() + + ops = (self.op1, ) + txns = (self.txn1, ) + for i, ot in enumerate(zip(ops, txns)): + # Perform a full hot backup of the original tables. + # The runWt command closes our connection and sessions so + # we need to reopen them here. + self.hot_backup(None, committed) + self.assertEqual(True, self.exception == 'false') + c = self.session.open_cursor(self.uri, None, 'overwrite') + c.set_value(1) + # Then do the given modification. + # Perform a targeted hot backup. + self.session.begin_transaction() + ok, txn = ot + op, k = ok + + # print '%d: %s(%d)[%s]' % (i, ok[0], ok[1], txn) + if op == 'insert' or op == 'update': + c.set_value(i + 2) + c.set_key(k) + c.insert() + current[k] = i + 2 + elif op == 'remove': + c.set_key(k) + c.remove() + if k in current: + del current[k] + elif op == 'stop': + # For both, the key given is the start key. Add 2 + # for the stop key. + c.set_key(k) + kstart = 1 + kstop = k + self.session.truncate(None, None, c, None) + while (kstart <= kstop): + del current[kstart] + kstart += 1 + + # print current + # Check the state after each operation. + self.check_all(current, committed) + + if txn == 'commit': + committed = current.copy() + self.session.commit_transaction() + elif txn == 'rollback': + current = committed.copy() + self.session.rollback_transaction() + + # Check the state after each commit/rollback. + self.check_all(current, committed) + + # Backup the target we modified. We expect that running + # recovery now will generate an exception if we committed. 
+ # print 'Call hot_backup with ' + self.uri + self.hot_backup(self.uri, committed) + if txn == 'commit': + self.assertEqual(True, self.exception == 'true') + else: + self.assertEqual(True, self.exception == 'false') + +if __name__ == '__main__': + wttest.run() diff --git a/test/suite/test_txn05.py b/test/suite/test_txn05.py new file mode 100644 index 00000000000..e017aab9cb0 --- /dev/null +++ b/test/suite/test_txn05.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python +# +# Public Domain 2008-2013 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
#
#
# test_txn05.py
#   Transactions: commits and rollbacks
#

import fnmatch, os, shutil, time
from suite_subprocess import suite_subprocess
from wiredtiger import wiredtiger_open
from wtscenario import multiply_scenarios, number_scenarios
import wttest

class test_txn05(wttest.WiredTigerTestCase, suite_subprocess):
    """Truncate ranges of a logged table inside a transaction.

    Each scenario combines a table type (row, variable-length column,
    fixed-length column), one truncate operation and one transaction
    resolution (commit or rollback).  After every step the table contents
    are checked from several isolation levels and from a recovered copy of
    the database home directory.
    """
    logmax = "100K"
    tablename = 'test_txn05'
    uri = 'table:' + tablename
    # Values cycled through (by scenario number) for the log archive and
    # transaction_sync connection settings.
    archive_list = ['true', 'false']
    sync_list = ['dsync', 'fsync', 'none']

    types = [
        ('row', dict(tabletype='row',
            create_params = 'key_format=i,value_format=i')),
        ('var', dict(tabletype='var',
            create_params = 'key_format=r,value_format=i')),
        ('fix', dict(tabletype='fix',
            create_params = 'key_format=r,value_format=8t')),
    ]
    # Each op1 is a (truncate kind, key) pair; see test_ops for how the
    # key is interpreted for each kind.
    op1s = [
        ('trunc-all', dict(op1=('all', 0))),
        ('trunc-both', dict(op1=('both', 2))),
        ('trunc-start', dict(op1=('start', 2))),
        ('trunc-stop', dict(op1=('stop', 2))),
    ]
    txn1s = [('t1c', dict(txn1='commit')), ('t1r', dict(txn1='rollback'))]

    scenarios = number_scenarios(multiply_scenarios('.', types, op1s, txn1s))
    # scenarios = number_scenarios(multiply_scenarios('.', types, op1s, txn1s))[:3]

    # Overrides WiredTigerTestCase
    def setUpConnectionOpen(self, dir):
        """Open the logged connection used by the test.

        Also records the home directory, picks the transaction_sync
        setting for this scenario, and opens a second session
        (self.session2) on the same connection.  Returns the connection.
        """
        self.home = dir
        # Cycle through the different transaction_sync values in a
        # deterministic manner.
        self.txn_sync = self.sync_list[
            self.scenario_number % len(self.sync_list)]
        self.backup_dir = os.path.join(self.home, "WT_BACKUP")
        conn_params = 'create,log=(enabled,file_max=%s),' % self.logmax + \
            'error_prefix="%s: ",' % self.shortid() + \
            'transaction_sync="%s",' % self.txn_sync
        # print "Creating conn at '%s' with config '%s'" % (dir, conn_params)
        conn = wiredtiger_open(dir, conn_params)
        # repr() instead of the Python 2-only backtick syntax.
        self.pr(repr(conn))
        self.session2 = conn.open_session()
        return conn

    # Check that a cursor (optionally started in a new transaction), sees the
    # expected values.
    def check(self, session, txn_config, expected):
        """Assert that the table, as seen through session, equals expected.

        If txn_config is non-empty the reads run inside a transaction begun
        with that configuration, which is committed afterwards.
        """
        if txn_config:
            session.begin_transaction(txn_config)
        c = session.open_cursor(self.uri, None)
        # Zero values are ignored; presumably truncated records in the
        # fixed-length column store read back as 0 - TODO confirm.
        actual = dict((k, v) for k, v in c if v != 0)
        # Search for the expected items as well as iterating
        for k, v in expected.items():
            self.assertEqual(c[k], v)
        c.close()
        if txn_config:
            session.commit_transaction()
        self.assertEqual(actual, expected)

    # Check the state of the system with respect to the current cursor and
    # different isolation levels.
    def check_all(self, current, committed):
        """Check visibility of current and committed contents.

        current is what the modifying session should see; committed is
        what has been durably committed so far.
        """
        # Transactions see their own changes.
        # Read-uncommitted transactions see all changes.
        # Snapshot and read-committed transactions should not see changes.
        self.check(self.session, None, current)
        self.check(self.session2, "isolation=snapshot", committed)
        self.check(self.session2, "isolation=read-committed", committed)
        self.check(self.session2, "isolation=read-uncommitted", current)

        # Opening a clone of the database home directory should run
        # recovery and see the committed results.
        wttest.removeAll(self.backup_dir)
        shutil.copytree(self.home, self.backup_dir)
        backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax
        backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
        try:
            self.check(backup_conn.open_session(), None, committed)
        finally:
            backup_conn.close()

    def check_log(self, committed):
        """Re-open a copy of the home directory and verify log handling.

        Confirms the copy contains the committed data after each recovery,
        that the original log files were (or were not) removed according
        to the archive setting, and that 'wt printlog' runs successfully.
        """
        wttest.removeAll(self.backup_dir)
        shutil.copytree(self.home, self.backup_dir)
        #
        # Open and close the backup connection a few times to force
        # repeated recovery and log archiving even if later recoveries
        # are essentially no-ops.  Confirm that the backup contains
        # the committed operations after recovery.
        #
        # Cycle through the different archive values in a
        # deterministic manner.
        self.archive = self.archive_list[
            self.scenario_number % len(self.archive_list)]
        backup_conn_params = \
            'log=(enabled,file_max=%s,archive=%s)' % (self.logmax, self.archive)
        orig_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*")
        endcount = 2
        count = 0
        while count < endcount:
            backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
            try:
                self.check(backup_conn.open_session(), None, committed)
            finally:
                # Let other threads like archive run before closing.
                # This must be a real sleep: a bare 'yield' here turns
                # check_log into a generator function, so calling it would
                # execute none of the checks in this method.
                # NOTE(review): the 1 second pause is a heuristic.
                time.sleep(1.0)
                backup_conn.close()
            count += 1
        #
        # Check logs after repeated openings.  The first log should
        # have been archived if configured.  Subsequent openings would not
        # archive because no checkpoint is written due to no modifications.
        #
        cur_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*")
        for o in orig_logs:
            if self.archive == 'true':
                self.assertEqual(False, o in cur_logs)
            else:
                self.assertEqual(True, o in cur_logs)
        #
        # Run printlog and make sure it exits with zero status.
        #
        self.runWt(['-h', self.backup_dir, 'printlog'], outfilename='printlog.out')

    def test_ops(self):
        """Populate keys 1-5, truncate a range in a transaction, resolve it."""
        # print "Creating %s with config '%s'" % (self.uri, self.create_params)
        self.session.create(self.uri, self.create_params)
        # Set up the table with entries for 1-5.
        # We then truncate starting or ending in various places.
        c = self.session.open_cursor(self.uri, None)
        current = {1:1, 2:1, 3:1, 4:1, 5:1}
        c.set_value(1)
        for k in current:
            c.set_key(k)
            c.insert()
        committed = current.copy()

        ops = (self.op1, )
        txns = (self.txn1, )
        for i, ot in enumerate(zip(ops, txns)):
            self.session.begin_transaction()
            ok, txn = ot
            # print '%d: %s(%d)[%s]' % (i, ok[0], ok[1], txn)
            op, k = ok

            # kstart..kstop (inclusive) is the key range each truncate is
            # expected to remove; it drives the bookkeeping loop below.
            if op == 'stop':
                # Truncate from the beginning of the table through key k.
                c.set_key(k)
                self.session.truncate(None, None, c, None)
                kstart = 1
                kstop = k
            elif op == 'start':
                # Truncate from key k through the end of the table.
                c.set_key(k)
                self.session.truncate(None, c, None, None)
                kstart = k
                kstop = len(current)
            elif op == 'both':
                c2 = self.session.open_cursor(self.uri, None)
                # For both, the key given is the start key.  Add 2
                # for the stop key.
                kstart = k
                kstop = k + 2
                c.set_key(kstart)
                c2.set_key(kstop)
                self.session.truncate(None, c, c2, None)
                c2.close()
            elif op == 'all':
                # Explicit start and stop cursors spanning every key.
                c2 = self.session.open_cursor(self.uri, None)
                kstart = 1
                kstop = len(current)
                c.set_key(kstart)
                c2.set_key(kstop)
                self.session.truncate(None, c, c2, None)
                c2.close()

            # Mirror the truncate in the expected contents.
            while (kstart <= kstop):
                del current[kstart]
                kstart += 1

            # print current
            # Check the state after each operation.
            self.check_all(current, committed)

            if txn == 'commit':
                committed = current.copy()
                self.session.commit_transaction()
            elif txn == 'rollback':
                current = committed.copy()
                self.session.rollback_transaction()

            # Check the state after each commit/rollback.
            self.check_all(current, committed)

        # Check the log state after the entire op completes
        # and run recovery.
        self.check_log(committed)

if __name__ == '__main__':
    wttest.run()

# ---------------------------------------------------------------------------
# The source page continued, on the same physical line, with the following
# diff hunk for a different file.  It is kept verbatim for reference and is
# not part of this module.
#
# diff --git a/test/suite/wttest.py b/test/suite/wttest.py
# index f196f61ab9f..22eb808403a 100644
# --- a/test/suite/wttest.py
# +++ b/test/suite/wttest.py
# @@ -235,8 +235,8 @@ class WiredTigerTestCase(unittest.TestCase):
#      def setUp(self):
#          if not hasattr(self.__class__, 'wt_ntests'):
#              self.__class__.wt_ntests = 0
# -        self.__class__.wt_ntests += 1
#          self.testdir = os.path.join(WiredTigerTestCase._parentTestdir, self.className() + '.' + str(self.__class__.wt_ntests))
# +        self.__class__.wt_ntests += 1
#          if WiredTigerTestCase._verbose > 2:
#              self.prhead('started in ' + self.testdir, True)
#          self.origcwd = os.getcwd()
# ---------------------------------------------------------------------------