author     Alex Gorrod <alexg@wiredtiger.com>   2013-12-06 16:47:36 +1100
committer  Alex Gorrod <alexg@wiredtiger.com>   2013-12-06 16:47:36 +1100
commit     6d4ca8b0e0b9dd4ed637b39868c227d0302ed96b (patch)
tree       80d4812e4a5f66e8a456f3f2e9a0fc13fa8c11af
parent     7b6b95615a006b57f065debb46977a62f9de1912 (diff)
parent     92c17c89340b16eec9f88bc2e5d6fb3791b5b0ec (diff)
download   mongo-6d4ca8b0e0b9dd4ed637b39868c227d0302ed96b.tar.gz
Merge branch 'develop' into compact-lsm
Conflicts:
	src/lsm/lsm_merge.c
-rw-r--r--  bench/wtperf/runners/evict-lsm.wtperf        |   2
-rw-r--r--  bench/wtperf/runners/insert-rmw.wtperf       |   2
-rw-r--r--  bench/wtperf/runners/parallel-pop-lsm.wtperf |   2
-rw-r--r--  dist/api_data.py                             |  24
-rw-r--r--  dist/s_string.ok                             |   1
-rw-r--r--  src/btree/bt_cursor.c                        |  27
-rw-r--r--  src/config/config_def.c                      |   8
-rw-r--r--  src/conn/conn_dhandle.c                      |  63
-rw-r--r--  src/include/dhandle.h                        |  10
-rw-r--r--  src/include/extern.h                         |   5
-rw-r--r--  src/include/lsm.h                            |   1
-rw-r--r--  src/include/session.h                        |   5
-rw-r--r--  src/include/wiredtiger.in                    |   2
-rw-r--r--  src/lsm/lsm_cursor.c                         |   5
-rw-r--r--  src/lsm/lsm_merge.c                          |  50
-rw-r--r--  src/lsm/lsm_tree.c                           |  43
-rw-r--r--  src/lsm/lsm_worker.c                         |  85
-rw-r--r--  src/meta/meta_ckpt.c                         |  12
-rw-r--r--  src/meta/meta_table.c                        |   7
-rw-r--r--  src/os_posix/os_time.c                       |  16
-rw-r--r--  src/schema/schema_drop.c                     |   8
-rw-r--r--  src/schema/schema_truncate.c                 |   9
-rw-r--r--  src/session/session_dhandle.c                | 118
-rw-r--r--  src/txn/txn_ckpt.c                           |   2
-rw-r--r--  test/format/config.h                         |   8
-rw-r--r--  test/format/format.h                         |   2
-rw-r--r--  test/format/ops.c                            |   5
-rw-r--r--  test/format/t.c                              |   7
-rw-r--r--  test/format/util.c                           |  18
-rw-r--r--  test/suite/test_cursor03.py                  |  10
-rw-r--r--  test/suite/test_txn02.py                     | 117
-rw-r--r--  test/suite/test_txn04.py                     | 203
-rw-r--r--  test/suite/test_txn05.py                     | 234
-rw-r--r--  test/suite/wttest.py                         |   2
34 files changed, 871 insertions, 242 deletions
diff --git a/bench/wtperf/runners/evict-lsm.wtperf b/bench/wtperf/runners/evict-lsm.wtperf
index da03c2aac64..a5541986668 100644
--- a/bench/wtperf/runners/evict-lsm.wtperf
+++ b/bench/wtperf/runners/evict-lsm.wtperf
@@ -1,6 +1,6 @@
# wtperf options file: evict lsm configuration
conn_config="cache_size=50M"
-table_config="lsm=(chunk_size=2MB),type=lsm"
+table_config="lsm=(merge_threads=2),type=lsm"
icount=10000000
report_interval=5
run_time=120
diff --git a/bench/wtperf/runners/insert-rmw.wtperf b/bench/wtperf/runners/insert-rmw.wtperf
index 742374184b1..50db0baa0d1 100644
--- a/bench/wtperf/runners/insert-rmw.wtperf
+++ b/bench/wtperf/runners/insert-rmw.wtperf
@@ -1,6 +1,6 @@
# wtperf options file: Test the insert-rmw functionality
conn_config="cache_size=500MB"
-table_config="lsm=(chunk_size=5MB),type=lsm"
+table_config="type=lsm"
icount=500000
insert_rmw=true
report_interval=5
diff --git a/bench/wtperf/runners/parallel-pop-lsm.wtperf b/bench/wtperf/runners/parallel-pop-lsm.wtperf
index d91feb9aaf7..9cc16c8ea9f 100644
--- a/bench/wtperf/runners/parallel-pop-lsm.wtperf
+++ b/bench/wtperf/runners/parallel-pop-lsm.wtperf
@@ -1,7 +1,7 @@
# wtperf options file: Run populate thread multi-threaded and with groups
# of operations in each transaction.
conn_config="cache_size=200MB"
-table_config="lsm=(chunk_size=1M),type=lsm"
+table_config="lsm=(merge_threads=2),type=lsm"
transaction_config="isolation=snapshot"
icount=10000000
report_interval=5
diff --git a/dist/api_data.py b/dist/api_data.py
index 80e3f6c76a3..e1c69d178df 100644
--- a/dist/api_data.py
+++ b/dist/api_data.py
@@ -119,11 +119,11 @@ lsm_config = [
type='boolean'),
Config('chunk_max', '5GB', r'''
the maximum size a single chunk can be. Chunks larger than this
- size are not considered for further merges. This is a soft
- limit, and chunks larger than this value can be created. Must
- be larger than chunk_size''',
+ size are not considered for further merges. This is a soft
+ limit, and chunks larger than this value can be created. Must
+ be larger than chunk_size''',
min='100MB', max='10TB'),
- Config('chunk_size', '2MB', r'''
+ Config('chunk_size', '10MB', r'''
the maximum size of the in-memory chunk of an LSM tree''',
min='512K', max='500MB'),
Config('merge_max', '15', r'''
@@ -180,11 +180,11 @@ file_config = format_meta + [
Config('huffman_key', '', r'''
configure Huffman encoding for keys. Permitted values
are empty (off), \c "english", \c "utf8<file>" or \c
- "utf16<file>". See @ref huffman for more information'''),
+ "utf16<file>". See @ref huffman for more information'''),
Config('huffman_value', '', r'''
- configure Huffman encoding for values. Permitted values
+ configure Huffman encoding for values. Permitted values
are empty (off), \c "english", \c "utf8<file>" or \c
- "utf16<file>". See @ref huffman for more information'''),
+ "utf16<file>". See @ref huffman for more information'''),
Config('internal_key_truncate', 'true', r'''
configure internal key truncation, discarding unnecessary
trailing bytes on internal keys (ignored for custom
@@ -238,7 +238,7 @@ file_config = format_meta + [
from this object are read or written into the buffer cache''',
min=0),
Config('os_cache_dirty_max', '0', r'''
- maximum dirty system buffer cache usage, in bytes. If non-zero,
+ maximum dirty system buffer cache usage, in bytes. If non-zero,
schedule writes for dirty blocks belonging to this object in the
system buffer cache after that many bytes from this object are
written into the buffer cache''',
@@ -399,7 +399,7 @@ methods = {
'session.create' :
Method(table_only_meta + file_config + lsm_config + source_meta + [
Config('exclusive', 'false', r'''
- fail if the object exists. When false (the default), if the
+ fail if the object exists. When false (the default), if the
object exists, check that its settings match the specified
configuration''',
type='boolean'),
@@ -570,7 +570,7 @@ methods = {
must match ::wiredtiger_extension_init'''),
Config('terminate', 'wiredtiger_extension_terminate', r'''
an optional function in the extension that is called before
- the extension is unloaded during WT_CONNECTION::close. The
+ the extension is unloaded during WT_CONNECTION::close. The
signature of the function must match
::wiredtiger_extension_terminate'''),
]),
@@ -618,7 +618,7 @@ methods = {
Config('file_extend', '', r'''
file extension configuration. If set, extend files of the set
type in allocations of the set size, instead of a block at a
- time as each new block is written. For example,
+ time as each new block is written. For example,
<code>file_extend=(data=16MB)</code>''',
type='list', choices=['data', 'log']),
Config('hazard_max', '1000', r'''
@@ -659,7 +659,7 @@ methods = {
min='1'),
Config('statistics_log', '', r'''
log any statistics the database is configured to maintain,
- to a file. See @ref statistics for more information''',
+ to a file. See @ref statistics for more information''',
type='category', subconfig=[
Config('path', '"WiredTigerStat.%H"', r'''
the pathname to a file into which the log records are written,
diff --git a/dist/s_string.ok b/dist/s_string.ok
index a66518d57d5..6e9d23242da 100644
--- a/dist/s_string.ok
+++ b/dist/s_string.ok
@@ -866,6 +866,7 @@ typedef
uB
uid
uint
+uintmax
unbare
uncompressing
uncompresssed
diff --git a/src/btree/bt_cursor.c b/src/btree/bt_cursor.c
index 0ba899215ee..4c5a921a8f7 100644
--- a/src/btree/bt_cursor.c
+++ b/src/btree/bt_cursor.c
@@ -49,7 +49,8 @@ __cursor_fix_implicit(WT_BTREE *btree, WT_CURSOR_BTREE *cbt)
/*
* __cursor_invalid --
* Return if the cursor references an invalid K/V pair (either the pair
- * doesn't exist at all because the tree is empty, or the pair was deleted).
+ * doesn't exist at all because the tree is empty, or the pair was
+ * deleted).
*/
static inline int
__cursor_invalid(WT_CURSOR_BTREE *cbt)
@@ -68,8 +69,23 @@ __cursor_invalid(WT_CURSOR_BTREE *cbt)
session = (WT_SESSION_IMPL *)cbt->iface.session;
/* If we found an insert list entry with a visible update, use it. */
- if (ins != NULL && (upd = __wt_txn_read(session, ins->upd)) != NULL)
- return (WT_UPDATE_DELETED_ISSET(upd) ? 1 : 0);
+ if (ins != NULL) {
+ if ((upd = __wt_txn_read(session, ins->upd)) != NULL)
+ return (WT_UPDATE_DELETED_ISSET(upd) ? 1 : 0);
+
+ /* Do we have a position on the page? */
+ switch (btree->type) {
+ case BTREE_COL_FIX:
+ if (cbt->recno >= page->u.col_fix.recno + page->entries)
+ return (1);
+ break;
+ case BTREE_COL_VAR:
+ case BTREE_ROW:
+ if (cbt->slot > page->entries)
+ return (1);
+ break;
+ }
+ }
/* The page may be empty, the search routine doesn't check. */
if (page->entries == 0)
@@ -81,9 +97,8 @@ __cursor_invalid(WT_CURSOR_BTREE *cbt)
break;
case BTREE_COL_VAR:
cip = &page->u.col_var.d[cbt->slot];
- if ((cell = WT_COL_PTR(page, cip)) == NULL)
- return (WT_NOTFOUND);
- if (__wt_cell_type(cell) == WT_CELL_DEL)
+ if ((cell = WT_COL_PTR(page, cip)) == NULL ||
+ __wt_cell_type(cell) == WT_CELL_DEL)
return (1);
break;
case BTREE_ROW:
diff --git a/src/config/config_def.c b/src/config/config_def.c
index b7820d7d2ec..056f2218af1 100644
--- a/src/config/config_def.c
+++ b/src/config/config_def.c
@@ -371,10 +371,10 @@ static const WT_CONFIG_ENTRY config_entries[] = {
"internal_page_max=4KB,key_format=u,key_gap=10,leaf_item_max=0,"
"leaf_page_max=1MB,lsm=(auto_throttle=,bloom=,bloom_bit_count=16,"
"bloom_config=,bloom_hash_count=8,bloom_oldest=0,chunk_max=5GB,"
- "chunk_size=2MB,merge_max=15,merge_threads=1),memory_page_max=5MB"
- ",os_cache_dirty_max=0,os_cache_max=0,prefix_compression=,"
- "prefix_compression_min=4,source=,split_pct=75,type=file,"
- "value_format=u",
+ "chunk_size=10MB,merge_max=15,merge_threads=1),"
+ "memory_page_max=5MB,os_cache_dirty_max=0,os_cache_max=0,"
+ "prefix_compression=,prefix_compression_min=4,source=,"
+ "split_pct=75,type=file,value_format=u",
confchk_session_create
},
{ "session.drop",
diff --git a/src/conn/conn_dhandle.c b/src/conn/conn_dhandle.c
index 3e2f9bf55f1..a734ceb9a0a 100644
--- a/src/conn/conn_dhandle.c
+++ b/src/conn/conn_dhandle.c
@@ -103,7 +103,7 @@ __conn_dhandle_get(WT_SESSION_IMPL *session,
strcmp(ckpt, dhandle->checkpoint) == 0))) {
WT_RET(__conn_dhandle_open_lock(
session, dhandle, flags));
- ++dhandle->refcnt;
+ ++dhandle->session_ref;
session->dhandle = dhandle;
return (0);
}
@@ -116,7 +116,7 @@ __conn_dhandle_get(WT_SESSION_IMPL *session,
WT_RET(__wt_calloc_def(session, 1, &dhandle));
WT_ERR(__wt_rwlock_alloc(session, "btree handle", &dhandle->rwlock));
- dhandle->refcnt = 1;
+ dhandle->session_ref = 1;
dhandle->name_hash = hash;
WT_ERR(__wt_strdup(session, name, &dhandle->name));
@@ -379,7 +379,7 @@ __conn_dhandle_sweep(WT_SESSION_IMPL *session)
while (dhandle != NULL) {
dhandle_next = SLIST_NEXT(dhandle, l);
if (!F_ISSET(dhandle, WT_DHANDLE_OPEN) &&
- dhandle->refcnt == 0) {
+ dhandle->session_ref == 0) {
WT_STAT_FAST_CONN_INCR(session, dh_conn_handles);
SLIST_REMOVE(&conn->dhlh, dhandle, __wt_data_handle, l);
SLIST_INSERT_HEAD(&sweeplh, dhandle, l);
@@ -416,8 +416,18 @@ __wt_conn_btree_get(WT_SESSION_IMPL *session,
WT_DATA_HANDLE *dhandle;
WT_DECL_RET;
- if (S2C(session)->dhandle_dead >= WT_DHANDLE_SWEEP_TRIGGER)
+ /*
+ * If enough handles have been closed recently, sweep for dead handles.
+ *
+ * Don't do this if WT_DHANDLE_LOCK_ONLY is set: as well as avoiding
+ * sweeping in what should be a fast path, this also avoids sweeping
+ * during __wt_conn_dhandle_close_all, because it sets
+ * WT_DHANDLE_LOCK_ONLY.
+ */
+ if (!LF_ISSET(WT_DHANDLE_LOCK_ONLY) &&
+ S2C(session)->dhandle_dead >= WT_DHANDLE_SWEEP_TRIGGER)
WT_RET(__conn_dhandle_sweep(session));
+
WT_RET(__conn_dhandle_get(session, name, ckpt, flags));
dhandle = session->dhandle;
@@ -563,7 +573,7 @@ __wt_conn_btree_close(WT_SESSION_IMPL *session, int locked)
/*
* Decrement the reference count and return if still in use.
*/
- if (--dhandle->refcnt > 0)
+ if (--dhandle->session_ref > 0)
return (0);
/*
@@ -610,20 +620,13 @@ int
__wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name)
{
WT_CONNECTION_IMPL *conn;
- WT_DATA_HANDLE *dhandle, *saved_dhandle;
+ WT_DATA_HANDLE *dhandle;
WT_DECL_RET;
conn = S2C(session);
WT_ASSERT(session, F_ISSET(session, WT_SESSION_SCHEMA_LOCKED));
-
- /*
- * Make sure the caller's handle is tracked, so it will be unlocked
- * even if we failed to get all of the remaining handles we need.
- */
- if ((saved_dhandle = session->dhandle) != NULL &&
- WT_META_TRACKING(session))
- WT_ERR(__wt_meta_track_handle_lock(session, 0));
+ WT_ASSERT(session, session->dhandle == NULL);
SLIST_FOREACH(dhandle, &conn->dhlh, l) {
if (strcmp(dhandle->name, name) != 0)
@@ -631,24 +634,16 @@ __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name)
session->dhandle = dhandle;
- /*
- * The caller may have this tree locked to prevent
- * concurrent schema operations.
- */
- if (dhandle == saved_dhandle)
- WT_ASSERT(session,
- F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE));
- else {
- WT_ERR(__wt_try_writelock(session, dhandle->rwlock));
- F_SET(dhandle, WT_DHANDLE_EXCLUSIVE);
- if (WT_META_TRACKING(session))
- WT_ERR(__wt_meta_track_handle_lock(session, 0));
- }
+ /* Lock the handle exclusively. */
+ WT_ERR(__wt_session_get_btree(session,
+ dhandle->name, dhandle->checkpoint,
+ NULL, WT_DHANDLE_EXCLUSIVE | WT_DHANDLE_LOCK_ONLY));
+ if (WT_META_TRACKING(session))
+ WT_ERR(__wt_meta_track_handle_lock(session, 0));
/*
- * We have an exclusive lock, which means there are no
- * cursors open at this point. Close the handle, if
- * necessary.
+ * We have an exclusive lock, which means there are no cursors
+ * open at this point. Close the handle, if necessary.
*/
if (F_ISSET(dhandle, WT_DHANDLE_OPEN)) {
ret = __wt_meta_track_sub_on(session);
@@ -656,10 +651,9 @@ __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name)
ret = __wt_conn_btree_sync_and_close(session);
/*
- * If the close succeeded, drop any locks it
- * acquired. If there was a failure, this
- * function will fail and the whole transaction
- * will be rolled back.
+ * If the close succeeded, drop any locks it acquired.
+ * If there was a failure, this function will fail and
+ * the whole transaction will be rolled back.
*/
if (ret == 0)
ret = __wt_meta_track_sub_off(session);
@@ -668,7 +662,6 @@ __wt_conn_dhandle_close_all(WT_SESSION_IMPL *session, const char *name)
if (!WT_META_TRACKING(session))
WT_TRET(__wt_session_release_btree(session));
- session->dhandle = NULL;
WT_ERR(ret);
}
diff --git a/src/include/dhandle.h b/src/include/dhandle.h
index c620f5d3263..749e54c8f13 100644
--- a/src/include/dhandle.h
+++ b/src/include/dhandle.h
@@ -33,9 +33,17 @@
*/
struct __wt_data_handle {
WT_RWLOCK *rwlock; /* Lock for shared/exclusive ops */
- uint32_t refcnt; /* Sessions using this handle */
SLIST_ENTRY(__wt_data_handle) l;/* Linked list of handles */
+ /*
+ * Sessions caching a connection's data handle will have a non-zero
+ * reference count; sessions using a connection's data handle will
+ * have a non-zero in-use count.
+ */
+ uint32_t session_ref; /* Sessions referencing this handle */
+ int32_t session_inuse; /* Sessions using this handle */
+ time_t timeofdeath; /* Use count went to 0 */
+
uint64_t name_hash; /* Hash of name */
const char *name; /* Object name as a URI */
const char *checkpoint; /* Checkpoint name (or NULL) */
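
(Illustrative sketch, not part of the commit: the dhandle.h change splits the old refcnt into a session_ref cache count and a session_inuse use count, plus a time-of-death stamp; the session-level sweep added later in this diff keys off those fields. A simplified standalone version of that sweep test, with WiredTiger's atomics and locking elided, looks roughly like this.)

#include <stdint.h>
#include <time.h>

/* Sketch of the new data-handle reference scheme; names mirror the diff. */
struct dhandle_sketch {
	uint32_t session_ref;	/* Sessions caching this handle */
	int32_t session_inuse;	/* Sessions actively using this handle */
	time_t timeofdeath;	/* When session_inuse last dropped to 0 */
};

#define SWEEP_WAIT	60	/* Seconds; mirrors WT_DHANDLE_SWEEP_WAIT */

/* A cached handle is a sweep candidate once it has been unused long enough. */
static int
dhandle_sweep_candidate(const struct dhandle_sketch *dh, time_t now)
{
	return (dh->session_inuse == 0 &&
	    now - dh->timeofdeath > SWEEP_WAIT);
}
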
diff --git a/src/include/extern.h b/src/include/extern.h
index 66042de0955..34cc0ae382b 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -1091,6 +1091,7 @@ extern int __wt_thread_create(WT_SESSION_IMPL *session,
void *(*func)(void *),
void *arg);
extern int __wt_thread_join(WT_SESSION_IMPL *session, pthread_t tid);
+extern int __wt_seconds(WT_SESSION_IMPL *session, time_t *timep);
extern int __wt_epoch(WT_SESSION_IMPL *session, struct timespec *tsp);
extern void __wt_yield(void);
extern int __wt_ext_struct_pack(WT_EXTENSION_API *wt_api,
@@ -1284,8 +1285,8 @@ extern int __wt_compact_uri_analyze(WT_SESSION_IMPL *session, const char *uri);
extern int __wt_session_compact( WT_SESSION *wt_session,
const char *uri,
const char *config);
-extern int __wt_session_add_btree( WT_SESSION_IMPL *session,
- WT_DATA_HANDLE_CACHE **dhandle_cachep);
+extern void __wt_session_dhandle_incr_use(WT_SESSION_IMPL *session);
+extern int __wt_session_dhandle_decr_use(WT_SESSION_IMPL *session);
extern int __wt_session_lock_btree(WT_SESSION_IMPL *session, uint32_t flags);
extern int __wt_session_release_btree(WT_SESSION_IMPL *session);
extern int __wt_session_get_btree_ckpt(WT_SESSION_IMPL *session,
diff --git a/src/include/lsm.h b/src/include/lsm.h
index b7aa1df1b24..0614b2d43a3 100644
--- a/src/include/lsm.h
+++ b/src/include/lsm.h
@@ -117,6 +117,7 @@ struct __wt_lsm_tree {
size_t chunk_alloc; /* Space allocated for chunks */
u_int nchunks; /* Number of active chunks */
uint32_t last; /* Last allocated ID */
+ int modified; /* Have there been updates? */
WT_LSM_CHUNK **old_chunks; /* Array of old LSM chunks */
size_t old_alloc; /* Space allocated for old chunks */
diff --git a/src/include/session.h b/src/include/session.h
index b00f26e1345..c24853be074 100644
--- a/src/include/session.h
+++ b/src/include/session.h
@@ -58,7 +58,12 @@ struct __wt_session_impl {
WT_EVENT_HANDLER *event_handler;/* Application's event handlers */
WT_DATA_HANDLE *dhandle; /* Current data handle */
+
+ /* Session handle reference list */
SLIST_HEAD(__dhandles, __wt_data_handle_cache) dhandles;
+#define WT_DHANDLE_SWEEP_WAIT 60 /* Wait before discarding */
+#define WT_DHANDLE_SWEEP_PERIOD 20 /* Only sweep every 20 seconds */
+ time_t last_sweep; /* Last sweep for dead handles */
WT_CURSOR *cursor; /* Current cursor */
/* Cursors closed with the session */
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index 55a258923a2..0d947bf5e91 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -843,7 +843,7 @@ struct __wt_session {
* between 100MB and 10TB; default \c 5GB.}
* @config{&nbsp;&nbsp;&nbsp;&nbsp;chunk_size, the maximum size of the
* in-memory chunk of an LSM tree., an integer between 512K and 500MB;
- * default \c 2MB.}
+ * default \c 10MB.}
* @config{&nbsp;&nbsp;&nbsp;&nbsp;merge_max, the
* maximum number of chunks to include in a merge operation., an integer
* between 2 and 100; default \c 15.}
diff --git a/src/lsm/lsm_cursor.c b/src/lsm/lsm_cursor.c
index 075faa2ebc4..6c89204da6f 100644
--- a/src/lsm/lsm_cursor.c
+++ b/src/lsm/lsm_cursor.c
@@ -393,8 +393,8 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
* chunk instead.
*/
if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) {
- ret = __wt_open_cursor(session, chunk->uri, c,
- NULL, cp);
+ ret = __wt_open_cursor(
+ session, chunk->uri, c, NULL, cp);
if (ret == 0)
F_SET(chunk, WT_LSM_CHUNK_EMPTY);
}
@@ -808,6 +808,7 @@ __clsm_lookup(WT_CURSOR_LSM *clsm)
WT_FORALL_CURSORS(clsm, c, i) {
/* If there is a Bloom filter, see if we can skip the read. */
+ bloom = NULL;
if ((bloom = clsm->blooms[i]) != NULL) {
if (!have_hash) {
WT_ERR(__wt_bloom_hash(
diff --git a/src/lsm/lsm_merge.c b/src/lsm/lsm_merge.c
index c7c436ab801..e8f73ca9e61 100644
--- a/src/lsm/lsm_merge.c
+++ b/src/lsm/lsm_merge.c
@@ -57,7 +57,7 @@ __wt_lsm_merge(
WT_DECL_ITEM(bbuf);
WT_DECL_RET;
WT_ITEM buf, key, value;
- WT_LSM_CHUNK *chunk, *youngest;
+ WT_LSM_CHUNK *chunk, *previous, *youngest;
uint32_t generation, start_id;
uint64_t insert_count, record_count, chunk_size;
u_int dest_id, end_chunk, i, merge_min, nchunks, start_chunk;
@@ -66,18 +66,29 @@ __wt_lsm_merge(
const char *cfg[3];
bloom = NULL;
+ chunk_size = 0;
create_bloom = 0;
dest = src = NULL;
- chunk_size = 0;
start_id = 0;
/*
+ * If the tree is open read-only, be very aggressive. Otherwise, we
+ * can spend a long time waiting for merges to start in read-only
+ * applications.
+ */
+ if (!lsm_tree->modified ||
+ F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING))
+ aggressive = 10;
+ merge_min = (aggressive > 5) ? 2 : lsm_tree->merge_min;
+ max_generation_gap = aggressive > 10 ? 3 : 1;
+
+ /*
* If there aren't any chunks to merge, or some of the chunks aren't
* yet written, we're done. A non-zero error indicates that the worker
* should assume there is no work to do: if there are unwritten chunks,
* the worker should write them immediately.
*/
- if (lsm_tree->nchunks <= 1)
+ if (lsm_tree->nchunks < merge_min)
return (WT_NOTFOUND);
/*
@@ -87,24 +98,14 @@ __wt_lsm_merge(
*/
WT_RET(__wt_lsm_tree_lock(session, lsm_tree, 1));
- /*
- * If the tree is open read-only, be very aggressive. Otherwise, we
- * can spend a long time waiting for merges to start in read-only
- * applications.
- */
- if (F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING) || F_ISSET(
- lsm_tree->chunk[lsm_tree->nchunks - 1], WT_LSM_CHUNK_ONDISK))
- aggressive = 100;
- merge_min = aggressive ? 2 : lsm_tree->merge_min;
- max_generation_gap = aggressive > 10 ? 3 : 1;
/*
- * Only include chunks that are stable on disk and not involved in a
- * merge.
+ * Only include chunks that already have a Bloom filter and not
+ * involved in a merge.
*/
end_chunk = lsm_tree->nchunks - 1;
while (end_chunk > 0 &&
- (!F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_ONDISK) ||
+ (!F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_BLOOM) ||
F_ISSET(lsm_tree->chunk[end_chunk], WT_LSM_CHUNK_MERGING)))
--end_chunk;
@@ -159,11 +160,12 @@ __wt_lsm_merge(
* If we have enough chunks for a merge and the next chunk is
* in a different generation, stop.
*/
- if (nchunks >= merge_min &&
- chunk->generation >
- lsm_tree->chunk[start_chunk]->generation &&
- chunk->generation <= youngest->generation + 1)
- break;
+ if (nchunks >= merge_min) {
+ previous = lsm_tree->chunk[start_chunk];
+ if (chunk->generation > previous->generation &&
+ previous->generation <= youngest->generation + 1)
+ break;
+ }
F_SET(chunk, WT_LSM_CHUNK_MERGING);
record_count += chunk->count;
@@ -286,7 +288,7 @@ __wt_lsm_merge(
record_count, insert_count);
/*
- * We've successfully created the new chunk. Now install it. We need
+ * We've successfully created the new chunk. Now install it. We need
* to ensure that the NO_CACHE flag is cleared and the bloom filter
* is closed (even if a step fails), so track errors but don't return
* until we've cleaned up.
@@ -295,6 +297,8 @@ __wt_lsm_merge(
WT_TRET(dest->close(dest));
src = dest = NULL;
+ F_CLR(session, WT_SESSION_NO_CACHE);
+
if (create_bloom) {
if (ret == 0)
WT_TRET(__wt_bloom_finalize(bloom));
@@ -313,7 +317,6 @@ __wt_lsm_merge(
WT_TRET(__wt_bloom_close(bloom));
bloom = NULL;
}
- F_CLR(session, WT_SESSION_NO_CACHE);
WT_ERR(ret);
/*
@@ -328,7 +331,6 @@ __wt_lsm_merge(
WT_ERR_NOTFOUND_OK(ret);
WT_ERR(__wt_lsm_tree_set_chunk_size(session, chunk));
-
WT_ERR(__wt_lsm_tree_lock(session, lsm_tree, 1));
/*
diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c
index 0badd5841d7..3aa1b6419d7 100644
--- a/src/lsm/lsm_tree.c
+++ b/src/lsm/lsm_tree.c
@@ -562,8 +562,13 @@ __wt_lsm_tree_throttle(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
{
WT_LSM_CHUNK *chunk, **cp, *prev_chunk;
uint64_t cache_sz, cache_used, in_memory, record_count;
+ uint64_t oldtime, timediff;
uint32_t i;
+ /* Never throttle in small trees. */
+ if (lsm_tree->nchunks < 3)
+ return;
+
cache_sz = S2C(session)->cache_size;
/*
@@ -601,9 +606,9 @@ __wt_lsm_tree_throttle(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
} else {
WT_ASSERT(session,
WT_TIMECMP(chunk->create_ts, (*cp)->create_ts) >= 0);
- lsm_tree->throttle_sleep = (long)((in_memory - 2) *
- WT_TIMEDIFF(chunk->create_ts, (*cp)->create_ts) /
- (20 * record_count));
+ timediff = WT_TIMEDIFF(chunk->create_ts, (*cp)->create_ts);
+ lsm_tree->throttle_sleep =
+ (long)((in_memory - 2) * timediff / (20 * record_count));
/*
* Get more aggressive as the number of in memory chunks
@@ -622,16 +627,24 @@ __wt_lsm_tree_throttle(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
/*
* Update our estimate of how long each in-memory chunk stays active.
* Filter out some noise by keeping a weighted history of the
- * calculated value.
+ * calculated value. Wait until we have enough chunks that we can
+ * check that the new value is sane: otherwise, after a long idle
+ * period, we can calculate a crazy value.
*/
- prev_chunk = lsm_tree->chunk[lsm_tree->nchunks - 2];
- if (in_memory > 1) {
+ if (in_memory > 1 &&
+ i != lsm_tree->nchunks && !F_ISSET(*cp, WT_LSM_CHUNK_STABLE)) {
+ prev_chunk = lsm_tree->chunk[lsm_tree->nchunks - 2];
WT_ASSERT(session, prev_chunk->generation == 0);
- WT_ASSERT(session, WT_TIMECMP(
- chunk->create_ts, prev_chunk->create_ts) >= 0);
- lsm_tree->chunk_fill_ms =
- (3 * lsm_tree->chunk_fill_ms + WT_TIMEDIFF(
- chunk->create_ts, prev_chunk->create_ts) / 1000000) / 4;
+ WT_ASSERT(session,
+ WT_TIMECMP(chunk->create_ts, prev_chunk->create_ts) >= 0);
+ timediff = WT_TIMEDIFF(chunk->create_ts, prev_chunk->create_ts);
+ WT_ASSERT(session,
+ WT_TIMECMP(prev_chunk->create_ts, (*cp)->create_ts) >= 0);
+ oldtime = WT_TIMEDIFF(prev_chunk->create_ts, (*cp)->create_ts);
+ if (timediff < 10 * oldtime)
+ lsm_tree->chunk_fill_ms =
+ (3 * lsm_tree->chunk_fill_ms +
+ timediff / 1000000) / 4;
}
}
@@ -657,10 +670,10 @@ __wt_lsm_tree_switch(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
!F_ISSET(lsm_tree, WT_LSM_TREE_NEED_SWITCH))
goto err;
- new_id = WT_ATOMIC_ADD(lsm_tree->last, 1);
+ /* Update the throttle time. */
+ __wt_lsm_tree_throttle(session, lsm_tree);
- if (nchunks > 1)
- __wt_lsm_tree_throttle(session, lsm_tree);
+ new_id = WT_ATOMIC_ADD(lsm_tree->last, 1);
WT_ERR(__wt_realloc_def(session, &lsm_tree->chunk_alloc,
nchunks + 1, &lsm_tree->chunk));
@@ -679,6 +692,8 @@ __wt_lsm_tree_switch(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
F_CLR(lsm_tree, WT_LSM_TREE_NEED_SWITCH);
++lsm_tree->dsk_gen;
+ lsm_tree->modified = 1;
+
err: /* TODO: mark lsm_tree bad on error(?) */
WT_TRET(__wt_lsm_tree_unlock(session, lsm_tree));
return (ret);
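
(Illustrative sketch, not part of the commit: the chunk_fill_ms update above keeps a 3:1 weighted history of how long an in-memory chunk stays active, and ignores samples more than 10x the previous interval so a long idle period cannot produce a wild estimate. Assuming WT_TIMEDIFF yields nanoseconds, the arithmetic reduces to the following.)

#include <stdint.h>

/*
 * Sketch of the weighted chunk-fill estimate: 3 parts previous value,
 * 1 part new sample (converted from nanoseconds to milliseconds);
 * samples larger than 10x the prior interval are discarded.
 */
static uint64_t
update_chunk_fill_ms(uint64_t fill_ms, uint64_t timediff, uint64_t oldtime)
{
	if (timediff < 10 * oldtime)
		fill_ms = (3 * fill_ms + timediff / 1000000) / 4;
	return (fill_ms);
}
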
diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c
index 7fa9de38b5e..3cfb4119b85 100644
--- a/src/lsm/lsm_worker.c
+++ b/src/lsm/lsm_worker.c
@@ -7,7 +7,8 @@
#include "wt_internal.h"
-static int __lsm_bloom_create(WT_SESSION_IMPL *, WT_LSM_TREE *, WT_LSM_CHUNK *);
+static int __lsm_bloom_create(
+ WT_SESSION_IMPL *, WT_LSM_TREE *, WT_LSM_CHUNK *, u_int);
static int __lsm_bloom_work(WT_SESSION_IMPL *, WT_LSM_TREE *);
static int __lsm_discard_handle(WT_SESSION_IMPL *, const char *, const char *);
static int __lsm_free_chunks(WT_SESSION_IMPL *, WT_LSM_TREE *);
@@ -94,9 +95,8 @@ __wt_lsm_merge_worker(void *vargs)
WT_LSM_WORKER_ARGS *args;
WT_LSM_TREE *lsm_tree;
WT_SESSION_IMPL *session;
- uint64_t saved_gen;
u_int aggressive, chunk_wait, id, old_aggressive, stallms;
- int progress, try_bloom;
+ int progress;
args = vargs;
lsm_tree = args->lsm_tree;
@@ -104,9 +104,7 @@ __wt_lsm_merge_worker(void *vargs)
session = lsm_tree->worker_sessions[id];
__wt_free(session, args);
- saved_gen = lsm_tree->dsk_gen;
aggressive = stallms = 0;
- try_bloom = 0;
while (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING)) {
/*
@@ -124,19 +122,18 @@ __wt_lsm_merge_worker(void *vargs)
/* Clear any state from previous worker thread iterations. */
session->dhandle = NULL;
- if (__wt_lsm_merge(session, lsm_tree, id, aggressive) == 0) {
+ /* Try to create a Bloom filter. */
+ if (__lsm_bloom_work(session, lsm_tree) == 0)
+ progress = 1;
+
+ /* If we didn't create a Bloom filter, try to merge. */
+ if (progress == 0 &&
+ __wt_lsm_merge(session, lsm_tree, id, aggressive) == 0)
progress = 1;
- try_bloom = 0;
- }
/* Clear any state from previous worker thread iterations. */
WT_CLEAR_BTREE_IN_SESSION(session);
- /* Try to create a Bloom filter if no merge was possible. */
- if (progress == 0 && try_bloom &&
- __lsm_bloom_work(session, lsm_tree) == 0)
- progress = 1;
-
/*
* Only have one thread freeing old chunks, and only if there
* are chunks to free.
@@ -145,42 +142,25 @@ __wt_lsm_merge_worker(void *vargs)
__lsm_free_chunks(session, lsm_tree) == 0)
progress = 1;
- if (progress || saved_gen != lsm_tree->dsk_gen) {
- aggressive = 0;
+ if (progress)
stallms = 0;
- saved_gen = lsm_tree->dsk_gen;
- } else if (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING) &&
+ else if (F_ISSET(lsm_tree, WT_LSM_TREE_WORKING) &&
!F_ISSET(lsm_tree, WT_LSM_TREE_NEED_SWITCH)) {
- /*
- * The "main" thread polls 10 times per second,
- * secondary threads once per second.
- */
+ /* Poll 10 times per second. */
WT_ERR_TIMEDOUT_OK(__wt_cond_wait(
- session, lsm_tree->work_cond,
- id == 0 ? 100000 : 1000000));
- stallms += (id == 0) ? 100 : 1000;
-
- /*
- * Start creating Bloom filters once enough time has
- * passed that we should have filled a chunk (or 1
- * second if we don't have an estimate).
- */
- try_bloom = (stallms > (lsm_tree->chunk_fill_ms == 0 ?
- 1000 : lsm_tree->chunk_fill_ms));
+ session, lsm_tree->work_cond, 100000));
+ stallms += 100;
/*
* Get aggressive if more than enough chunks for a
* merge should have been created while we waited.
- * Use 30 seconds as a default if we don't have an
+ * Use 10 seconds as a default if we don't have an
* estimate.
*/
chunk_wait = stallms / (lsm_tree->chunk_fill_ms == 0 ?
- 30000 : lsm_tree->chunk_fill_ms);
+ 10000 : lsm_tree->chunk_fill_ms);
old_aggressive = aggressive;
- for (aggressive = 0, chunk_wait /= lsm_tree->merge_min;
- chunk_wait > 0;
- ++aggressive, chunk_wait /= lsm_tree->merge_min)
- ;
+ aggressive = chunk_wait / lsm_tree->merge_min;
if (aggressive > old_aggressive)
WT_VERBOSE_ERR(session, lsm,
@@ -208,7 +188,7 @@ __lsm_bloom_work(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
WT_DECL_RET;
WT_LSM_CHUNK *chunk;
WT_LSM_WORKER_COOKIE cookie;
- int i;
+ u_int i;
WT_CLEAR(cookie);
/* If no work is done, tell our caller by returning WT_NOTFOUND. */
@@ -217,7 +197,7 @@ __lsm_bloom_work(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
WT_RET(__lsm_copy_chunks(session, lsm_tree, &cookie, 0));
/* Create bloom filters in all checkpointed chunks. */
- for (i = (int)cookie.nchunks - 1; i >= 0; i--) {
+ for (i = 0; i < cookie.nchunks; i++) {
chunk = cookie.chunk_array[i];
/*
@@ -232,7 +212,8 @@ __lsm_bloom_work(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
/* See if we win the race to switch on the "busy" flag. */
if (WT_ATOMIC_CAS(chunk->bloom_busy, 0, 1)) {
- ret = __lsm_bloom_create(session, lsm_tree, chunk);
+ ret = __lsm_bloom_create(
+ session, lsm_tree, chunk, (u_int)i);
chunk->bloom_busy = 0;
break;
}
@@ -431,15 +412,14 @@ err: __lsm_unpin_chunks(session, &cookie);
* checkpointed but not yet been merged.
*/
static int
-__lsm_bloom_create(
- WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk)
+__lsm_bloom_create(WT_SESSION_IMPL *session,
+ WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk, u_int chunk_off)
{
WT_BLOOM *bloom;
WT_CURSOR *src;
WT_DECL_RET;
WT_ITEM buf, key;
WT_SESSION *wt_session;
- const char *cur_cfg[3];
uint64_t insert_count;
int exist;
@@ -474,10 +454,10 @@ __lsm_bloom_create(
lsm_tree->bloom_config, chunk->count,
lsm_tree->bloom_bit_count, lsm_tree->bloom_hash_count, &bloom));
- cur_cfg[0] = WT_CONFIG_BASE(session, session_open_cursor);
- cur_cfg[1] = "checkpoint=" WT_CHECKPOINT ",raw";
- cur_cfg[2] = NULL;
- WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cur_cfg, &src));
+ /* Open a special merge cursor just on this chunk. */
+ WT_ERR(__wt_open_cursor(session, lsm_tree->name, NULL, NULL, &src));
+ F_SET(src, WT_CURSTD_RAW);
+ WT_ERR(__wt_clsm_init_merge(src, chunk_off, chunk->id, 1));
F_SET(session, WT_SESSION_NO_CACHE);
for (insert_count = 0; (ret = src->next(src)) == 0; insert_count++) {
@@ -486,11 +466,16 @@ __lsm_bloom_create(
}
WT_ERR_NOTFOUND_OK(ret);
WT_TRET(src->close(src));
- F_CLR(session, WT_SESSION_NO_CACHE);
WT_TRET(__wt_bloom_finalize(bloom));
WT_ERR(ret);
+ F_CLR(session, WT_SESSION_NO_CACHE);
+
+ /* Load the new Bloom filter into cache. */
+ WT_CLEAR(key);
+ WT_ERR_NOTFOUND_OK(__wt_bloom_get(bloom, &key));
+
WT_VERBOSE_ERR(session, lsm,
"LSM worker created bloom filter %s. "
"Expected %" PRIu64 " items, got %" PRIu64,
@@ -498,8 +483,8 @@ __lsm_bloom_create(
/* Ensure the bloom filter is in the metadata. */
WT_ERR(__wt_lsm_tree_lock(session, lsm_tree, 1));
- ret = __wt_lsm_meta_write(session, lsm_tree);
F_SET(chunk, WT_LSM_CHUNK_BLOOM);
+ ret = __wt_lsm_meta_write(session, lsm_tree);
++lsm_tree->dsk_gen;
WT_TRET(__wt_lsm_tree_unlock(session, lsm_tree));
diff --git a/src/meta/meta_ckpt.c b/src/meta/meta_ckpt.c
index d036f64b14d..8eba95c5c68 100644
--- a/src/meta/meta_ckpt.c
+++ b/src/meta/meta_ckpt.c
@@ -367,10 +367,10 @@ int
__wt_meta_ckptlist_set(WT_SESSION_IMPL *session,
const char *fname, WT_CKPT *ckptbase, WT_LSN *ckptlsn)
{
- struct timespec ts;
WT_CKPT *ckpt;
WT_DECL_ITEM(buf);
WT_DECL_RET;
+ time_t secs;
int64_t maxorder;
const char *sep;
@@ -416,8 +416,14 @@ __wt_meta_ckptlist_set(WT_SESSION_IMPL *session,
if (F_ISSET(ckpt, WT_CKPT_ADD))
ckpt->order = ++maxorder;
- WT_ERR(__wt_epoch(session, &ts));
- ckpt->sec = (uintmax_t)ts.tv_sec;
+ /*
+ * XXX
+ * Assumes a time_t fits into a uintmax_t, which isn't
+ * guaranteed, a time_t has to be an arithmetic type,
+ * but not an integral type.
+ */
+ WT_ERR(__wt_seconds(session, &secs));
+ ckpt->sec = (uintmax_t)secs;
}
if (strcmp(ckpt->name, WT_CHECKPOINT) == 0)
WT_ERR(__wt_buf_catfmt(session, buf,
diff --git a/src/meta/meta_table.c b/src/meta/meta_table.c
index 575a72cd7e6..05723cc4aa5 100644
--- a/src/meta/meta_table.c
+++ b/src/meta/meta_table.c
@@ -65,7 +65,14 @@ __wt_metadata_cursor(
WT_ERR(__wt_metadata_open(session));
WT_SET_BTREE_IN_SESSION(session, session->metafile);
+
+ /*
+ * We use the metadata a lot, so we have a handle cached; lock it and
+ * increment the in-use counter.
+ */
WT_ERR(__wt_session_lock_btree(session, 0));
+ __wt_session_dhandle_incr_use(session);
+
ret = __wt_curfile_create(session, NULL, cfg, 0, 0, cursorp);
/* Restore the caller's btree. */
diff --git a/src/os_posix/os_time.c b/src/os_posix/os_time.c
index 8979d77f818..dd3dbd8512b 100644
--- a/src/os_posix/os_time.c
+++ b/src/os_posix/os_time.c
@@ -8,6 +8,22 @@
#include "wt_internal.h"
/*
+ * __wt_seconds --
+ * Return the seconds since the Epoch.
+ */
+int
+__wt_seconds(WT_SESSION_IMPL *session, time_t *timep)
+{
+ struct timespec t;
+
+ WT_RET(__wt_epoch(session, &t));
+
+ *timep = t.tv_sec;
+
+ return (0);
+}
+
+/*
* __wt_epoch --
* Return the time since the Epoch.
*/
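
(Illustrative sketch, not part of the commit: __wt_seconds simply truncates the epoch time to whole seconds. A standalone equivalent built directly on POSIX clock_gettime, which is an assumption here since __wt_epoch may wrap a different clock on some platforms, would be:)

#include <time.h>

/*
 * Standalone sketch of __wt_seconds: read the realtime clock at
 * nanosecond resolution and keep only the whole-seconds component.
 */
static int
seconds_since_epoch(time_t *timep)
{
	struct timespec t;

	if (clock_gettime(CLOCK_REALTIME, &t) != 0)
		return (-1);
	*timep = t.tv_sec;
	return (0);
}
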
diff --git a/src/schema/schema_drop.c b/src/schema/schema_drop.c
index 206d1c4056a..bfca3079707 100644
--- a/src/schema/schema_drop.c
+++ b/src/schema/schema_drop.c
@@ -27,14 +27,6 @@ __drop_file(
if (!WT_PREFIX_SKIP(filename, "file:"))
return (EINVAL);
- if (session->dhandle == NULL &&
- (ret = __wt_session_get_btree(session, uri, NULL, cfg,
- WT_DHANDLE_EXCLUSIVE | WT_DHANDLE_LOCK_ONLY)) != 0) {
- if (ret == WT_NOTFOUND || ret == ENOENT)
- ret = 0;
- return (ret);
- }
-
/* Close all btree handles associated with this file. */
WT_RET(__wt_conn_dhandle_close_all(session, uri));
diff --git a/src/schema/schema_truncate.c b/src/schema/schema_truncate.c
index ff5920fdcee..27b4ffebde5 100644
--- a/src/schema/schema_truncate.c
+++ b/src/schema/schema_truncate.c
@@ -21,9 +21,15 @@ __truncate_file(WT_SESSION_IMPL *session, const char *name)
if (!WT_PREFIX_SKIP(filename, "file:"))
return (EINVAL);
+ /* Open and lock the file. */
+ WT_RET(__wt_session_get_btree(
+ session, name, NULL, NULL, WT_DHANDLE_EXCLUSIVE));
+
/* Get the allocation size. */
allocsize = S2BT(session)->allocsize;
+ WT_RET(__wt_session_release_btree(session));
+
/* Close any btree handles in the file. */
WT_RET(__wt_conn_dhandle_close_all(session, name));
@@ -97,12 +103,9 @@ __wt_schema_truncate(
WT_DECL_RET;
const char *tablename;
- WT_UNUSED(cfg);
tablename = uri;
if (WT_PREFIX_MATCH(uri, "file:")) {
- WT_RET(__wt_session_get_btree(
- session, uri, NULL, NULL, WT_DHANDLE_EXCLUSIVE));
ret = __truncate_file(session, uri);
} else if (WT_PREFIX_MATCH(uri, "lsm:"))
ret = __wt_lsm_tree_truncate(session, uri, cfg);
diff --git a/src/session/session_dhandle.c b/src/session/session_dhandle.c
index 8d163fcf288..1a54c37fcef 100644
--- a/src/session/session_dhandle.c
+++ b/src/session/session_dhandle.c
@@ -8,11 +8,47 @@
#include "wt_internal.h"
/*
- * __wt_session_add_btree --
- * Add a handle to the session's cache.
+ * __wt_session_dhandle_incr_use --
+ * Increment the session data source's in-use counter.
+ */
+void
+__wt_session_dhandle_incr_use(WT_SESSION_IMPL *session)
+{
+ WT_DATA_HANDLE *dhandle;
+
+ dhandle = session->dhandle;
+
+ (void)WT_ATOMIC_ADD(dhandle->session_inuse, 1);
+}
+
+/*
+ * __wt_session_dhandle_decr_use --
+ * Decrement the session data source's in-use counter.
*/
int
-__wt_session_add_btree(
+__wt_session_dhandle_decr_use(WT_SESSION_IMPL *session)
+{
+ WT_DATA_HANDLE *dhandle;
+ WT_DECL_RET;
+
+ dhandle = session->dhandle;
+
+ /*
+ * Decrement the in-use count on the underlying data-source -- if we're
+ * the last reference, set the time-of-death timestamp.
+ */
+ WT_ASSERT(session, dhandle->session_inuse > 0);
+ if (WT_ATOMIC_SUB(dhandle->session_inuse, 1) == 0)
+ WT_TRET(__wt_seconds(session, &dhandle->timeofdeath));
+ return (0);
+}
+
+/*
+ * __session_add_btree --
+ * Add a handle to the session's cache.
+ */
+static int
+__session_add_btree(
WT_SESSION_IMPL *session, WT_DATA_HANDLE_CACHE **dhandle_cachep)
{
WT_DATA_HANDLE_CACHE *dhandle_cache;
@@ -112,6 +148,9 @@ __wt_session_release_btree(WT_SESSION_IMPL *session)
btree = S2BT(session);
dhandle = session->dhandle;
+ /* Decrement the data-source's in-use counter. */
+ WT_ERR(__wt_session_dhandle_decr_use(session));
+
if (F_ISSET(dhandle, WT_DHANDLE_DISCARD_CLOSE)) {
/*
* If configured to discard on last close, attempt to trade our
@@ -215,11 +254,27 @@ retry: WT_RET(__wt_meta_checkpoint_last_name(
* Discard any session dhandles that are not open.
*/
static int
-__session_dhandle_sweep(WT_SESSION_IMPL *session)
+__session_dhandle_sweep(WT_SESSION_IMPL *session, uint32_t flags)
{
WT_DATA_HANDLE *dhandle;
WT_DATA_HANDLE_CACHE *dhandle_cache, *dhandle_cache_next;
- WT_DECL_RET;
+ time_t now;
+
+ /*
+ * Check the local flag WT_DHANDLE_LOCK_ONLY; a common caller with that
+ * flag is in the path to discard the handle, don't sweep in that case.
+ */
+ if (LF_ISSET(WT_DHANDLE_LOCK_ONLY))
+ return (0);
+
+ /*
+ * Periodically sweep for dead handles; if we've swept recently, don't
+ * do it again.
+ */
+ WT_RET(__wt_seconds(session, &now));
+ if (now - session->last_sweep < WT_DHANDLE_SWEEP_PERIOD)
+ return (0);
+ session->last_sweep = now;
WT_STAT_FAST_CONN_INCR(session, dh_session_sweeps);
@@ -227,33 +282,30 @@ __session_dhandle_sweep(WT_SESSION_IMPL *session)
while (dhandle_cache != NULL) {
dhandle_cache_next = SLIST_NEXT(dhandle_cache, l);
dhandle = dhandle_cache->dhandle;
- if (!F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE|WT_DHANDLE_OPEN)) {
+ if (dhandle->session_inuse == 0 &&
+ now - dhandle->timeofdeath > WT_DHANDLE_SWEEP_WAIT) {
WT_STAT_FAST_CONN_INCR(session, dh_session_handles);
- WT_TRET(__wt_session_discard_btree(
- session, dhandle_cache));
+ WT_RET(
+ __wt_session_discard_btree(session, dhandle_cache));
}
dhandle_cache = dhandle_cache_next;
}
- return (ret);
+ return (0);
}
/*
* __session_open_btree --
- * Wrapper function to first sweep the session and then get the btree.
- * Sweeping is only called when a session notices it has dead dhandles on
- * its session dhandle list. Must be called with schema lock.
+ * Wrapper function to first sweep the session handles and then get the
+ * btree handle; must be called with schema lock.
*/
static int
__session_open_btree(WT_SESSION_IMPL *session,
- const char *name, const char *ckpt, const char *op_cfg[],
- int dead, uint32_t flags)
+ const char *name, const char *ckpt, const char *op_cfg[], uint32_t flags)
{
- WT_DECL_RET;
+ WT_RET(__session_dhandle_sweep(session, flags));
+ WT_RET(__wt_conn_btree_get(session, name, ckpt, op_cfg, flags));
- if (dead)
- WT_TRET(__session_dhandle_sweep(session));
- WT_TRET(__wt_conn_btree_get(session, name, ckpt, op_cfg, flags));
- return (ret);
+ return (0);
}
/*
@@ -268,23 +320,14 @@ __wt_session_get_btree(WT_SESSION_IMPL *session,
WT_DATA_HANDLE_CACHE *dhandle_cache;
WT_DECL_RET;
uint64_t hash;
- int candidate, dead;
+ int candidate;
dhandle = NULL;
- candidate = dead = 0;
+ candidate = 0;
hash = __wt_hash_city64(uri, strlen(uri));
SLIST_FOREACH(dhandle_cache, &session->dhandles, l) {
dhandle = dhandle_cache->dhandle;
- /*
- * We check the local flag WT_DHANDLE_LOCK_ONLY in addition
- * to the dhandle flags. A common caller with the flag
- * is from the path to discard the handle, so we ignore the
- * optimization to sweep in that case.
- */
- if (!LF_ISSET(WT_DHANDLE_LOCK_ONLY) &&
- !F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE|WT_DHANDLE_OPEN))
- dead = 1;
if (hash != dhandle->name_hash ||
strcmp(uri, dhandle->name) != 0)
continue;
@@ -300,8 +343,8 @@ __wt_session_get_btree(WT_SESSION_IMPL *session,
session->dhandle = dhandle;
/*
- * Try to lock the file; if we succeed, our "exclusive"
- * state must match.
+ * Try to lock the file; if we succeed, our "exclusive" state
+ * must match.
*/
ret = __wt_session_lock_btree(session, flags);
if (ret == WT_NOTFOUND)
@@ -314,20 +357,21 @@ __wt_session_get_btree(WT_SESSION_IMPL *session,
/*
* If we don't already hold the schema lock, get it now so that
* we can find and/or open the handle. We call a wrapper
- * function that will optionally sweep the handle list to
- * remove any dead handles.
+ * function to sweep the handle list to remove any dead handles.
*/
WT_WITH_SCHEMA_LOCK(session, ret =
- __session_open_btree(
- session, uri, checkpoint, cfg, dead, flags));
+ __session_open_btree(session, uri, checkpoint, cfg, flags));
WT_RET(ret);
if (!candidate)
- WT_RET(__wt_session_add_btree(session, NULL));
+ WT_RET(__session_add_btree(session, NULL));
WT_ASSERT(session, LF_ISSET(WT_DHANDLE_LOCK_ONLY) ||
F_ISSET(session->dhandle, WT_DHANDLE_OPEN));
}
+ /* Increment the data-source's in-use counter. */
+ __wt_session_dhandle_incr_use(session);
+
WT_ASSERT(session, LF_ISSET(WT_DHANDLE_EXCLUSIVE) ==
F_ISSET(session->dhandle, WT_DHANDLE_EXCLUSIVE));
F_SET(session->dhandle, LF_ISSET(WT_DHANDLE_DISCARD_CLOSE));
diff --git a/src/txn/txn_ckpt.c b/src/txn/txn_ckpt.c
index 7be84f3d0f8..04817958d97 100644
--- a/src/txn/txn_ckpt.c
+++ b/src/txn/txn_ckpt.c
@@ -476,7 +476,7 @@ __checkpoint_worker(
*/
if ((ret = __wt_meta_ckptlist_get(
session, dhandle->name, &ckptbase)) == WT_NOTFOUND) {
- WT_ASSERT(session, session->dhandle->refcnt == 0);
+ WT_ASSERT(session, session->dhandle->session_ref == 0);
return (__wt_bt_cache_op(
session, NULL, WT_SYNC_DISCARD_NOWRITE));
}
diff --git a/test/format/config.h b/test/format/config.h
index bcbaef13865..9f8ab37c7bd 100644
--- a/test/format/config.h
+++ b/test/format/config.h
@@ -72,8 +72,8 @@ typedef struct {
static CONFIG c[] = {
{ "firstfit",
- "if allocation is firstfit", /* 20% */
- 0x0, C_BOOL, 20, 0, 0, &g.c_firstfit, NULL },
+ "if allocation is firstfit", /* 10% */
+ 0x0, C_BOOL, 10, 0, 0, &g.c_firstfit, NULL },
{ "bitcnt",
"number of bits for fixed-length column-store files",
@@ -83,6 +83,10 @@ static CONFIG c[] = {
"size of the cache in MB",
0x0, 0x0, 1, 100, 1024, &g.c_cache, NULL },
+ { "compaction",
+ "if compaction is running", /* 10% */
+ 0x0, C_BOOL, 10, 0, 0, &g.c_compact, NULL },
+
{ "compression",
"type of compression (none | bzip | lzo | raw | snappy)",
0x0, C_IGNORE|C_STRING, 1, 5, 5, NULL, &g.c_compression },
diff --git a/test/format/format.h b/test/format/format.h
index df6fc87a17b..276c8a2a8bc 100644
--- a/test/format/format.h
+++ b/test/format/format.h
@@ -136,6 +136,7 @@ typedef struct {
u_int c_bitcnt; /* Config values */
u_int c_cache;
+ u_int c_compact;
char *c_compression;
char *c_config_open;
u_int c_data_extend;
@@ -224,6 +225,7 @@ void wts_load(void);
void wts_open(const char *, int, WT_CONNECTION **);
void wts_ops(void);
uint32_t wts_rand(void);
+void wts_rand_init(void);
void wts_read_scan(void);
void wts_salvage(void);
void wts_stats(void);
diff --git a/test/format/ops.c b/test/format/ops.c
index 6b43a5dd20b..d38568523d5 100644
--- a/test/format/ops.c
+++ b/test/format/ops.c
@@ -91,7 +91,7 @@ wts_ops(void)
if ((ret =
pthread_create(&backup_tid, NULL, hot_backup, NULL)) != 0)
die(ret, "pthread_create");
- if ((ret =
+ if (g.c_compact && (ret =
pthread_create(&compact_tid, NULL, compact, NULL)) != 0)
die(ret, "pthread_create");
@@ -130,7 +130,8 @@ wts_ops(void)
/* Wait for the backup, compaction thread. */
g.threads_finished = 1;
(void)pthread_join(backup_tid, NULL);
- (void)pthread_join(compact_tid, NULL);
+ if (g.c_compact)
+ (void)pthread_join(compact_tid, NULL);
}
if (g.logging != 0) {
diff --git a/test/format/t.c b/test/format/t.c
index 69b0a4c5c8d..7fdc25af3f7 100644
--- a/test/format/t.c
+++ b/test/format/t.c
@@ -125,6 +125,13 @@ main(int argc, char *argv[])
/* Clean up on signal. */
(void)signal(SIGINT, onint);
+ /*
+ * Initialize the random number generator (don't reinitialize on each
+ * new run, reinitializing won't be more random than continuing on from
+ * the current state).
+ */
+ wts_rand_init();
+
printf("%s: process %" PRIdMAX "\n", g.progname, (intmax_t)getpid());
while (++g.run_cnt <= g.c_runs || g.c_runs == 0 ) {
startup(); /* Start a run */
diff --git a/test/format/util.c b/test/format/util.c
index e1f52b77b8e..39a105fb9fc 100644
--- a/test/format/util.c
+++ b/test/format/util.c
@@ -201,6 +201,18 @@ track(const char *tag, uint64_t cnt, TINFO *tinfo)
}
/*
+ * wts_rand_init --
+ * Initialize the random number generator.
+ */
+void
+wts_rand_init(void)
+{
+ /* Seed the random number generator. */
+ if (!g.replay)
+ srand((u_int)(0xdeadbeef ^ (u_int)time(NULL)));
+}
+
+/*
* wts_rand --
* Return a random number.
*/
@@ -243,12 +255,6 @@ wts_rand(void)
if ((g.rand_log = fopen("RUNDIR/rand", "w")) == NULL)
die(errno, "fopen: RUNDIR/rand");
(void)setvbuf(g.rand_log, NULL, _IOLBF, 0);
-
- /*
- * Seed the random number generator for each new run (we
- * know it's a new run when we re-open the log file).
- */
- srand((u_int)(0xdeadbeef ^ (u_int)time(NULL)));
}
r = (uint32_t)rand();
fprintf(g.rand_log, "%" PRIu32 "\n", r);
diff --git a/test/suite/test_cursor03.py b/test/suite/test_cursor03.py
index 61b2711a3c2..362b00ebf60 100644
--- a/test/suite/test_cursor03.py
+++ b/test/suite/test_cursor03.py
@@ -48,8 +48,8 @@ class test_cursor03(TestCursorTracker):
('col.val10k', dict(tablekind='col', keysize=None, valsize=[10, 10000], uri='table')),
('row.keyval10k', dict(tablekind='row', keysize=[10,10000], valsize=[10, 10000], uri='table')),
], [
- ('count1000', dict(tablecount=1000,cache_size=25*1024*1024)),
- ('count10000', dict(tablecount=10000, cache_size=64*1024*1024))
+ ('count1000', dict(tablecount=1000)),
+ ('count10000', dict(tablecount=10000))
])
def create_session_and_cursor(self):
@@ -68,12 +68,6 @@ class test_cursor03(TestCursorTracker):
self.cur_initial_conditions(self.table_name1, self.tablecount, self.tablekind, self.keysize, self.valsize, self.uri)
return self.session.open_cursor(tablearg, None, 'append')
- def setUpConnectionOpen(self, dir):
- wtopen_args = 'create,cache_size=' + str(self.cache_size)
- conn = wiredtiger.wiredtiger_open(dir, wtopen_args)
- self.pr(`conn`)
- return conn
-
def test_multiple_remove(self):
"""
Test multiple deletes at the same place
diff --git a/test/suite/test_txn02.py b/test/suite/test_txn02.py
index bc024dfa531..96c1d52ff65 100644
--- a/test/suite/test_txn02.py
+++ b/test/suite/test_txn02.py
@@ -29,14 +29,19 @@
# Transactions: commits and rollbacks
#
-import os, shutil
+import fnmatch, os, shutil
+from suite_subprocess import suite_subprocess
from wiredtiger import wiredtiger_open
from wtscenario import multiply_scenarios, number_scenarios
import wttest
-class test_txn02(wttest.WiredTigerTestCase):
+class test_txn02(wttest.WiredTigerTestCase, suite_subprocess):
+ logmax = "100K"
tablename = 'test_txn02'
uri = 'table:' + tablename
+ archive_list = ['true', 'false']
+ conn_list = ['reopen', 'stay_open']
+ sync_list = ['dsync', 'fsync', 'none']
types = [
('row', dict(tabletype='row',
@@ -72,15 +77,22 @@ class test_txn02(wttest.WiredTigerTestCase):
txn4s = [('t4c', dict(txn4='commit')), ('t4r', dict(txn4='rollback'))]
scenarios = number_scenarios(multiply_scenarios('.', types,
- op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s))
-
+ op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s))
+ # scenarios = number_scenarios(multiply_scenarios('.', types,
+ # op1s, txn1s, op2s, txn2s, op3s, txn3s, op4s, txn4s)) [:3]
# Overrides WiredTigerTestCase
def setUpConnectionOpen(self, dir):
self.home = dir
+ # Cycle through the different transaction_sync values in a
+ # deterministic manner.
+ self.txn_sync = self.sync_list[
+ self.scenario_number % len(self.sync_list)]
self.backup_dir = os.path.join(self.home, "WT_BACKUP")
- conn = wiredtiger_open(dir,
- 'create,log=(enabled,file_max=100K),' +
- ('error_prefix="%s: ",' % self.shortid()))
+ conn_params = 'create,log=(enabled,file_max=%s),' % self.logmax + \
+ 'error_prefix="%s: ",' % self.shortid() + \
+ 'transaction_sync="%s",' % self.txn_sync
+ # print "Creating conn at '%s' with config '%s'" % (dir, conn_params)
+ conn = wiredtiger_open(dir, conn_params)
self.pr(`conn`)
self.session2 = conn.open_session()
return conn
@@ -111,51 +123,118 @@ class test_txn02(wttest.WiredTigerTestCase):
self.check(self.session2, "isolation=read-committed", committed)
self.check(self.session2, "isolation=read-uncommitted", current)
- # Opening a clone of the database home directory should see the
- # committed results.
+ # Opening a clone of the database home directory should run
+ # recovery and see the committed results.
wttest.removeAll(self.backup_dir)
shutil.copytree(self.home, self.backup_dir)
- backup_conn = wiredtiger_open(self.backup_dir, 'log=(enabled)')
+ backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax
+ backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
try:
self.check(backup_conn.open_session(), None, committed)
- #self.check(backup_conn.open_session(), None, {})
finally:
backup_conn.close()
+ def check_log(self, committed):
+ wttest.removeAll(self.backup_dir)
+ shutil.copytree(self.home, self.backup_dir)
+ #
+ # Open and close the backup connection a few times to force
+ # repeated recovery and log archiving even if later recoveries
+ # are essentially no-ops. Confirm that the backup contains
+ # the committed operations after recovery.
+ #
+ # Cycle through the different archive values in a
+ # deterministic manner.
+ self.archive = self.archive_list[
+ self.scenario_number % len(self.archive_list)]
+ backup_conn_params = \
+ 'log=(enabled,file_max=%s,archive=%s)' % (self.logmax, self.archive)
+ orig_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*")
+ endcount = 2
+ count = 0
+ while count < endcount:
+ backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
+ try:
+ self.check(backup_conn.open_session(), None, committed)
+ finally:
+ # Yield so that the archive thread gets a chance to run
+ # before we close the connection.
+ yield
+ backup_conn.close()
+ count += 1
+ #
+ # Check logs after repeated openings. The first log should
+ # have been archived if configured. Subsequent openings would not
+ # archive because no checkpoint is written due to no modifications.
+ #
+ cur_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*")
+ for o in orig_logs:
+ if self.archive == 'true':
+ self.assertEqual(False, o in cur_logs)
+ else:
+ self.assertEqual(True, o in cur_logs)
+ #
+ # Run printlog and make sure it exits with zero status.
+ #
+ self.runWt(['-h', self.backup_dir, 'printlog'], outfilename='printlog.out')
+
def test_ops(self):
# print "Creating %s with config '%s'" % (self.uri, self.create_params)
self.session.create(self.uri, self.create_params)
- # Set up the table with entries for 1 and 10
+ # Set up the table with entries for 1, 2, 10 and 11.
# We use the overwrite config so insert can update as needed.
c = self.session.open_cursor(self.uri, None, 'overwrite')
c.set_value(1)
c.set_key(1)
c.insert()
+ c.set_key(2)
+ c.insert()
c.set_key(10)
c.insert()
- current = {1:1, 10:1}
+ c.set_key(11)
+ c.insert()
+ current = {1:1, 2:1, 10:1, 11:1}
committed = current.copy()
+ reopen = self.conn_list[
+ self.scenario_number % len(self.conn_list)]
ops = (self.op1, self.op2, self.op3, self.op4)
txns = (self.txn1, self.txn2, self.txn3, self.txn4)
+ # for ok, txn in zip(ops, txns):
# print ', '.join('%s(%d)[%s]' % (ok[0], ok[1], txn)
- # for ok, txn in zip(ops, txns))
for i, ot in enumerate(zip(ops, txns)):
- self.session.begin_transaction()
ok, txn = ot
op, k = ok
- # print '%s(%d)[%s]' % (ok[0], ok[1], txn)
+
+ # Close and reopen the connection and cursor.
+ if reopen == 'reopen':
+ self.reopen_conn()
+ c = self.session.open_cursor(self.uri, None, 'overwrite')
+
+ self.session.begin_transaction()
+ # Test multiple operations per transaction by always
+ # doing the same operation on key k + 1.
+ k1 = k + 1
+ # print '%d: %s(%d)[%s]' % (i, ok[0], ok[1], txn)
if op == 'insert' or op == 'update':
- c.set_key(k)
c.set_value(i + 2)
+ c.set_key(k)
+ c.insert()
+ c.set_key(k1)
c.insert()
current[k] = i + 2
+ current[k1] = i + 2
elif op == 'remove':
c.set_key(k)
c.remove()
+ c.set_key(k1)
+ c.remove()
if k in current:
del current[k]
+ if k1 in current:
+ del current[k1]
+ # print current
# Check the state after each operation.
self.check_all(current, committed)
@@ -169,5 +248,9 @@ class test_txn02(wttest.WiredTigerTestCase):
# Check the state after each commit/rollback.
self.check_all(current, committed)
+ # Check the log state after the entire op completes
+ # and run recovery.
+ self.check_log(committed)
+
if __name__ == '__main__':
wttest.run()
diff --git a/test/suite/test_txn04.py b/test/suite/test_txn04.py
new file mode 100644
index 00000000000..74c14926f07
--- /dev/null
+++ b/test/suite/test_txn04.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+#
+# Public Domain 2008-2013 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# test_txn04.py
+# Transactions: hot backup and recovery
+#
+
+import os
+from suite_subprocess import suite_subprocess
+from wiredtiger import wiredtiger_open
+from wtscenario import multiply_scenarios, number_scenarios
+import wttest
+
+class test_txn04(wttest.WiredTigerTestCase, suite_subprocess):
+ logmax = "100K"
+ tablename = 'test_txn04'
+ uri = 'table:' + tablename
+ sync_list = ['dsync', 'fsync', 'none']
+
+ types = [
+ ('row', dict(tabletype='row',
+ create_params = 'key_format=i,value_format=i')),
+ ('var', dict(tabletype='var',
+ create_params = 'key_format=r,value_format=i')),
+ ('fix', dict(tabletype='fix',
+ create_params = 'key_format=r,value_format=8t')),
+ ]
+ op1s = [
+ ('insert', dict(op1=('insert', 6))),
+ ('update', dict(op1=('update', 2))),
+ ('remove', dict(op1=('remove', 2))),
+ ('trunc-stop', dict(op1=('stop', 2))),
+ ]
+ txn1s = [('t1c', dict(txn1='commit')), ('t1r', dict(txn1='rollback'))]
+
+ scenarios = number_scenarios(multiply_scenarios('.', types, op1s, txn1s))
+ # Overrides WiredTigerTestCase
+ def setUpConnectionOpen(self, dir):
+ self.home = dir
+ # Cycle through the different transaction_sync values in a
+ # deterministic manner.
+ self.txn_sync = self.sync_list[
+ self.scenario_number % len(self.sync_list)]
+ self.backup_dir = os.path.join(self.home, "WT_BACKUP")
+ # Set archive false on the home directory.
+ conn_params = 'log=(archive=false,enabled,file_max=%s),' % self.logmax + \
+ 'create,error_prefix="%s: ",' % self.shortid() + \
+ 'transaction_sync="%s",' % self.txn_sync
+ # print "Creating conn at '%s' with config '%s'" % (dir, conn_params)
+ conn = wiredtiger_open(dir, conn_params)
+ self.pr(`conn`)
+ self.session2 = conn.open_session()
+ return conn
+
+ # Check that a cursor (optionally started in a new transaction), sees the
+ # expected values.
+ def check(self, session, txn_config, expected):
+ if txn_config:
+ session.begin_transaction(txn_config)
+ c = session.open_cursor(self.uri, None)
+ actual = dict((k, v) for k, v in c if v != 0)
+ # Search for the expected items as well as iterating
+ for k, v in expected.iteritems():
+ self.assertEqual(c[k], v)
+ c.close()
+ if txn_config:
+ session.commit_transaction()
+ self.assertEqual(actual, expected)
+
+ # Check the state of the system with respect to the current cursor and
+ # different isolation levels.
+ def check_all(self, current, committed):
+ # Transactions see their own changes.
+ # Read-uncommitted transactions see all changes.
+ # Snapshot and read-committed transactions should not see changes.
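+        # self.session2 is a second session on the same connection (opened in
+        # setUpConnectionOpen), so it observes self.session's changes from
+        # outside the current transaction.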
+ self.check(self.session, None, current)
+ self.check(self.session2, "isolation=snapshot", committed)
+ self.check(self.session2, "isolation=read-committed", committed)
+ self.check(self.session2, "isolation=read-uncommitted", current)
+
+ def hot_backup(self, backup_uri, committed):
+        # Perform a hot backup using the wt backup command.  If we are
+        # backing up a target, assume the backup directory already exists;
+        # otherwise recreate it.  A future extension of this test could use
+        # a backup cursor instead.
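+        # The command built below has the form:
+        #   wt -h <home> backup [-t <uri>] <backup_dir>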
+ cmd = '-h ' + self.home + ' backup '
+        if backup_uri is not None:
+ cmd += '-t ' + backup_uri + ' '
+ else:
+ wttest.removeAll(self.backup_dir)
+ os.mkdir(self.backup_dir)
+
+ cmd += self.backup_dir
+ self.runWt(cmd.split())
+ self.exception='false'
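+        # Opening the backup directory runs recovery; self.exception records
+        # whether the committed data could be read back from the backup.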
+ backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax
+ backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
+ try:
+ self.check(backup_conn.open_session(), None, committed)
+ except:
+ self.exception='true'
+ finally:
+ backup_conn.close()
+
+ def test_ops(self):
+ self.session.create(self.uri, self.create_params)
+ c = self.session.open_cursor(self.uri, None, 'overwrite')
+ # Set up the table with entries for 1-5.
+ # We then truncate starting or ending in various places.
+ # We use the overwrite config so insert can update as needed.
+ current = {1:1, 2:1, 3:1, 4:1, 5:1}
+ c.set_value(1)
+ for k in current:
+ c.set_key(k)
+ c.insert()
+ committed = current.copy()
+
+ ops = (self.op1, )
+ txns = (self.txn1, )
+ for i, ot in enumerate(zip(ops, txns)):
+            # Perform a full hot backup of the original table.
+            # The runWt command closes our connection and sessions, so
+            # we need to reopen them here.
+ self.hot_backup(None, committed)
+            self.assertEqual(self.exception, 'false')
+ c = self.session.open_cursor(self.uri, None, 'overwrite')
+ c.set_value(1)
+ # Then do the given modification.
+ # Perform a targeted hot backup.
+ self.session.begin_transaction()
+ ok, txn = ot
+ op, k = ok
+
+ # print '%d: %s(%d)[%s]' % (i, ok[0], ok[1], txn)
+ if op == 'insert' or op == 'update':
+ c.set_value(i + 2)
+ c.set_key(k)
+ c.insert()
+ current[k] = i + 2
+ elif op == 'remove':
+ c.set_key(k)
+ c.remove()
+ if k in current:
+ del current[k]
+ elif op == 'stop':
+                # The key given is the stop key; passing None for the start
+                # cursor truncates from the beginning of the table.
+ c.set_key(k)
+ kstart = 1
+ kstop = k
+ self.session.truncate(None, None, c, None)
+ while (kstart <= kstop):
+ del current[kstart]
+ kstart += 1
+
+ # print current
+ # Check the state after each operation.
+ self.check_all(current, committed)
+
+ if txn == 'commit':
+ committed = current.copy()
+ self.session.commit_transaction()
+ elif txn == 'rollback':
+ current = committed.copy()
+ self.session.rollback_transaction()
+
+ # Check the state after each commit/rollback.
+ self.check_all(current, committed)
+
+ # Backup the target we modified. We expect that running
+ # recovery now will generate an exception if we committed.
+ # print 'Call hot_backup with ' + self.uri
+ self.hot_backup(self.uri, committed)
+            if txn == 'commit':
+                self.assertEqual(self.exception, 'true')
+            else:
+                self.assertEqual(self.exception, 'false')
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_txn05.py b/test/suite/test_txn05.py
new file mode 100644
index 00000000000..e017aab9cb0
--- /dev/null
+++ b/test/suite/test_txn05.py
@@ -0,0 +1,234 @@
+#!/usr/bin/env python
+#
+# Public Domain 2008-2013 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# test_txn05.py
+# Transactions: commits and rollbacks
+#
+
+import fnmatch, os, shutil, time
+from suite_subprocess import suite_subprocess
+from wiredtiger import wiredtiger_open
+from wtscenario import multiply_scenarios, number_scenarios
+import wttest
+
+class test_txn05(wttest.WiredTigerTestCase, suite_subprocess):
+ logmax = "100K"
+ tablename = 'test_txn05'
+ uri = 'table:' + tablename
+ archive_list = ['true', 'false']
+ sync_list = ['dsync', 'fsync', 'none']
+
+ types = [
+ ('row', dict(tabletype='row',
+ create_params = 'key_format=i,value_format=i')),
+ ('var', dict(tabletype='var',
+ create_params = 'key_format=r,value_format=i')),
+ ('fix', dict(tabletype='fix',
+ create_params = 'key_format=r,value_format=8t')),
+ ]
+ op1s = [
+ ('trunc-all', dict(op1=('all', 0))),
+ ('trunc-both', dict(op1=('both', 2))),
+ ('trunc-start', dict(op1=('start', 2))),
+ ('trunc-stop', dict(op1=('stop', 2))),
+ ]
+ txn1s = [('t1c', dict(txn1='commit')), ('t1r', dict(txn1='rollback'))]
+
+ scenarios = number_scenarios(multiply_scenarios('.', types, op1s, txn1s))
+ # scenarios = number_scenarios(multiply_scenarios('.', types, op1s, txn1s))[:3]
+ # Overrides WiredTigerTestCase
+ def setUpConnectionOpen(self, dir):
+ self.home = dir
+ # Cycle through the different transaction_sync values in a
+ # deterministic manner.
+ self.txn_sync = self.sync_list[
+ self.scenario_number % len(self.sync_list)]
+ self.backup_dir = os.path.join(self.home, "WT_BACKUP")
+ conn_params = 'create,log=(enabled,file_max=%s),' % self.logmax + \
+ 'error_prefix="%s: ",' % self.shortid() + \
+ 'transaction_sync="%s",' % self.txn_sync
+ # print "Creating conn at '%s' with config '%s'" % (dir, conn_params)
+ conn = wiredtiger_open(dir, conn_params)
+ self.pr(`conn`)
+ self.session2 = conn.open_session()
+ return conn
+
+    # Check that a cursor (optionally started in a new transaction) sees the
+    # expected values.
+ def check(self, session, txn_config, expected):
+ if txn_config:
+ session.begin_transaction(txn_config)
+ c = session.open_cursor(self.uri, None)
+ actual = dict((k, v) for k, v in c if v != 0)
+        # Search for the expected items directly, in addition to the
+        # iteration above.
+ for k, v in expected.iteritems():
+ self.assertEqual(c[k], v)
+ c.close()
+ if txn_config:
+ session.commit_transaction()
+ self.assertEqual(actual, expected)
+
+ # Check the state of the system with respect to the current cursor and
+ # different isolation levels.
+ def check_all(self, current, committed):
+ # Transactions see their own changes.
+ # Read-uncommitted transactions see all changes.
+ # Snapshot and read-committed transactions should not see changes.
+ self.check(self.session, None, current)
+ self.check(self.session2, "isolation=snapshot", committed)
+ self.check(self.session2, "isolation=read-committed", committed)
+ self.check(self.session2, "isolation=read-uncommitted", current)
+
+ # Opening a clone of the database home directory should run
+ # recovery and see the committed results.
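+        # shutil.copytree copies the data files and log files together, so
+        # the clone is a point-in-time copy of the running database.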
+ wttest.removeAll(self.backup_dir)
+ shutil.copytree(self.home, self.backup_dir)
+ backup_conn_params = 'log=(enabled,file_max=%s)' % self.logmax
+ backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
+ try:
+ self.check(backup_conn.open_session(), None, committed)
+ finally:
+ backup_conn.close()
+
+ def check_log(self, committed):
+ wttest.removeAll(self.backup_dir)
+ shutil.copytree(self.home, self.backup_dir)
+ #
+ # Open and close the backup connection a few times to force
+ # repeated recovery and log archiving even if later recoveries
+ # are essentially no-ops. Confirm that the backup contains
+ # the committed operations after recovery.
+ #
+ # Cycle through the different archive values in a
+ # deterministic manner.
+ self.archive = self.archive_list[
+ self.scenario_number % len(self.archive_list)]
+ backup_conn_params = \
+ 'log=(enabled,file_max=%s,archive=%s)' % (self.logmax, self.archive)
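+        # Remember the log files present before recovery so we can check
+        # below whether they were archived (removed) or retained.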
+ orig_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*")
+ endcount = 2
+ count = 0
+ while count < endcount:
+ backup_conn = wiredtiger_open(self.backup_dir, backup_conn_params)
+ try:
+ self.check(backup_conn.open_session(), None, committed)
+ finally:
+ # Let other threads like archive run before closing.
+                time.sleep(1.0)
+ backup_conn.close()
+ count += 1
+ #
+        # Check the logs after the repeated openings.  The first log file
+        # should have been archived if archiving is configured.  Subsequent
+        # openings do not archive anything further because no checkpoint is
+        # written when there are no modifications.
+ #
+ cur_logs = fnmatch.filter(os.listdir(self.backup_dir), "*Log*")
+ for o in orig_logs:
+ if self.archive == 'true':
+ self.assertEqual(False, o in cur_logs)
+ else:
+ self.assertEqual(True, o in cur_logs)
+ #
+ # Run printlog and make sure it exits with zero status.
+ #
+ self.runWt(['-h', self.backup_dir, 'printlog'], outfilename='printlog.out')
+
+ def test_ops(self):
+ # print "Creating %s with config '%s'" % (self.uri, self.create_params)
+ self.session.create(self.uri, self.create_params)
+ # Set up the table with entries for 1-5.
+ # We then truncate starting or ending in various places.
+ c = self.session.open_cursor(self.uri, None)
+ current = {1:1, 2:1, 3:1, 4:1, 5:1}
+ c.set_value(1)
+ for k in current:
+ c.set_key(k)
+ c.insert()
+ committed = current.copy()
+
+ ops = (self.op1, )
+ txns = (self.txn1, )
+ for i, ot in enumerate(zip(ops, txns)):
+ self.session.begin_transaction()
+ ok, txn = ot
+ op, k = ok
+
+ # print '%d: %s(%d)[%s]' % (i, ok[0], ok[1], txn)
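+            # session.truncate takes optional start and stop cursors: passing
+            # None for the start truncates from the beginning of the table,
+            # and None for the stop truncates through to the end.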
+ if op == 'stop':
+ c.set_key(k)
+ self.session.truncate(None, None, c, None)
+ kstart = 1
+ kstop = k
+ elif op == 'start':
+ c.set_key(k)
+ self.session.truncate(None, c, None, None)
+ kstart = k
+ kstop = len(current)
+ elif op == 'both':
+ c2 = self.session.open_cursor(self.uri, None)
+ # For both, the key given is the start key. Add 2
+ # for the stop key.
+ kstart = k
+ kstop = k + 2
+ c.set_key(kstart)
+ c2.set_key(kstop)
+ self.session.truncate(None, c, c2, None)
+ c2.close()
+ elif op == 'all':
+ c2 = self.session.open_cursor(self.uri, None)
+ kstart = 1
+ kstop = len(current)
+ c.set_key(kstart)
+ c2.set_key(kstop)
+ self.session.truncate(None, c, c2, None)
+ c2.close()
+
+ while (kstart <= kstop):
+ del current[kstart]
+ kstart += 1
+
+ # print current
+ # Check the state after each operation.
+ self.check_all(current, committed)
+
+ if txn == 'commit':
+ committed = current.copy()
+ self.session.commit_transaction()
+ elif txn == 'rollback':
+ current = committed.copy()
+ self.session.rollback_transaction()
+
+ # Check the state after each commit/rollback.
+ self.check_all(current, committed)
+
+ # Check the log state after the entire op completes
+ # and run recovery.
+ self.check_log(committed)
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/wttest.py b/test/suite/wttest.py
index f196f61ab9f..22eb808403a 100644
--- a/test/suite/wttest.py
+++ b/test/suite/wttest.py
@@ -235,8 +235,8 @@ class WiredTigerTestCase(unittest.TestCase):
def setUp(self):
if not hasattr(self.__class__, 'wt_ntests'):
self.__class__.wt_ntests = 0
- self.__class__.wt_ntests += 1
self.testdir = os.path.join(WiredTigerTestCase._parentTestdir, self.className() + '.' + str(self.__class__.wt_ntests))
+ self.__class__.wt_ntests += 1
if WiredTigerTestCase._verbose > 2:
self.prhead('started in ' + self.testdir, True)
self.origcwd = os.getcwd()