summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@wiredtiger.com>2012-10-26 12:56:33 +1100
committerMichael Cahill <michael.cahill@wiredtiger.com>2012-10-26 12:56:33 +1100
commitd347ee7515d2632aa6570ff6f6243ec7a740c731 (patch)
tree1bddabb8accff933973f7842c335a187ea95e16b
parent64e059d5a591f39aefdd025814762c7ec2529815 (diff)
downloadmongo-d347ee7515d2632aa6570ff6f6243ec7a740c731.tar.gz
Add a reference count to LSM trees to prevent a race between a drop and opening a cursor (if the drop happened in between a thread getting the tree and opening the first chunk).
-rw-r--r--src/cursor/cur_stat.c6
-rw-r--r--src/include/extern.h5
-rw-r--r--src/include/lsm.h1
-rw-r--r--src/lsm/lsm_cursor.c12
-rw-r--r--src/lsm/lsm_tree.c46
5 files changed, 53 insertions, 17 deletions
diff --git a/src/cursor/cur_stat.c b/src/cursor/cur_stat.c
index 4ef98867f33..a66d41d354d 100644
--- a/src/cursor/cur_stat.c
+++ b/src/cursor/cur_stat.c
@@ -349,10 +349,12 @@ __curstat_lsm_init(WT_SESSION_IMPL *session,
WT_LSM_TREE *lsm_tree;
WT_WITH_SCHEMA_LOCK_OPT(session,
- ret = __wt_lsm_tree_get(session, uri, &lsm_tree));
+ ret = __wt_lsm_tree_get(session, uri, 0, &lsm_tree));
WT_RET(ret);
- WT_RET(__wt_lsm_stat_init(session, lsm_tree, flags));
+ ret = __wt_lsm_stat_init(session, lsm_tree, flags);
+ __wt_lsm_tree_release(session, lsm_tree);
+ WT_RET(ret);
cst->btree = NULL;
cst->notpositioned = 1;
diff --git a/src/include/extern.h b/src/include/extern.h
index 5f559e0f365..3e03b03c47f 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -745,9 +745,12 @@ extern int __wt_lsm_tree_create(WT_SESSION_IMPL *session,
const char *uri,
int exclusive,
const char *config);
-extern int __wt_lsm_tree_get( WT_SESSION_IMPL *session,
+extern int __wt_lsm_tree_get(WT_SESSION_IMPL *session,
const char *uri,
+ int exclusive,
WT_LSM_TREE **treep);
+extern void __wt_lsm_tree_release(WT_SESSION_IMPL *session,
+ WT_LSM_TREE *lsm_tree);
extern int __wt_lsm_tree_switch( WT_SESSION_IMPL *session,
WT_LSM_TREE *lsm_tree);
extern int __wt_lsm_tree_drop( WT_SESSION_IMPL *session,
diff --git a/src/include/lsm.h b/src/include/lsm.h
index f67c515a3ac..0d64c275ba1 100644
--- a/src/include/lsm.h
+++ b/src/include/lsm.h
@@ -59,6 +59,7 @@ struct __wt_lsm_tree {
WT_COLLATOR *collator;
+ int refcnt; /* Number of users of the tree. */
WT_RWLOCK *rwlock;
TAILQ_ENTRY(__wt_lsm_tree) q;
diff --git a/src/lsm/lsm_cursor.c b/src/lsm/lsm_cursor.c
index d1a221ea242..1f11e3f8bc0 100644
--- a/src/lsm/lsm_cursor.c
+++ b/src/lsm/lsm_cursor.c
@@ -903,6 +903,8 @@ __clsm_close(WT_CURSOR *cursor)
__wt_free(session, clsm->cursors);
/* The WT_LSM_TREE owns the URI. */
cursor->uri = NULL;
+ if (clsm->lsm_tree != NULL)
+ __wt_lsm_tree_release(session, clsm->lsm_tree);
WT_TRET(__wt_cursor_close(cursor));
API_END(session);
@@ -951,16 +953,17 @@ __wt_clsm_open(WT_SESSION_IMPL *session,
WT_LSM_TREE *lsm_tree;
clsm = NULL;
+ cursor = NULL;
if (!WT_PREFIX_MATCH(uri, "lsm:"))
return (EINVAL);
/* Get the LSM tree. */
WT_WITH_SCHEMA_LOCK_OPT(session,
- ret = __wt_lsm_tree_get(session, uri, &lsm_tree));
+ ret = __wt_lsm_tree_get(session, uri, 0, &lsm_tree));
WT_RET(ret);
- WT_RET(__wt_calloc_def(session, 1, &clsm));
+ WT_ERR(__wt_calloc_def(session, 1, &clsm));
cursor = &clsm->iface;
*cursor = iface;
@@ -989,7 +992,10 @@ __wt_clsm_open(WT_SESSION_IMPL *session,
F_SET(cursor, WT_CURSTD_OVERWRITE);
if (0) {
-err: (void)__clsm_close(cursor);
+err: if (lsm_tree != NULL)
+ __wt_lsm_tree_release(session, lsm_tree);
+ if (cursor != NULL)
+ (void)__clsm_close(cursor);
}
return (ret);
diff --git a/src/lsm/lsm_tree.c b/src/lsm/lsm_tree.c
index f0818d2fca8..c5f1d2c95de 100644
--- a/src/lsm/lsm_tree.c
+++ b/src/lsm/lsm_tree.c
@@ -243,8 +243,10 @@ __wt_lsm_tree_create(WT_SESSION_IMPL *session,
const char *cfg[] = API_CONF_DEFAULTS(session, create, config);
/* If the tree is open, it already exists. */
- if ((ret = __wt_lsm_tree_get(session, uri, &lsm_tree)) == 0)
+ if ((ret = __wt_lsm_tree_get(session, uri, 0, &lsm_tree)) == 0) {
+ __wt_lsm_tree_release(session, lsm_tree);
return (exclusive ? EEXIST : 0);
+ }
WT_RET_NOTFOUND_OK(ret);
/* If the tree has metadata, it already exists. */
@@ -319,10 +321,12 @@ __wt_lsm_tree_create(WT_SESSION_IMPL *session,
/*
* Open our new tree and add it to the handle cache. Don't discard on
- * error the returned handle is NULL on error, and the metadata tracking
- * macros handle cleaning up on failure.
+ * error: the returned handle is NULL on error, and the metadata
+ * tracking macros handle cleaning up on failure.
*/
ret = __lsm_tree_open(session, uri, &lsm_tree);
+ if (ret == 0)
+ __wt_lsm_tree_release(session, lsm_tree);
if (0) {
err: __lsm_tree_discard(session, lsm_tree);
@@ -401,6 +405,7 @@ __lsm_tree_open(
lsm_tree->dsk_gen = 1;
/* Now the tree is setup, make it visible to others. */
+ lsm_tree->refcnt = 1;
TAILQ_INSERT_HEAD(&S2C(session)->lsmqh, lsm_tree, q);
F_SET(lsm_tree, WT_LSM_TREE_OPEN);
@@ -418,13 +423,17 @@ err: __lsm_tree_discard(session, lsm_tree);
* get an LSM tree structure for the given name.
*/
int
-__wt_lsm_tree_get(
- WT_SESSION_IMPL *session, const char *uri, WT_LSM_TREE **treep)
+__wt_lsm_tree_get(WT_SESSION_IMPL *session,
+ const char *uri, int exclusive, WT_LSM_TREE **treep)
{
WT_LSM_TREE *lsm_tree;
TAILQ_FOREACH(lsm_tree, &S2C(session)->lsmqh, q)
if (strcmp(uri, lsm_tree->name) == 0) {
+ if (exclusive && lsm_tree->refcnt)
+ return (EBUSY);
+
+ WT_ATOMIC_ADD(lsm_tree->refcnt, 1);
*treep = lsm_tree;
return (0);
}
@@ -437,6 +446,17 @@ __wt_lsm_tree_get(
}
/*
+ * __wt_lsm_tree_get --
+ * get an LSM tree structure for the given name.
+ */
+void
+__wt_lsm_tree_release(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
+{
+ WT_ASSERT(session, lsm_tree->refcnt > 0);
+ WT_ATOMIC_SUB(lsm_tree->refcnt, 1);
+}
+
+/*
* __wt_lsm_tree_switch --
* Switch to a new in-memory tree.
*/
@@ -492,7 +512,7 @@ __wt_lsm_tree_drop(
int i;
/* Get the LSM tree. */
- WT_RET(__wt_lsm_tree_get(session, name, &lsm_tree));
+ WT_RET(__wt_lsm_tree_get(session, name, 1, &lsm_tree));
/* Shut down the LSM worker. */
WT_RET(__lsm_tree_close(session, lsm_tree));
@@ -546,7 +566,7 @@ __wt_lsm_tree_rename(WT_SESSION_IMPL *session,
old = NULL;
/* Get the LSM tree. */
- WT_RET(__wt_lsm_tree_get(session, oldname, &lsm_tree));
+ WT_RET(__wt_lsm_tree_get(session, oldname, 1, &lsm_tree));
/* Shut down the LSM worker. */
WT_RET(__lsm_tree_close(session, lsm_tree));
@@ -612,7 +632,7 @@ __wt_lsm_tree_truncate(
WT_UNUSED(cfg);
/* Get the LSM tree. */
- WT_RET(__wt_lsm_tree_get(session, name, &lsm_tree));
+ WT_RET(__wt_lsm_tree_get(session, name, 1, &lsm_tree));
/* Shut down the LSM worker. */
WT_RET(__lsm_tree_close(session, lsm_tree));
@@ -633,6 +653,7 @@ __wt_lsm_tree_truncate(
WT_ERR(__lsm_tree_start_worker(session, lsm_tree));
__wt_spin_unlock(session, &lsm_tree->lock);
+ __wt_lsm_tree_release(session, lsm_tree);
if (0) {
err: __wt_spin_unlock(session, &lsm_tree->lock);
@@ -651,18 +672,21 @@ __wt_lsm_tree_worker(WT_SESSION_IMPL *session,
int (*func)(WT_SESSION_IMPL *, const char *[]),
const char *cfg[], uint32_t open_flags)
{
+ WT_DECL_RET;
WT_LSM_CHUNK *chunk;
WT_LSM_TREE *lsm_tree;
int i;
- WT_RET(__wt_lsm_tree_get(session, uri, &lsm_tree));
+ WT_RET(__wt_lsm_tree_get(session, uri,
+ FLD_ISSET(open_flags, WT_BTREE_EXCLUSIVE) ? 1 : 0, &lsm_tree));
for (i = 0; i < lsm_tree->nchunks; i++) {
chunk = lsm_tree->chunk[i];
if (func == __wt_checkpoint &&
F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
continue;
- WT_RET(__wt_schema_worker(
+ WT_ERR(__wt_schema_worker(
session, chunk->uri, func, cfg, open_flags));
}
- return (0);
+err: __wt_lsm_tree_release(session, lsm_tree);
+ return (ret);
}