summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/lsm/lsm_cursor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/src/lsm/lsm_cursor.c')
-rw-r--r--src/third_party/wiredtiger/src/lsm/lsm_cursor.c275
1 files changed, 210 insertions, 65 deletions
diff --git a/src/third_party/wiredtiger/src/lsm/lsm_cursor.c b/src/third_party/wiredtiger/src/lsm/lsm_cursor.c
index e98f59e7b05..067c527a21a 100644
--- a/src/third_party/wiredtiger/src/lsm/lsm_cursor.c
+++ b/src/third_party/wiredtiger/src/lsm/lsm_cursor.c
@@ -10,7 +10,7 @@
#define WT_FORALL_CURSORS(clsm, c, i) \
for ((i) = (clsm)->nchunks; (i) > 0;) \
- if (((c) = (clsm)->cursors[--i]) != NULL)
+ if (((c) = (clsm)->chunks[--i]->cursor) != NULL)
#define WT_LSM_CURCMP(s, lsm_tree, c1, c2, cmp) \
__wt_compare(s, (lsm_tree)->collator, &(c1)->key, &(c2)->key, &cmp)
@@ -18,6 +18,7 @@
static int __clsm_lookup(WT_CURSOR_LSM *, WT_ITEM *);
static int __clsm_open_cursors(WT_CURSOR_LSM *, bool, u_int, uint32_t);
static int __clsm_reset_cursors(WT_CURSOR_LSM *, WT_CURSOR *);
+static int __clsm_search_near(WT_CURSOR *cursor, int *exactp);
/*
* __wt_clsm_request_switch --
@@ -109,7 +110,7 @@ __clsm_enter_update(WT_CURSOR_LSM *clsm)
primary = NULL;
have_primary = false;
} else {
- primary = clsm->cursors[clsm->nchunks - 1];
+ primary = clsm->chunks[clsm->nchunks - 1]->cursor;
primary_chunk = clsm->primary_chunk;
WT_ASSERT(session, F_ISSET(&session->txn, WT_TXN_HAS_ID));
have_primary = (primary != NULL && primary_chunk != NULL &&
@@ -165,8 +166,7 @@ __clsm_enter(WT_CURSOR_LSM *clsm, bool reset, bool update)
WT_LSM_TREE *lsm_tree;
WT_SESSION_IMPL *session;
WT_TXN *txn;
- uint64_t *switch_txnp;
- uint64_t snap_min;
+ uint64_t i, pinned_id , switch_txn;
lsm_tree = clsm->lsm_tree;
session = (WT_SESSION_IMPL *)clsm->iface.session;
@@ -226,8 +226,8 @@ __clsm_enter(WT_CURSOR_LSM *clsm, bool reset, bool update)
* that overlaps with our snapshot is a potential
* conflict.
*
- * Note that the global snap_min is correct here: it
- * tracks concurrent transactions excluding special
+ * Note that the pinned ID is correct here: it tracks
+ * concurrent transactions excluding special
* transactions such as checkpoint (which we can't
* conflict with because checkpoint only writes the
* metadata, which is not an LSM tree).
@@ -237,17 +237,18 @@ __clsm_enter(WT_CURSOR_LSM *clsm, bool reset, bool update)
F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) {
WT_ASSERT(session,
F_ISSET(txn, WT_TXN_HAS_SNAPSHOT));
- snap_min =
- WT_SESSION_TXN_STATE(session)->snap_min;
- for (switch_txnp =
- &clsm->switch_txn[clsm->nchunks - 2];
+ pinned_id =
+ WT_SESSION_TXN_STATE(session)->pinned_id;
+ for (i = clsm->nchunks - 2;
clsm->nupdates < clsm->nchunks;
- clsm->nupdates++, switch_txnp--) {
- if (WT_TXNID_LT(*switch_txnp, snap_min))
+ clsm->nupdates++, i--) {
+ switch_txn =
+ clsm->chunks[i]->switch_txn;
+ if (WT_TXNID_LT(switch_txn, pinned_id))
break;
WT_ASSERT(session,
!__wt_txn_visible_all(
- session, *switch_txnp));
+ session, switch_txn));
}
}
}
@@ -378,7 +379,7 @@ __clsm_close_cursors(WT_CURSOR_LSM *clsm, u_int start, u_int end)
WT_CURSOR *c;
u_int i;
- if (clsm->cursors == NULL || clsm->nchunks == 0)
+ if (clsm->chunks == NULL || clsm->nchunks == 0)
return (0);
/*
@@ -387,12 +388,12 @@ __clsm_close_cursors(WT_CURSOR_LSM *clsm, u_int start, u_int end)
* careful with unsigned integer wrapping.
*/
for (i = start; i < end; i++) {
- if ((c = (clsm)->cursors[i]) != NULL) {
- clsm->cursors[i] = NULL;
+ if ((c = (clsm)->chunks[i]->cursor) != NULL) {
+ clsm->chunks[i]->cursor = NULL;
WT_RET(c->close(c));
}
- if ((bloom = clsm->blooms[i]) != NULL) {
- clsm->blooms[i] = NULL;
+ if ((bloom = clsm->chunks[i]->bloom) != NULL) {
+ clsm->chunks[i]->bloom = NULL;
WT_RET(__wt_bloom_close(bloom));
}
}
@@ -401,6 +402,45 @@ __clsm_close_cursors(WT_CURSOR_LSM *clsm, u_int start, u_int end)
}
/*
+ * __clsm_resize_chunks --
+ * Allocates an array of unit objects for each chunk.
+ */
+static int
+__clsm_resize_chunks(
+ WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm, u_int nchunks)
+{
+ WT_DECL_RET;
+ WT_LSM_CURSOR_CHUNK *chunk;
+
+ /* Don't allocate more iterators if we don't need them. */
+ if (clsm->chunks_count >= nchunks) {
+ return (ret);
+ }
+
+ WT_RET(__wt_realloc_def(session, &clsm->chunks_alloc, nchunks,
+ &clsm->chunks));
+ for (; clsm->chunks_count < nchunks; clsm->chunks_count++) {
+ WT_RET(__wt_calloc_one(session, &chunk));
+ clsm->chunks[clsm->chunks_count] = chunk;
+ }
+ return (ret);
+}
+
+/*
+ * __clsm_free_chunks --
+ * Allocates an array of unit objects for each chunk.
+ */
+static void
+__clsm_free_chunks(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm)
+{
+ size_t i;
+ for (i = 0; i < clsm->chunks_count; i++) {
+ __wt_free(session, clsm->chunks[i]);
+ }
+ __wt_free(session, clsm->chunks);
+}
+
+/*
* __clsm_open_cursors --
* Open cursors for the current set of files.
*/
@@ -409,7 +449,7 @@ __clsm_open_cursors(
WT_CURSOR_LSM *clsm, bool update, u_int start_chunk, uint32_t start_id)
{
WT_BTREE *btree;
- WT_CURSOR *c, **cp, *primary;
+ WT_CURSOR *c, *cursor, *primary;
WT_DECL_RET;
WT_LSM_CHUNK *chunk;
WT_LSM_TREE *lsm_tree;
@@ -422,6 +462,7 @@ __clsm_open_cursors(
bool locked;
c = &clsm->iface;
+ cursor = NULL;
session = (WT_SESSION_IMPL *)c->session;
txn = &session->txn;
chunk = NULL;
@@ -465,7 +506,7 @@ __clsm_open_cursors(
retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
nchunks = clsm->nchunks;
ngood = 0;
-
+ WT_ERR(__clsm_resize_chunks(session, clsm, nchunks));
/*
* We may have raced with another merge completing. Check that
* we're starting at the right offset in the chunk array.
@@ -486,16 +527,13 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks);
} else {
nchunks = lsm_tree->nchunks;
+ WT_ERR(__clsm_resize_chunks(session, clsm, nchunks));
/*
* If we are only opening the cursor for updates, only open the
* primary chunk, plus any other chunks that might be required
* to detect snapshot isolation conflicts.
*/
- if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT))
- WT_ERR(__wt_realloc_def(session,
- &clsm->txnid_alloc, nchunks,
- &clsm->switch_txn));
if (F_ISSET(clsm, WT_CLSM_OPEN_READ))
ngood = nupdates = 0;
else if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) {
@@ -504,11 +542,11 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
* chunk are globally visible. Copy the maximum
* transaction IDs into the cursor as we go.
*/
- for (ngood = nchunks - 1, nupdates = 1;
- ngood > 0;
+ for (ngood = nchunks - 1, nupdates = 1; ngood > 0;
ngood--, nupdates++) {
chunk = lsm_tree->chunk[ngood - 1];
- clsm->switch_txn[ngood - 1] = chunk->switch_txn;
+ clsm->chunks[ngood - 1]->switch_txn =
+ chunk->switch_txn;
if (__wt_txn_visible_all(
session, chunk->switch_txn))
break;
@@ -519,21 +557,20 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
}
/* Check how many cursors are already open. */
- for (cp = clsm->cursors + ngood;
- ngood < clsm->nchunks && ngood < nchunks;
- cp++, ngood++) {
+ for (; ngood < clsm->nchunks && ngood < nchunks; ngood++) {
chunk = lsm_tree->chunk[ngood];
+ cursor = clsm->chunks[ngood]->cursor;
/* If the cursor isn't open yet, we're done. */
- if (*cp == NULL)
+ if (cursor == NULL)
break;
/* Easy case: the URIs don't match. */
- if (strcmp((*cp)->uri, chunk->uri) != 0)
+ if (strcmp(cursor->uri, chunk->uri) != 0)
break;
/* Make sure the checkpoint config matches. */
- checkpoint = ((WT_CURSOR_BTREE *)*cp)->
+ checkpoint = ((WT_CURSOR_BTREE *)cursor)->
btree->dhandle->checkpoint;
if (checkpoint == NULL &&
F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
@@ -541,7 +578,7 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
break;
/* Make sure the Bloom config matches. */
- if (clsm->blooms[ngood] == NULL &&
+ if (clsm->chunks[ngood]->bloom == NULL &&
F_ISSET(chunk, WT_LSM_CHUNK_BLOOM))
break;
}
@@ -559,7 +596,7 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
* full, we may block while closing a cursor. Save the
* generation number and retry if it has changed under us.
*/
- if (clsm->cursors != NULL && ngood < clsm->nchunks) {
+ if (clsm->chunks != NULL && ngood < clsm->nchunks) {
close_range_start = ngood;
close_range_end = clsm->nchunks;
} else if (!F_ISSET(clsm, WT_CLSM_OPEN_READ) && nupdates > 0 ) {
@@ -591,28 +628,23 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
clsm->current = NULL;
}
- WT_ERR(__wt_realloc_def(session,
- &clsm->bloom_alloc, nchunks, &clsm->blooms));
- WT_ERR(__wt_realloc_def(session,
- &clsm->cursor_alloc, nchunks, &clsm->cursors));
-
clsm->nchunks = nchunks;
/* Open the cursors for chunks that have changed. */
- for (i = ngood, cp = clsm->cursors + i; i != nchunks; i++, cp++) {
+ for (i = ngood; i != nchunks; i++) {
chunk = lsm_tree->chunk[i + start_chunk];
/* Copy the maximum transaction ID. */
if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT))
- clsm->switch_txn[i] = chunk->switch_txn;
+ clsm->chunks[i]->switch_txn = chunk->switch_txn;
/*
* Read from the checkpoint if the file has been written.
* Once all cursors switch, the in-memory tree can be evicted.
*/
- WT_ASSERT(session, *cp == NULL);
+ WT_ASSERT(session, clsm->chunks[i]->cursor == NULL);
ret = __wt_open_cursor(session, chunk->uri, c,
(F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) && !chunk->empty) ?
- ckpt_cfg : NULL, cp);
+ ckpt_cfg : NULL, &clsm->chunks[i]->cursor);
/*
* XXX kludge: we may have an empty chunk where no checkpoint
@@ -620,8 +652,8 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
* chunk instead.
*/
if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) {
- ret = __wt_open_cursor(
- session, chunk->uri, c, NULL, cp);
+ ret = __wt_open_cursor(session,
+ chunk->uri, c, NULL, &clsm->chunks[i]->cursor);
if (ret == 0)
chunk->empty = 1;
}
@@ -634,25 +666,31 @@ retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
* write conflicts with concurrent updates.
*/
if (i != nchunks - 1)
- (*cp)->insert = __wt_curfile_update_check;
+ clsm->chunks[i]->cursor->insert =
+ __wt_curfile_update_check;
if (!F_ISSET(clsm, WT_CLSM_MERGE) &&
F_ISSET(chunk, WT_LSM_CHUNK_BLOOM))
WT_ERR(__wt_bloom_open(session, chunk->bloom_uri,
lsm_tree->bloom_bit_count,
lsm_tree->bloom_hash_count,
- c, &clsm->blooms[i]));
+ c, &clsm->chunks[i]->bloom));
/* Child cursors always use overwrite and raw mode. */
- F_SET(*cp, WT_CURSTD_OVERWRITE | WT_CURSTD_RAW);
+ F_SET(clsm->chunks[i]->cursor,
+ WT_CURSTD_OVERWRITE | WT_CURSTD_RAW);
}
+ /* Setup the count values for each chunk in the chunks*/
+ for (i = 0; i != clsm->nchunks; i++)
+ clsm->chunks[i]->count = lsm_tree->chunk[i]->count;
+
/* The last chunk is our new primary. */
if (chunk != NULL &&
!F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
chunk->switch_txn == WT_TXN_NONE) {
clsm->primary_chunk = chunk;
- primary = clsm->cursors[clsm->nchunks - 1];
+ primary = clsm->chunks[clsm->nchunks - 1]->cursor;
/*
* Disable eviction for the in-memory chunk. Also clear the
* bulk load flag here, otherwise eviction will be enabled by
@@ -672,17 +710,19 @@ err:
#ifdef HAVE_DIAGNOSTIC
/* Check that all cursors are open as expected. */
if (ret == 0 && F_ISSET(clsm, WT_CLSM_OPEN_READ)) {
- for (i = 0, cp = clsm->cursors; i != clsm->nchunks; cp++, i++) {
+ for (i = 0; i != clsm->nchunks; i++) {
+ cursor = clsm->chunks[i]->cursor;
chunk = lsm_tree->chunk[i + start_chunk];
- /* Make sure the cursor is open. */
- WT_ASSERT(session, *cp != NULL);
+ /* Make sure the first cursor is open. */
+ WT_ASSERT(session, cursor != NULL);
/* Easy case: the URIs should match. */
- WT_ASSERT(session, strcmp((*cp)->uri, chunk->uri) == 0);
+ WT_ASSERT(
+ session, strcmp(cursor->uri, chunk->uri) == 0);
/* Make sure the checkpoint config matches. */
- checkpoint = ((WT_CURSOR_BTREE *)*cp)->
+ checkpoint = ((WT_CURSOR_BTREE *)cursor)->
btree->dhandle->checkpoint;
WT_ASSERT(session,
(F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
@@ -693,7 +733,8 @@ err:
WT_ASSERT(session,
(F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) &&
!F_ISSET(clsm, WT_CLSM_MERGE)) ?
- clsm->blooms[i] != NULL : clsm->blooms[i] == NULL);
+ clsm->chunks[i]->bloom != NULL :
+ clsm->chunks[i]->bloom == NULL);
}
}
#endif
@@ -902,6 +943,96 @@ err: __clsm_leave(clsm);
}
/*
+ * __clsm_random_chunk --
+ * Pick a chunk at random, weighted by the size of all chunks. Weighting
+ * proportional to documents avoids biasing towards small chunks. Then return
+ * the cursor on the chunk we have picked.
+ */
+static int
+__clsm_random_chunk(WT_SESSION_IMPL *session,
+ WT_CURSOR_LSM *clsm, WT_CURSOR **cursor)
+{
+ uint64_t checked_docs, i, rand_doc, total_docs;
+
+ /*
+ * If the tree is empty we cannot do a random lookup, so return a
+ * WT_NOTFOUND.
+ */
+ if (clsm->nchunks == 0)
+ return (WT_NOTFOUND);
+ for (total_docs = i = 0; i < clsm->nchunks; i++) {
+ total_docs += clsm->chunks[i]->count;
+ }
+ if (total_docs == 0)
+ return (WT_NOTFOUND);
+
+ rand_doc = __wt_random(&session->rnd) % total_docs;
+
+ for (checked_docs = i = 0; i < clsm->nchunks; i++) {
+ checked_docs += clsm->chunks[i]->count;
+ if (rand_doc <= checked_docs) {
+ *cursor = clsm->chunks[i]->cursor;
+ break;
+ }
+ }
+ return (0);
+}
+
+/*
+ * __clsm_next_random --
+ * WT_CURSOR->next method for the LSM cursor type when configured with
+ * next_random.
+ */
+static int
+__clsm_next_random(WT_CURSOR *cursor)
+{
+ WT_CURSOR_LSM *clsm;
+ WT_CURSOR *c;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ int exact;
+
+ c = NULL;
+ clsm = (WT_CURSOR_LSM *)cursor;
+
+ CURSOR_API_CALL(cursor, session, next, NULL);
+ WT_CURSOR_NOVALUE(cursor);
+ WT_ERR(__clsm_enter(clsm, false, false));
+
+ for (;;) {
+ WT_ERR(__clsm_random_chunk(session, clsm, &c));
+ /*
+ * This call to next_random on the chunk can potentially end in
+ * WT_NOTFOUND if the chunk we picked is empty. We want to retry
+ * in that case.
+ */
+ ret = __wt_curfile_next_random(c);
+ if (ret == WT_NOTFOUND)
+ continue;
+
+ WT_ERR(ret);
+ F_SET(cursor, WT_CURSTD_KEY_INT);
+ WT_ERR(c->get_key(c, &cursor->key));
+ /*
+ * Search near the current key to resolve any tombstones
+ * and position to a valid document. If we see a
+ * WT_NOTFOUND here that is valid, as the tree has no
+ * documents visible to us.
+ */
+ WT_ERR(__clsm_search_near(cursor, &exact));
+ break;
+ }
+
+ /* We have found a valid doc. Set that we are now positioned */
+ if (0) {
+err: F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
+ }
+ __clsm_leave(clsm);
+ API_END(session, ret);
+ return (ret);
+}
+
+/*
* __clsm_prev --
* WT_CURSOR->prev method for the LSM cursor type.
*/
@@ -1072,7 +1203,7 @@ __clsm_lookup(WT_CURSOR_LSM *clsm, WT_ITEM *value)
WT_FORALL_CURSORS(clsm, c, i) {
/* If there is a Bloom filter, see if we can skip the read. */
bloom = NULL;
- if ((bloom = clsm->blooms[i]) != NULL) {
+ if ((bloom = clsm->chunks[i]->bloom) != NULL) {
if (!have_hash) {
__wt_bloom_hash(bloom, &cursor->key, &bhash);
have_hash = true;
@@ -1259,7 +1390,12 @@ __clsm_search_near(WT_CURSOR *cursor, int *exactp)
*/
F_CLR(cursor, WT_CURSTD_KEY_SET);
F_SET(cursor, WT_CURSTD_KEY_INT);
- if ((ret = cursor->next(cursor)) == 0) {
+ /*
+ * We call __clsm_next here as we want to advance
+ * forward. If we are a random LSM cursor calling next
+ * on the cursor will not advance as we intend.
+ */
+ if ((ret = __clsm_next(cursor)) == 0) {
cmp = 1;
deleted = false;
}
@@ -1268,7 +1404,11 @@ __clsm_search_near(WT_CURSOR *cursor, int *exactp)
}
if (deleted) {
clsm->current = NULL;
- WT_ERR(cursor->prev(cursor));
+ /*
+ * We call prev directly here as cursor->prev may be "invalid"
+ * if this is a random cursor.
+ */
+ WT_ERR(__clsm_prev(cursor));
cmp = -1;
}
*exactp = cmp;
@@ -1312,7 +1452,7 @@ __clsm_put(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm,
* Clear the existing cursor position. Don't clear the primary cursor:
* we're about to use it anyway.
*/
- primary = clsm->cursors[clsm->nchunks - 1];
+ primary = clsm->chunks[clsm->nchunks - 1]->cursor;
WT_RET(__clsm_reset_cursors(clsm, primary));
/* If necessary, set the position for future scans. */
@@ -1322,12 +1462,12 @@ __clsm_put(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm,
for (i = 0, slot = clsm->nchunks - 1; i < clsm->nupdates; i++, slot--) {
/* Check if we need to keep updating old chunks. */
if (i > 0 &&
- __wt_txn_visible(session, clsm->switch_txn[slot])) {
+ __wt_txn_visible(session, clsm->chunks[slot]->switch_txn)) {
clsm->nupdates = i;
break;
}
- c = clsm->cursors[slot];
+ c = clsm->chunks[slot]->cursor;
c->set_key(c, key);
c->set_value(c, value);
WT_RET((position && i == 0) ? c->update(c) : c->insert(c));
@@ -1485,9 +1625,7 @@ __wt_clsm_close(WT_CURSOR *cursor)
clsm = (WT_CURSOR_LSM *)cursor;
CURSOR_API_CALL(cursor, session, close, NULL);
WT_TRET(__clsm_close_cursors(clsm, 0, clsm->nchunks));
- __wt_free(session, clsm->blooms);
- __wt_free(session, clsm->cursors);
- __wt_free(session, clsm->switch_txn);
+ __clsm_free_chunks(session, clsm);
/* In case we were somehow left positioned, clear that. */
__clsm_leave(clsm);
@@ -1588,6 +1726,13 @@ __wt_clsm_open(WT_SESSION_IMPL *session,
*/
clsm->dsk_gen = 0;
+ /* If the next_random option is set, configure a random cursor */
+ WT_ERR(__wt_config_gets_def(session, cfg, "next_random", 0, &cval));
+ if (cval.val != 0) {
+ __wt_cursor_set_notsup(cursor);
+ cursor->next = __clsm_next_random;
+ }
+
WT_ERR(__wt_cursor_init(cursor, cursor->uri, owner, cfg, cursorp));
if (bulk)