summaryrefslogtreecommitdiff
path: root/src/cursor/cur_join.c
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@mongodb.com>2016-05-05 15:38:12 +1000
committerMichael Cahill <michael.cahill@mongodb.com>2016-05-05 15:38:12 +1000
commit636a7b25ef3eca6b98009330f4d35337d4f35717 (patch)
tree7cc2e03ad96e206cbe73343feef10197023a37da /src/cursor/cur_join.c
parenteaa7b5f0fcc62f356c33a2c56f45b609a73ca5dd (diff)
parent75c22bc0c662622c14e5c47d99ff262cede2c6bf (diff)
downloadmongodb-3.3.6.tar.gz
Merge branch 'develop' into mongodb-3.4mongodb-3.3.6
Diffstat (limited to 'src/cursor/cur_join.c')
-rw-r--r--src/cursor/cur_join.c1498
1 files changed, 935 insertions, 563 deletions
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c
index 38a83217933..fd7de53c981 100644
--- a/src/cursor/cur_join.c
+++ b/src/cursor/cur_join.c
@@ -8,159 +8,299 @@
#include "wt_internal.h"
+static int __curjoin_entries_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN *,
+ WT_ITEM *, WT_CURSOR_JOIN_ITER *);
+static int __curjoin_entry_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *,
+ WT_ITEM *, WT_CURSOR_JOIN_ITER *);
+static int __curjoin_entry_member(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *,
+ WT_ITEM *, WT_CURSOR_JOIN_ITER *);
static int __curjoin_insert_endpoint(WT_SESSION_IMPL *,
WT_CURSOR_JOIN_ENTRY *, u_int, WT_CURSOR_JOIN_ENDPOINT **);
+static int __curjoin_iter_close(WT_CURSOR_JOIN_ITER *);
+static int __curjoin_iter_close_all(WT_CURSOR_JOIN_ITER *);
+static bool __curjoin_iter_ready(WT_CURSOR_JOIN_ITER *);
+static int __curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *, u_int);
+static int __curjoin_pack_recno(WT_SESSION_IMPL *, uint64_t, uint8_t *,
+ size_t, WT_ITEM *);
+static int __curjoin_split_key(WT_SESSION_IMPL *, WT_CURSOR_JOIN *, WT_ITEM *,
+ WT_CURSOR *, WT_CURSOR *, const char *, bool);
+
+#define WT_CURJOIN_ITER_CONSUMED(iter) \
+ ((iter)->entry_pos >= (iter)->entry_count)
/*
- * __curjoin_entry_iter_init --
+ * __wt_curjoin_joined --
+ * Produce an error that this cursor is being used in a join call.
+ */
+int
+__wt_curjoin_joined(WT_CURSOR *cursor)
+{
+ WT_SESSION_IMPL *session;
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+ __wt_errx(session, "cursor is being used in a join");
+ return (ENOTSUP);
+}
+
+/*
+ * __curjoin_iter_init --
* Initialize an iteration for the index managed by a join entry.
*
*/
static int
-__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
- WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp)
+__curjoin_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ITER **iterp)
{
- WT_CURSOR *to_dup;
- WT_DECL_RET;
- const char *raw_cfg[] = { WT_CONFIG_BASE(
- session, WT_SESSION_open_cursor), "raw", NULL };
- const char *def_cfg[] = { WT_CONFIG_BASE(
- session, WT_SESSION_open_cursor), NULL };
- const char *urimain, **config;
- char *mainbuf, *uri;
WT_CURSOR_JOIN_ITER *iter;
- size_t size;
-
- iter = NULL;
- mainbuf = uri = NULL;
- to_dup = entry->ends[0].cursor;
-
- if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
- config = &raw_cfg[0];
- else
- config = &def_cfg[0];
-
- size = strlen(to_dup->internal_uri) + 3;
- WT_ERR(__wt_calloc(session, size, 1, &uri));
- snprintf(uri, size, "%s()", to_dup->internal_uri);
- urimain = cjoin->table->name;
- if (cjoin->projection != NULL) {
- size = strlen(urimain) + strlen(cjoin->projection) + 1;
- WT_ERR(__wt_calloc(session, size, 1, &mainbuf));
- snprintf(mainbuf, size, "%s%s", urimain, cjoin->projection);
- urimain = mainbuf;
- }
- WT_ERR(__wt_calloc_one(session, &iter));
- WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config,
- &iter->cursor));
- WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor));
- WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config,
- &iter->main));
+ *iterp = NULL;
+ WT_RET(__wt_calloc_one(session, iterp));
+ iter = *iterp;
iter->cjoin = cjoin;
iter->session = session;
- iter->entry = entry;
- iter->positioned = false;
- iter->isequal = (entry->ends_next == 1 &&
- WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ);
- *iterp = iter;
+ cjoin->iter = iter;
+ WT_RET(__curjoin_iter_set_entry(iter, 0));
+ return (0);
+}
- if (0) {
-err: __wt_free(session, iter);
- }
- __wt_free(session, mainbuf);
- __wt_free(session, uri);
+/*
+ * __curjoin_iter_close --
+ * Close the iteration, release resources.
+ *
+ */
+static int
+__curjoin_iter_close(WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_DECL_RET;
+
+ if (iter->cursor != NULL)
+ WT_TRET(iter->cursor->close(iter->cursor));
+ __wt_free(iter->session, iter);
return (ret);
}
/*
- * __curjoin_pack_recno --
- * Pack the given recno into a buffer; prepare an item referencing it.
+ * __curjoin_iter_close_all --
+ * Free the iterator and all of its children recursively.
*
*/
static int
-__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf,
- size_t bufsize, WT_ITEM *item)
+__curjoin_iter_close_all(WT_CURSOR_JOIN_ITER *iter)
{
- WT_SESSION *wtsession;
- size_t sz;
+ WT_CURSOR_JOIN *parent;
+ WT_DECL_RET;
- wtsession = (WT_SESSION *)session;
- WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r));
- WT_ASSERT(session, sz < bufsize);
- WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r));
- item->size = sz;
- item->data = buf;
+ if (iter->child)
+ WT_TRET(__curjoin_iter_close_all(iter->child));
+ iter->child = NULL;
+ WT_ASSERT(iter->session, iter->cjoin->parent == NULL ||
+ iter->cjoin->parent->iter->child == iter);
+ if ((parent = iter->cjoin->parent) != NULL)
+ parent->iter->child = NULL;
+ iter->cjoin->iter = NULL;
+ WT_TRET(__curjoin_iter_close(iter));
+ return (ret);
+}
+
+/*
+ * __curjoin_iter_reset --
+ * Reset an iteration to the starting point.
+ *
+ */
+static int
+__curjoin_iter_reset(WT_CURSOR_JOIN_ITER *iter)
+{
+ if (iter->child != NULL)
+ WT_RET(__curjoin_iter_close_all(iter->child));
+ WT_RET(__curjoin_iter_set_entry(iter, 0));
+ iter->positioned = false;
return (0);
}
/*
- * __curjoin_split_key --
- * Copy the primary key from a cursor (either main table or index)
- * to another cursor. When copying from an index file, the index
- * key is also returned.
+ * __curjoin_iter_ready --
+ * Check the positioned flag for all nested iterators.
+ *
+ */
+static bool
+__curjoin_iter_ready(WT_CURSOR_JOIN_ITER *iter)
+{
+ while (iter != NULL) {
+ if (!iter->positioned)
+ return (false);
+ iter = iter->child;
+ }
+ return (true);
+}
+
+/*
+ * __curjoin_iter_set_entry --
+ * Set the current entry for an iterator.
*
*/
static int
-__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
- WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur,
- const char *repack_fmt, bool isindex)
+__curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *iter, u_int entry_pos)
{
- WT_CURSOR *firstcg_cur;
- WT_CURSOR_INDEX *cindex;
- WT_ITEM *keyp;
- const uint8_t *p;
+ WT_CURSOR *c, *to_dup;
+ WT_CURSOR_JOIN *cjoin, *topjoin;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ char *uri;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ iter->session, WT_SESSION_open_cursor), "raw", NULL };
+ const char *def_cfg[] = { WT_CONFIG_BASE(
+ iter->session, WT_SESSION_open_cursor), NULL };
+ const char **config;
+ size_t size;
- if (isindex) {
- cindex = ((WT_CURSOR_INDEX *)fromcur);
- /*
- * Repack tells us where the index key ends; advance past
- * that to get where the raw primary key starts.
- */
- WT_RET(__wt_struct_repack(session, cindex->child->key_format,
- repack_fmt != NULL ? repack_fmt : cindex->iface.key_format,
- &cindex->child->key, idxkey));
- WT_ASSERT(session, cindex->child->key.size > idxkey->size);
- tocur->key.data = (uint8_t *)idxkey->data + idxkey->size;
- tocur->key.size = cindex->child->key.size - idxkey->size;
- if (WT_CURSOR_RECNO(tocur)) {
- p = (const uint8_t *)tocur->key.data;
- WT_RET(__wt_vunpack_uint(&p, tocur->key.size,
- &tocur->recno));
- } else
- tocur->recno = 0;
- } else {
- firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0];
- keyp = &firstcg_cur->key;
- if (WT_CURSOR_RECNO(tocur)) {
- WT_ASSERT(session, keyp->size == sizeof(uint64_t));
- tocur->recno = *(uint64_t *)keyp->data;
- WT_RET(__curjoin_pack_recno(session, tocur->recno,
- cjoin->recno_buf, sizeof(cjoin->recno_buf),
- &tocur->key));
- } else {
- WT_ITEM_SET(tocur->key, *keyp);
- tocur->recno = 0;
+ session = iter->session;
+ cjoin = iter->cjoin;
+ uri = NULL;
+ entry = iter->entry = &cjoin->entries[entry_pos];
+ iter->positioned = false;
+ iter->entry_pos = entry_pos;
+ iter->end_pos = 0;
+
+ iter->is_equal = (entry->ends_next == 1 &&
+ WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ);
+ iter->end_skip = (entry->ends_next > 0 &&
+ WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_GE) ? 1 : 0;
+
+ iter->end_count = WT_MIN(1, entry->ends_next);
+ if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) {
+ iter->entry_count = cjoin->entries_next;
+ if (iter->is_equal)
+ iter->end_count = entry->ends_next;
+ } else
+ iter->entry_count = 1;
+ WT_ASSERT(iter->session, iter->entry_pos < iter->entry_count);
+
+ entry->stats.actual_count = 0;
+
+ if (entry->subjoin == NULL) {
+ for (topjoin = iter->cjoin; topjoin->parent != NULL;
+ topjoin = topjoin->parent)
+ ;
+ to_dup = entry->ends[0].cursor;
+
+ if (F_ISSET((WT_CURSOR *)topjoin, WT_CURSTD_RAW))
+ config = &raw_cfg[0];
+ else
+ config = &def_cfg[0];
+
+ size = strlen(to_dup->internal_uri) + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s()", to_dup->internal_uri);
+ if ((c = iter->cursor) == NULL || !WT_STREQ(c->uri, uri)) {
+ iter->cursor = NULL;
+ if (c != NULL)
+ WT_ERR(c->close(c));
+ WT_ERR(__wt_open_cursor(session, uri,
+ (WT_CURSOR *)topjoin, config, &iter->cursor));
}
- idxkey->data = NULL;
- idxkey->size = 0;
+ WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor));
+ } else if (iter->cursor != NULL) {
+ WT_ERR(iter->cursor->close(iter->cursor));
+ iter->cursor = NULL;
+ }
+
+err: __wt_free(session, uri);
+ return (ret);
+}
+
+/*
+ * __curjoin_iter_bump --
+ * Called to advance the iterator to the next endpoint, which may in turn
+ * advance to the next entry.
+ */
+static int
+__curjoin_iter_bump(WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_SESSION_IMPL *session;
+
+ session = iter->session;
+ iter->positioned = false;
+ entry = iter->entry;
+ if (entry->subjoin == NULL && iter->is_equal &&
+ ++iter->end_pos < iter->end_count) {
+ WT_RET(__wt_cursor_dup_position(
+ entry->ends[iter->end_pos].cursor, iter->cursor));
+ return (0);
}
+ iter->end_pos = iter->end_count = iter->end_skip = 0;
+ if (entry->subjoin != NULL && entry->subjoin->iter != NULL)
+ WT_RET(__curjoin_iter_close_all(entry->subjoin->iter));
+
+ if (++iter->entry_pos >= iter->entry_count) {
+ iter->entry = NULL;
+ return (0);
+ }
+ iter->entry = ++entry;
+ if (entry->subjoin != NULL) {
+ WT_RET(__curjoin_iter_init(session, entry->subjoin,
+ &iter->child));
+ return (0);
+ }
+ WT_RET(__curjoin_iter_set_entry(iter, iter->entry_pos));
return (0);
}
/*
- * __curjoin_entry_iter_next --
+ * __curjoin_iter_next --
* Get the next item in an iteration.
*
*/
static int
-__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor)
+__curjoin_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor)
{
- if (iter->positioned)
- WT_RET(iter->cursor->next(iter->cursor));
- else
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ session = iter->session;
+
+ if (WT_CURJOIN_ITER_CONSUMED(iter))
+ return (WT_NOTFOUND);
+again:
+ entry = iter->entry;
+ if (entry->subjoin != NULL) {
+ if (iter->child == NULL)
+ WT_RET(__curjoin_iter_init(session,
+ entry->subjoin, &iter->child));
+ ret = __curjoin_iter_next(iter->child, cursor);
+ if (ret == 0) {
+ /* The child did the work, we're done. */
+ iter->curkey = &cursor->key;
+ iter->positioned = true;
+ return (ret);
+ }
+ else if (ret == WT_NOTFOUND) {
+ WT_RET(__curjoin_iter_close_all(iter->child));
+ entry->subjoin->iter = NULL;
+ iter->child = NULL;
+ WT_RET(__curjoin_iter_bump(iter));
+ ret = 0;
+ }
+ } else if (iter->positioned) {
+ ret = iter->cursor->next(iter->cursor);
+ if (ret == WT_NOTFOUND) {
+ WT_RET(__curjoin_iter_bump(iter));
+ ret = 0;
+ } else
+ WT_RET(ret);
+ } else
iter->positioned = true;
+ if (WT_CURJOIN_ITER_CONSUMED(iter))
+ return (WT_NOTFOUND);
+
+ if (!__curjoin_iter_ready(iter))
+ goto again;
+
+ WT_RET(ret);
+
/*
* Set our key to the primary key, we'll also need this
* to check membership.
@@ -175,51 +315,380 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor)
}
/*
- * __curjoin_entry_iter_reset --
- * Reset an iteration to the starting point.
- *
+ * __curjoin_close --
+ * WT_CURSOR::close for join cursors.
*/
static int
-__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
+__curjoin_close(WT_CURSOR *cursor)
{
- if (iter->positioned) {
- WT_RET(iter->cursor->reset(iter->cursor));
- WT_RET(iter->main->reset(iter->main));
- WT_RET(__wt_cursor_dup_position(
- iter->cjoin->entries[0].ends[0].cursor, iter->cursor));
- iter->positioned = false;
- iter->entry->stats.actual_count = 0;
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ENDPOINT *end;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ u_int i;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL);
+
+ __wt_schema_release_table(session, cjoin->table);
+ /* These are owned by the table */
+ cursor->internal_uri = NULL;
+ cursor->key_format = NULL;
+ if (cjoin->projection != NULL) {
+ __wt_free(session, cjoin->projection);
+ __wt_free(session, cursor->value_format);
+ }
+
+ for (entry = cjoin->entries, i = 0; i < cjoin->entries_next;
+ entry++, i++) {
+ if (entry->subjoin != NULL) {
+ F_CLR(&entry->subjoin->iface, WT_CURSTD_JOINED);
+ entry->subjoin->parent = NULL;
+ }
+ if (entry->main != NULL)
+ WT_TRET(entry->main->close(entry->main));
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
+ WT_TRET(__wt_bloom_close(entry->bloom));
+ for (end = &entry->ends[0];
+ end < &entry->ends[entry->ends_next]; end++) {
+ F_CLR(end->cursor, WT_CURSTD_JOINED);
+ if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR))
+ WT_TRET(end->cursor->close(end->cursor));
+ }
+ __wt_free(session, entry->ends);
+ __wt_free(session, entry->repack_format);
+ }
+
+ if (cjoin->iter != NULL)
+ WT_TRET(__curjoin_iter_close_all(cjoin->iter));
+ if (cjoin->main != NULL)
+ WT_TRET(cjoin->main->close(cjoin->main));
+
+ __wt_free(session, cjoin->entries);
+ WT_TRET(__wt_cursor_close(cursor));
+
+err: API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_endpoint_init_key --
+ * Set the key in the reference endpoint.
+ */
+static int
+__curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint)
+{
+ WT_CURSOR *cursor;
+ WT_CURSOR_INDEX *cindex;
+ WT_ITEM *k;
+ uint64_t r;
+
+ if ((cursor = endpoint->cursor) != NULL) {
+ if (entry->index != NULL) {
+ /* Extract and save the index's logical key. */
+ cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
+ WT_RET(__wt_struct_repack(session,
+ cindex->child->key_format,
+ (entry->repack_format != NULL ?
+ entry->repack_format : cindex->iface.key_format),
+ &cindex->child->key, &endpoint->key));
+ } else {
+ k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
+ if (WT_CURSOR_RECNO(cursor)) {
+ r = *(uint64_t *)k->data;
+ WT_RET(__curjoin_pack_recno(session, r,
+ endpoint->recno_buf,
+ sizeof(endpoint->recno_buf),
+ &endpoint->key));
+ }
+ else
+ endpoint->key = *k;
+ }
}
return (0);
}
/*
- * __curjoin_entry_iter_ready --
- * The iterator is positioned.
- *
+ * __curjoin_entries_in_range --
+ * Check if a key is in the range specified by the remaining entries,
+ * returning WT_NOTFOUND if not.
*/
-static bool
-__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
+static int
+__curjoin_entries_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iterarg)
{
- return (iter->positioned);
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_CURSOR_JOIN_ITER *iter;
+ WT_DECL_RET;
+ int fastret, slowret;
+ u_int pos;
+
+ iter = iterarg;
+ if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) {
+ fastret = 0;
+ slowret = WT_NOTFOUND;
+ } else {
+ fastret = WT_NOTFOUND;
+ slowret = 0;
+ }
+ pos = iter == NULL ? 0 : iter->entry_pos;
+ for (entry = &cjoin->entries[pos]; pos < cjoin->entries_next;
+ entry++, pos++) {
+ ret = __curjoin_entry_member(session, entry, curkey, iter);
+ if (ret == fastret)
+ return (fastret);
+ if (ret != slowret)
+ break;
+ iter = NULL;
+ }
+
+ return (ret == 0 ? slowret : ret);
}
/*
- * __curjoin_entry_iter_close --
- * Close the iteration, release resources.
- *
+ * __curjoin_entry_in_range --
+ * Check if a key is in the range specified by the entry, returning
+ * WT_NOTFOUND if not.
*/
static int
-__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter)
+__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iter)
{
+ WT_COLLATOR *collator;
+ WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
+ bool disjunction, passed;
+ int cmp;
+ u_int pos;
+
+ collator = (entry->index != NULL) ? entry->index->collator : NULL;
+ endmax = &entry->ends[entry->ends_next];
+ disjunction = F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION);
+ passed = false;
+
+ /*
+ * The iterator may have already satisfied some endpoint conditions.
+ * If so and we're a disjunction, we're done. If so and we're a
+ * conjunction, we can start past the satisfied conditions.
+ */
+ if (iter == NULL)
+ pos = 0;
+ else {
+ if (disjunction && iter->end_skip)
+ return (0);
+ pos = iter->end_pos + iter->end_skip;
+ }
+
+ for (end = &entry->ends[pos]; end < endmax; end++) {
+ WT_RET(__wt_compare(session, collator, curkey, &end->key,
+ &cmp));
+ switch (WT_CURJOIN_END_RANGE(end)) {
+ case WT_CURJOIN_END_EQ:
+ passed = (cmp == 0);
+ break;
+
+ case WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ:
+ passed = (cmp >= 0);
+ WT_ASSERT(session, iter == NULL);
+ break;
+
+ case WT_CURJOIN_END_GT:
+ passed = (cmp > 0);
+ if (passed && iter != NULL && pos == 0)
+ iter->end_skip = 1;
+ break;
+
+ case WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ:
+ passed = (cmp <= 0);
+ break;
+
+ case WT_CURJOIN_END_LT:
+ passed = (cmp < 0);
+ break;
+
+ default:
+ WT_RET(__wt_illegal_value(session, NULL));
+ break;
+ }
+
+ if (!passed) {
+ if (iter != NULL &&
+ (iter->is_equal ||
+ F_ISSET(end, WT_CURJOIN_END_LT))) {
+ WT_RET(__curjoin_iter_bump(iter));
+ return (WT_NOTFOUND);
+ }
+ if (!disjunction)
+ return (WT_NOTFOUND);
+ iter = NULL;
+ } else if (disjunction)
+ break;
+ }
+ if (disjunction && end == endmax)
+ return (WT_NOTFOUND);
+ else
+ return (0);
+}
+
+typedef struct {
+ WT_CURSOR iface;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ bool ismember;
+} WT_CURJOIN_EXTRACTOR;
+
+/*
+ * __curjoin_extract_insert --
+ * Handle a key produced by a custom extractor.
+ */
+static int
+__curjoin_extract_insert(WT_CURSOR *cursor) {
+ WT_CURJOIN_EXTRACTOR *cextract;
WT_DECL_RET;
+ WT_ITEM ikey;
+ WT_SESSION_IMPL *session;
- if (iter->cursor != NULL)
- WT_TRET(iter->cursor->close(iter->cursor));
- if (iter->main != NULL)
- WT_TRET(iter->main->close(iter->main));
- __wt_free(iter->session, iter);
+ cextract = (WT_CURJOIN_EXTRACTOR *)cursor;
+ /*
+ * This insert method may be called multiple times during a single
+ * extraction. If we already have a definitive answer to the
+ * membership question, exit early.
+ */
+ if (cextract->ismember)
+ return (0);
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+
+ WT_ITEM_SET(ikey, cursor->key);
+ /*
+ * We appended a padding byte to the key to avoid rewriting the last
+ * column. Strip that away here.
+ */
+ WT_ASSERT(session, ikey.size > 0);
+ --ikey.size;
+
+ ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false);
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ else if (ret == 0)
+ cextract->ismember = true;
+
+ return (ret);
+}
+
+/*
+ * __curjoin_entry_member --
+ * Do a membership check for a particular index that was joined,
+ * if not a member, returns WT_NOTFOUND.
+ */
+static int
+__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ WT_ITEM *key, WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_CURJOIN_EXTRACTOR extract_cursor;
+ WT_CURSOR *c;
+ WT_CURSOR_STATIC_INIT(iface,
+ __wt_cursor_get_key, /* get-key */
+ __wt_cursor_get_value, /* get-value */
+ __wt_cursor_set_key, /* set-key */
+ __wt_cursor_set_value, /* set-value */
+ __wt_cursor_compare_notsup, /* compare */
+ __wt_cursor_equals_notsup, /* equals */
+ __wt_cursor_notsup, /* next */
+ __wt_cursor_notsup, /* prev */
+ __wt_cursor_notsup, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_search_near_notsup, /* search-near */
+ __curjoin_extract_insert, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_reconfigure_notsup, /* reconfigure */
+ __wt_cursor_notsup); /* close */
+ WT_DECL_RET;
+ WT_INDEX *idx;
+ WT_ITEM v;
+ bool bloom_found;
+
+ if (entry->subjoin == NULL && iter != NULL &&
+ (iter->end_pos + iter->end_skip >= entry->ends_next ||
+ (iter->end_skip > 0 &&
+ F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))))
+ return (0); /* no checks to make */
+
+ entry->stats.accesses++;
+ bloom_found = false;
+
+ if (entry->bloom != NULL) {
+ /*
+ * If we don't own the Bloom filter, we must be sharing one
+ * in a previous entry. So the shared filter has already
+ * been checked and passed.
+ */
+ if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
+ return (0);
+
+ /*
+ * If the item is not in the Bloom filter, we return
+ * immediately, otherwise, we still need to check the
+ * long way.
+ */
+ WT_ERR(__wt_bloom_inmem_get(entry->bloom, key));
+ bloom_found = true;
+ }
+ if (entry->subjoin != NULL) {
+ WT_ASSERT(session,
+ iter == NULL || entry->subjoin == iter->child->cjoin);
+ ret = __curjoin_entries_in_range(session, entry->subjoin,
+ key, iter == NULL ? NULL : iter->child);
+ if (iter != NULL &&
+ WT_CURJOIN_ITER_CONSUMED(iter->child)) {
+ WT_ERR(__curjoin_iter_bump(iter));
+ ret = WT_NOTFOUND;
+ }
+ return (ret);
+ }
+ if (entry->index != NULL) {
+ /*
+ * If this entry is used by the iterator, then we already
+ * have the index key, and we won't have to do any
+ * extraction either.
+ */
+ if (iter != NULL && entry == iter->entry)
+ WT_ITEM_SET(v, iter->idxkey);
+ else {
+ memset(&v, 0, sizeof(v)); /* Keep lint quiet. */
+ c = entry->main;
+ c->set_key(c, key);
+ if ((ret = c->search(c)) == 0)
+ ret = c->get_value(c, &v);
+ else if (ret == WT_NOTFOUND)
+ WT_ERR_MSG(session, WT_ERROR,
+ "main table for join is missing entry");
+ WT_TRET(c->reset(c));
+ WT_ERR(ret);
+ }
+ } else
+ WT_ITEM_SET(v, *key);
+
+ if ((idx = entry->index) != NULL && idx->extractor != NULL &&
+ (iter == NULL || entry != iter->entry)) {
+ WT_CLEAR(extract_cursor);
+ extract_cursor.iface = iface;
+ extract_cursor.iface.session = &session->iface;
+ extract_cursor.iface.key_format = idx->exkey_format;
+ extract_cursor.ismember = false;
+ extract_cursor.entry = entry;
+ WT_ERR(idx->extractor->extract(idx->extractor,
+ &session->iface, key, &v, &extract_cursor.iface));
+ if (!extract_cursor.ismember)
+ WT_ERR(WT_NOTFOUND);
+ } else
+ WT_ERR(__curjoin_entry_in_range(session, entry, &v, iter));
+ if (0) {
+err: if (ret == WT_NOTFOUND && bloom_found)
+ entry->stats.bloom_false_positive++;
+ }
return (ret);
}
@@ -238,10 +707,10 @@ __curjoin_get_key(WT_CURSOR *cursor, ...)
cjoin = (WT_CURSOR_JOIN *)cursor;
va_start(ap, cursor);
- CURSOR_API_CALL(cursor, session, get_key, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_key, NULL);
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
- !__curjoin_entry_iter_ready(cjoin->iter))
+ !cjoin->iter->positioned)
WT_ERR_MSG(session, EINVAL,
"join cursor must be advanced with next()");
WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap));
@@ -258,23 +727,21 @@ static int
__curjoin_get_value(WT_CURSOR *cursor, ...)
{
WT_CURSOR_JOIN *cjoin;
- WT_CURSOR_JOIN_ITER *iter;
WT_DECL_RET;
WT_SESSION_IMPL *session;
va_list ap;
cjoin = (WT_CURSOR_JOIN *)cursor;
- iter = cjoin->iter;
va_start(ap, cursor);
- CURSOR_API_CALL(cursor, session, get_value, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL);
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
- !__curjoin_entry_iter_ready(iter))
+ !cjoin->iter->positioned)
WT_ERR_MSG(session, EINVAL,
"join cursor must be advanced with next()");
- WT_ERR(__wt_curtable_get_valuev(iter->main, ap));
+ WT_ERR(__wt_curtable_get_valuev(cjoin->main, ap));
err: va_end(ap);
API_END_RET(session, ret);
@@ -298,7 +765,8 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
session, WT_SESSION_open_cursor), "raw", NULL };
const char *uri;
size_t size;
- int cmp, skip;
+ u_int skip;
+ int cmp;
c = NULL;
skip = 0;
@@ -354,7 +822,34 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
for (end = &entry->ends[skip]; end < endmax; end++) {
WT_ERR(__wt_compare(session, collator, &curkey,
&end->key, &cmp));
- if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) {
+ /* if condition satisfied, insert immediately */
+ switch (WT_CURJOIN_END_RANGE(end)) {
+ case WT_CURJOIN_END_EQ:
+ if (cmp == 0)
+ goto insert;
+ break;
+ case WT_CURJOIN_END_GT:
+ if (cmp > 0) {
+ /* skip this check next time */
+ skip = entry->ends_next;
+ goto insert;
+ }
+ break;
+ case WT_CURJOIN_END_GE:
+ if (cmp >= 0)
+ goto insert;
+ break;
+ case WT_CURJOIN_END_LT:
+ if (cmp < 0)
+ goto insert;
+ break;
+ case WT_CURJOIN_END_LE:
+ if (cmp <= 0)
+ goto insert;
+ break;
+ }
+ } else if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
if (cmp < 0 || (cmp == 0 &&
!F_ISSET(end, WT_CURJOIN_END_EQ)))
goto advance;
@@ -370,6 +865,14 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
goto done;
}
}
+ /*
+ * Either it's a disjunction that hasn't satisfied any
+ * condition, or it's a conjunction that has satisfied all
+ * conditions.
+ */
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))
+ goto advance;
+insert:
if (entry->index != NULL) {
curvalue.data =
(unsigned char *)curkey.data + curkey.size;
@@ -394,107 +897,86 @@ err: if (c != NULL)
}
/*
- * __curjoin_endpoint_init_key --
- * Set the key in the reference endpoint.
- */
-static int
-__curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
- WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint)
-{
- WT_CURSOR *cursor;
- WT_CURSOR_INDEX *cindex;
- WT_ITEM *k;
- uint64_t r;
-
- if ((cursor = endpoint->cursor) != NULL) {
- if (entry->index != NULL) {
- /* Extract and save the index's logical key. */
- cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
- WT_RET(__wt_struct_repack(session,
- cindex->child->key_format,
- (entry->repack_format != NULL ?
- entry->repack_format : cindex->iface.key_format),
- &cindex->child->key, &endpoint->key));
- } else {
- k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
- if (WT_CURSOR_RECNO(cursor)) {
- r = *(uint64_t *)k->data;
- WT_RET(__curjoin_pack_recno(session, r,
- endpoint->recno_buf,
- sizeof(endpoint->recno_buf),
- &endpoint->key));
- }
- else
- endpoint->key = *k;
- }
- }
- return (0);
-}
-
-/*
- * __curjoin_init_iter --
- * Initialize before any iteration.
+ * __curjoin_init_next --
+ * Initialize the cursor join when the next function is first called.
*/
static int
-__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
+__curjoin_init_next(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ bool iterable)
{
WT_BLOOM *bloom;
WT_DECL_RET;
WT_CURSOR *origcur;
WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2;
WT_CURSOR_JOIN_ENDPOINT *end;
+ char *mainbuf;
const char *def_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), NULL };
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
+ const char **config, *proj, *urimain;
+ size_t size;
uint32_t f, k;
+ mainbuf = NULL;
if (cjoin->entries_next == 0)
WT_RET_MSG(session, EINVAL,
"join cursor has not yet been joined with any other "
"cursors");
- je = &cjoin->entries[0];
- jeend = &cjoin->entries[cjoin->entries_next];
-
- /*
- * For a single compare=le endpoint in the first iterated entry,
- * construct a companion compare=ge endpoint that will actually
- * be iterated.
- */
- if (((je = cjoin->entries) != jeend) &&
- je->ends_next == 1 && F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) {
- origcur = je->ends[0].cursor;
- WT_RET(__curjoin_insert_endpoint(session, je, 0, &end));
- WT_RET(__wt_open_cursor(session, origcur->uri,
- (WT_CURSOR *)cjoin,
- F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg,
- &end->cursor));
- WT_RET(end->cursor->next(end->cursor));
- end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ |
- WT_CURJOIN_END_OWN_CURSOR;
+ if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
+ config = &raw_cfg[0];
+ else
+ config = &def_cfg[0];
+ urimain = cjoin->table->name;
+ if ((proj = cjoin->projection) != NULL) {
+ size = strlen(urimain) + strlen(proj) + 1;
+ WT_ERR(__wt_calloc(session, size, 1, &mainbuf));
+ snprintf(mainbuf, size, "%s%s", urimain, proj);
+ urimain = mainbuf;
}
- WT_RET(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter));
+ WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config,
+ &cjoin->main));
+ jeend = &cjoin->entries[cjoin->entries_next];
for (je = cjoin->entries; je < jeend; je++) {
+ if (je->subjoin != NULL) {
+ WT_ERR(__curjoin_init_next(session, je->subjoin,
+ iterable));
+ continue;
+ }
__wt_stat_join_init_single(&je->stats);
+ /*
+ * For a single compare=le/lt endpoint in any entry that may
+ * be iterated, construct a companion compare=ge endpoint
+ * that will actually be iterated.
+ */
+ if (iterable && je->ends_next == 1 &&
+ F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) {
+ origcur = je->ends[0].cursor;
+ WT_ERR(__curjoin_insert_endpoint(session, je, 0, &end));
+ WT_ERR(__wt_open_cursor(session, origcur->uri,
+ (WT_CURSOR *)cjoin,
+ F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg,
+ &end->cursor));
+ end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ |
+ WT_CURJOIN_END_OWN_CURSOR;
+ WT_ERR(end->cursor->next(end->cursor));
+ F_CLR(je, WT_CURJOIN_ENTRY_DISJUNCTION);
+ }
for (end = &je->ends[0]; end < &je->ends[je->ends_next];
end++)
- WT_RET(__curjoin_endpoint_init_key(session, je, end));
+ WT_ERR(__curjoin_endpoint_init_key(session, je, end));
/*
- * The first entry is iterated as the 'outermost' cursor.
- * For the common GE case, we don't have to test against
- * the left reference key, we know it will be true since
- * the btree is ordered.
+ * Do any needed Bloom filter initialization. Ignore Bloom
+ * filters for entries that will be iterated. They won't
+ * help since these entries either don't need an inclusion
+ * check or are doing any needed check during the iteration.
*/
- if (je == cjoin->entries && je->ends[0].flags ==
- (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ))
- F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
-
- if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
+ if (!iterable && F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
if (session->txn.isolation == WT_ISO_READ_UNCOMMITTED)
- WT_RET_MSG(session, EINVAL,
+ WT_ERR_MSG(session, EINVAL,
"join cursors with Bloom filters cannot be "
"used with read-uncommitted isolation");
if (je->bloom == NULL) {
@@ -516,10 +998,10 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
}
je->bloom_bit_count = f;
je->bloom_hash_count = k;
- WT_RET(__wt_bloom_create(session, NULL,
+ WT_ERR(__wt_bloom_create(session, NULL,
NULL, je->count, f, k, &je->bloom));
F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM);
- WT_RET(__curjoin_init_bloom(session, cjoin,
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
je, je->bloom));
/*
* Share the Bloom filter, making all
@@ -541,201 +1023,45 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
* merge into the shared one. The Bloom
* parameters of the two filters must match.
*/
- WT_RET(__wt_bloom_create(session, NULL,
+ WT_ERR(__wt_bloom_create(session, NULL,
NULL, je->count, je->bloom_bit_count,
je->bloom_hash_count, &bloom));
- WT_RET(__curjoin_init_bloom(session, cjoin,
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
je, bloom));
- WT_RET(__wt_bloom_intersection(je->bloom,
+ WT_ERR(__wt_bloom_intersection(je->bloom,
bloom));
- WT_RET(__wt_bloom_close(bloom));
+ WT_ERR(__wt_bloom_close(bloom));
}
}
+ if (!F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION))
+ iterable = false;
}
-
F_SET(cjoin, WT_CURJOIN_INITIALIZED);
- return (ret);
-}
-
-/*
- * __curjoin_entry_in_range --
- * Check if a key is in the range specified by the entry, returning
- * WT_NOTFOUND if not.
- */
-static int
-__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
- WT_ITEM *curkey, bool skip_left)
-{
- WT_COLLATOR *collator;
- WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
- int cmp;
-
- collator = (entry->index != NULL) ? entry->index->collator : NULL;
- endmax = &entry->ends[entry->ends_next];
- for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) {
- WT_RET(__wt_compare(session, collator, curkey, &end->key,
- &cmp));
- if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
- if (cmp < 0 ||
- (cmp == 0 &&
- !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
- (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT)))
- WT_RET(WT_NOTFOUND);
- } else {
- if (cmp > 0 ||
- (cmp == 0 &&
- !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
- (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT)))
- WT_RET(WT_NOTFOUND);
- }
- }
- return (0);
-}
-
-typedef struct {
- WT_CURSOR iface;
- WT_CURSOR_JOIN_ENTRY *entry;
- bool ismember;
-} WT_CURJOIN_EXTRACTOR;
-
-/*
- * __curjoin_extract_insert --
- * Handle a key produced by a custom extractor.
- */
-static int
-__curjoin_extract_insert(WT_CURSOR *cursor) {
- WT_CURJOIN_EXTRACTOR *cextract;
- WT_DECL_RET;
- WT_ITEM ikey;
- WT_SESSION_IMPL *session;
-
- cextract = (WT_CURJOIN_EXTRACTOR *)cursor;
- /*
- * This insert method may be called multiple times during a single
- * extraction. If we already have a definitive answer to the
- * membership question, exit early.
- */
- if (cextract->ismember)
- return (0);
-
- session = (WT_SESSION_IMPL *)cursor->session;
-
- WT_ITEM_SET(ikey, cursor->key);
- /*
- * We appended a padding byte to the key to avoid rewriting the last
- * column. Strip that away here.
- */
- WT_ASSERT(session, ikey.size > 0);
- --ikey.size;
-
- ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false);
- if (ret == WT_NOTFOUND)
- ret = 0;
- else if (ret == 0)
- cextract->ismember = true;
+err: __wt_free(session, mainbuf);
return (ret);
}
/*
- * __curjoin_entry_member --
- * Do a membership check for a particular index that was joined,
- * if not a member, returns WT_NOTFOUND.
+ * __curjoin_insert_endpoint --
+ * Insert a new entry into the endpoint array for the join entry.
*/
static int
-__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
- WT_CURSOR_JOIN_ENTRY *entry, bool skip_left)
+__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp)
{
- WT_CURJOIN_EXTRACTOR extract_cursor;
- WT_CURSOR *c;
- WT_CURSOR_STATIC_INIT(iface,
- __wt_cursor_get_key, /* get-key */
- __wt_cursor_get_value, /* get-value */
- __wt_cursor_set_key, /* set-key */
- __wt_cursor_set_value, /* set-value */
- __wt_cursor_compare_notsup, /* compare */
- __wt_cursor_equals_notsup, /* equals */
- __wt_cursor_notsup, /* next */
- __wt_cursor_notsup, /* prev */
- __wt_cursor_notsup, /* reset */
- __wt_cursor_notsup, /* search */
- __wt_cursor_search_near_notsup, /* search-near */
- __curjoin_extract_insert, /* insert */
- __wt_cursor_notsup, /* update */
- __wt_cursor_notsup, /* remove */
- __wt_cursor_reconfigure_notsup, /* reconfigure */
- __wt_cursor_notsup); /* close */
- WT_DECL_RET;
- WT_INDEX *idx;
- WT_ITEM *key, v;
- bool bloom_found;
-
- if (skip_left && entry->ends_next == 1)
- return (0); /* no checks to make */
- key = cjoin->iter->curkey;
- entry->stats.accesses++;
- bloom_found = false;
-
- if (entry->bloom != NULL) {
- /*
- * If we don't own the Bloom filter, we must be sharing one
- * in a previous entry. So the shared filter has already
- * been checked and passed.
- */
- if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
- return (0);
-
- /*
- * If the item is not in the Bloom filter, we return
- * immediately, otherwise, we still need to check the
- * long way.
- */
- WT_ERR(__wt_bloom_inmem_get(entry->bloom, key));
- bloom_found = true;
- }
- if (entry->index != NULL) {
- /*
- * If this entry is used by the iterator, then we already
- * have the index key, and we won't have to do any extraction
- * either.
- */
- if (entry == cjoin->iter->entry)
- WT_ITEM_SET(v, cjoin->iter->idxkey);
- else {
- memset(&v, 0, sizeof(v)); /* Keep lint quiet. */
- c = entry->main;
- c->set_key(c, key);
- if ((ret = c->search(c)) == 0)
- ret = c->get_value(c, &v);
- else if (ret == WT_NOTFOUND)
- WT_ERR_MSG(session, WT_ERROR,
- "main table for join is missing entry");
- WT_TRET(c->reset(c));
- WT_ERR(ret);
- }
- } else
- WT_ITEM_SET(v, *key);
+ WT_CURSOR_JOIN_ENDPOINT *newend;
- if ((idx = entry->index) != NULL && idx->extractor != NULL &&
- entry != cjoin->iter->entry) {
- WT_CLEAR(extract_cursor);
- extract_cursor.iface = iface;
- extract_cursor.iface.session = &session->iface;
- extract_cursor.iface.key_format = idx->exkey_format;
- extract_cursor.ismember = false;
- extract_cursor.entry = entry;
- WT_ERR(idx->extractor->extract(idx->extractor,
- &session->iface, key, &v, &extract_cursor.iface));
- if (!extract_cursor.ismember)
- WT_ERR(WT_NOTFOUND);
- } else
- WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left));
+ WT_RET(__wt_realloc_def(session, &entry->ends_allocated,
+ entry->ends_next + 1, &entry->ends));
+ newend = &entry->ends[pos];
+ memmove(newend + 1, newend,
+ (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ entry->ends_next++;
+ *newendp = newend;
- if (0) {
-err: if (ret == WT_NOTFOUND && bloom_found)
- entry->stats.bloom_false_positive++;
- }
- return (ret);
+ return (0);
}
/*
@@ -750,61 +1076,52 @@ __curjoin_next(WT_CURSOR *cursor)
WT_CURSOR_JOIN_ITER *iter;
WT_DECL_RET;
WT_SESSION_IMPL *session;
- bool skip_left;
- u_int i;
+ int tret;
cjoin = (WT_CURSOR_JOIN *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
if (F_ISSET(cjoin, WT_CURJOIN_ERROR))
WT_ERR_MSG(session, WT_ERROR,
"join cursor encountered previous error");
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
- WT_ERR(__curjoin_init_iter(session, cjoin));
-
- F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ WT_ERR(__curjoin_init_next(session, cjoin, true));
+ if (cjoin->iter == NULL)
+ WT_ERR(__curjoin_iter_init(session, cjoin, &cjoin->iter));
iter = cjoin->iter;
+ F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
-nextkey:
- if ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) {
- F_SET(cursor, WT_CURSTD_KEY_EXT);
+ while ((ret = __curjoin_iter_next(iter, cursor)) == 0) {
+ if ((ret = __curjoin_entries_in_range(session, cjoin,
+ iter->curkey, iter)) != WT_NOTFOUND)
+ break;
+ }
+ iter->positioned = (ret == 0);
+ if (ret != 0 && ret != WT_NOTFOUND)
+ WT_ERR(ret);
+ if (ret == 0) {
/*
- * We may have already established membership for the
- * 'left' case for the first entry, since we're
- * using that in our iteration.
+ * Position the 'main' cursor, this will be used to retrieve
+ * values from the cursor join. The key we have is raw, but
+ * the main cursor may not be raw.
*/
- skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
- for (i = 0; i < cjoin->entries_next; i++) {
- ret = __curjoin_entry_member(session, cjoin,
- &cjoin->entries[i], skip_left);
- if (ret == WT_NOTFOUND) {
- /*
- * If this is compare=eq on our outer iterator,
- * and we've moved past it, we're done.
- */
- if (iter->isequal && i == 0)
- break;
- goto nextkey;
- }
- skip_left = false;
- WT_ERR(ret);
- }
- } else if (ret != WT_NOTFOUND)
- WT_ERR(ret);
+ c = cjoin->main;
+ __wt_cursor_set_raw_key(c, iter->curkey);
- if (ret == 0) {
/*
- * Position the 'main' cursor, this will be used to
- * retrieve values from the cursor join.
+ * A failed search is not expected, convert WT_NOTFOUND into a
+ * generic error.
*/
- c = iter->main;
- c->set_key(c, iter->curkey);
- if ((ret = c->search(c)) != 0)
- WT_ERR(c->search(c));
+ if ((ret = c->search(c)) == WT_NOTFOUND)
+ ret = WT_ERROR;
+ WT_ERR(ret);
+
F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
- }
+ } else if (ret == WT_NOTFOUND &&
+ (tret = __curjoin_iter_close_all(iter)) != 0)
+ WT_ERR(tret);
if (0) {
err: F_SET(cjoin, WT_CURJOIN_ERROR);
@@ -813,78 +1130,148 @@ err: F_SET(cjoin, WT_CURJOIN_ERROR);
}
/*
- * __curjoin_reset --
- * WT_CURSOR::reset for join cursors.
+ * __curjoin_open_main --
+ * For the given index, open the main file with a projection
+ * that is the index keys.
*/
static int
-__curjoin_reset(WT_CURSOR *cursor)
+__curjoin_open_main(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry)
{
- WT_CURSOR_JOIN *cjoin;
WT_DECL_RET;
- WT_SESSION_IMPL *session;
+ WT_INDEX *idx;
+ char *main_uri, *newformat;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ size_t len, newsize;
- cjoin = (WT_CURSOR_JOIN *)cursor;
+ main_uri = NULL;
+ idx = entry->index;
+
+ newsize = strlen(cjoin->table->name) + idx->colconf.len + 1;
+ WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
+ snprintf(main_uri, newsize, "%s%.*s",
+ cjoin->table->name, (int)idx->colconf.len,
+ idx->colconf.str);
+ WT_ERR(__wt_open_cursor(session, main_uri,
+ (WT_CURSOR *)cjoin, raw_cfg, &entry->main));
+ if (idx->extractor == NULL) {
+ /*
+ * Add no-op padding so trailing 'u' formats are not
+ * transformed to 'U'. This matches what happens in
+ * the index. We don't do this when we have an
+ * extractor, extractors already use the padding
+ * byte trick.
+ */
+ len = strlen(entry->main->value_format) + 3;
+ WT_ERR(__wt_calloc(session, len, 1, &newformat));
+ snprintf(newformat, len, "%s0x",
+ entry->main->value_format);
+ __wt_free(session, entry->main->value_format);
+ entry->main->value_format = newformat;
+ }
- CURSOR_API_CALL(cursor, session, reset, NULL);
+err: __wt_free(session, main_uri);
+ return (ret);
+}
- if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
- WT_ERR(__curjoin_entry_iter_reset(cjoin->iter));
+/*
+ * __curjoin_pack_recno --
+ * Pack the given recno into a buffer; prepare an item referencing it.
+ *
+ */
+static int
+__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf,
+ size_t bufsize, WT_ITEM *item)
+{
+ WT_SESSION *wtsession;
+ size_t sz;
-err: API_END_RET(session, ret);
+ wtsession = (WT_SESSION *)session;
+ WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r));
+ WT_ASSERT(session, sz < bufsize);
+ WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r));
+ item->size = sz;
+ item->data = buf;
+ return (0);
}
/*
- * __curjoin_close --
- * WT_CURSOR::close for join cursors.
+ * __curjoin_reset --
+ * WT_CURSOR::reset for join cursors.
*/
static int
-__curjoin_close(WT_CURSOR *cursor)
+__curjoin_reset(WT_CURSOR *cursor)
{
WT_CURSOR_JOIN *cjoin;
- WT_CURSOR_JOIN_ENDPOINT *end;
- WT_CURSOR_JOIN_ENTRY *entry;
WT_DECL_RET;
WT_SESSION_IMPL *session;
- u_int i;
cjoin = (WT_CURSOR_JOIN *)cursor;
- CURSOR_API_CALL(cursor, session, close, NULL);
-
- __wt_schema_release_table(session, cjoin->table);
- /* These are owned by the table */
- cursor->internal_uri = NULL;
- cursor->key_format = NULL;
- if (cjoin->projection != NULL) {
- __wt_free(session, cjoin->projection);
- __wt_free(session, cursor->value_format);
- }
-
- for (entry = cjoin->entries, i = 0; i < cjoin->entries_next;
- entry++, i++) {
- if (entry->main != NULL)
- WT_TRET(entry->main->close(entry->main));
- if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
- WT_TRET(__wt_bloom_close(entry->bloom));
- for (end = &entry->ends[0];
- end < &entry->ends[entry->ends_next]; end++) {
- F_CLR(end->cursor, WT_CURSTD_JOINED);
- if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR))
- WT_TRET(end->cursor->close(end->cursor));
- }
- __wt_free(session, entry->ends);
- __wt_free(session, entry->repack_format);
- }
+ JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL);
if (cjoin->iter != NULL)
- WT_TRET(__curjoin_entry_iter_close(cjoin->iter));
- __wt_free(session, cjoin->entries);
- WT_TRET(__wt_cursor_close(cursor));
+ WT_ERR(__curjoin_iter_reset(cjoin->iter));
err: API_END_RET(session, ret);
}
/*
+ * __curjoin_split_key --
+ * Copy the primary key from a cursor (either main table or index)
+ * to another cursor. When copying from an index file, the index
+ * key is also returned.
+ *
+ */
+static int
+__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur,
+ const char *repack_fmt, bool isindex)
+{
+ WT_CURSOR *firstcg_cur;
+ WT_CURSOR_INDEX *cindex;
+ WT_ITEM *keyp;
+ const uint8_t *p;
+
+ if (isindex) {
+ cindex = ((WT_CURSOR_INDEX *)fromcur);
+ /*
+ * Repack tells us where the index key ends; advance past
+ * that to get where the raw primary key starts.
+ */
+ WT_RET(__wt_struct_repack(session, cindex->child->key_format,
+ repack_fmt != NULL ? repack_fmt : cindex->iface.key_format,
+ &cindex->child->key, idxkey));
+ WT_ASSERT(session, cindex->child->key.size > idxkey->size);
+ tocur->key.data = (uint8_t *)idxkey->data + idxkey->size;
+ tocur->key.size = cindex->child->key.size - idxkey->size;
+ if (WT_CURSOR_RECNO(tocur)) {
+ p = (const uint8_t *)tocur->key.data;
+ WT_RET(__wt_vunpack_uint(&p, tocur->key.size,
+ &tocur->recno));
+ } else
+ tocur->recno = 0;
+ } else {
+ firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0];
+ keyp = &firstcg_cur->key;
+ if (WT_CURSOR_RECNO(tocur)) {
+ WT_ASSERT(session, keyp->size == sizeof(uint64_t));
+ tocur->recno = *(uint64_t *)keyp->data;
+ WT_RET(__curjoin_pack_recno(session, tocur->recno,
+ cjoin->recno_buf, sizeof(cjoin->recno_buf),
+ &tocur->key));
+ } else {
+ WT_ITEM_SET(tocur->key, *keyp);
+ tocur->recno = 0;
+ }
+ idxkey->data = NULL;
+ idxkey->size = 0;
+ }
+ return (0);
+}
+
+/*
* __wt_curjoin_open --
* Initialize a join cursor.
*
@@ -979,33 +1366,51 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_CURSOR_INDEX *cindex;
WT_CURSOR_JOIN_ENDPOINT *end;
WT_CURSOR_JOIN_ENTRY *entry;
- WT_DECL_RET;
- bool hasins, needbloom, range_eq;
- char *main_uri, *newformat;
- const char *raw_cfg[] = { WT_CONFIG_BASE(
- session, WT_SESSION_open_cursor), "raw", NULL };
- size_t len, newsize;
+ WT_CURSOR_JOIN *child;
+ bool hasins, needbloom, nested, range_eq;
+ size_t len;
u_int i, ins, nonbloom;
+ uint8_t endrange;
entry = NULL;
hasins = needbloom = false;
- ins = 0; /* -Wuninitialized */
- main_uri = NULL;
- nonbloom = 0; /* -Wuninitialized */
+ ins = nonbloom = 0; /* -Wuninitialized */
- for (i = 0; i < cjoin->entries_next; i++) {
- if (cjoin->entries[i].index == idx) {
- entry = &cjoin->entries[i];
- break;
- }
- if (!needbloom && i > 0 &&
- !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) {
- needbloom = true;
- nonbloom = i;
+ if (cjoin->entries_next == 0) {
+ if (LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION))
+ F_SET(cjoin, WT_CURJOIN_DISJUNCTION);
+ } else if (LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION) &&
+ !F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION))
+ WT_RET_MSG(session, EINVAL,
+ "operation=or does not match previous operation=and");
+ else if (!LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION) &&
+ F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION))
+ WT_RET_MSG(session, EINVAL,
+ "operation=and does not match previous operation=or");
+
+ nested = WT_PREFIX_MATCH(ref_cursor->uri, "join:");
+ if (!nested)
+ for (i = 0; i < cjoin->entries_next; i++) {
+ if (cjoin->entries[i].index == idx &&
+ cjoin->entries[i].subjoin == NULL) {
+ entry = &cjoin->entries[i];
+ break;
+ }
+ if (!needbloom && i > 0 &&
+ !F_ISSET(&cjoin->entries[i],
+ WT_CURJOIN_ENTRY_BLOOM)) {
+ needbloom = true;
+ nonbloom = i;
+ }
}
+ else {
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM))
+ WT_RET_MSG(session, EINVAL,
+ "Bloom filters cannot be used with subjoins");
}
+
if (entry == NULL) {
- WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated,
+ WT_RET(__wt_realloc_def(session, &cjoin->entries_allocated,
cjoin->entries_next + 1, &cjoin->entries));
if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && needbloom) {
/*
@@ -1034,13 +1439,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
} else {
/* Merge the join into an existing entry for this index */
if (count != 0 && entry->count != 0 && entry->count != count)
- WT_ERR_MSG(session, EINVAL,
+ WT_RET_MSG(session, EINVAL,
"count=%" PRIu64 " does not match "
"previous count=%" PRIu64 " for this index",
count, entry->count);
if (LF_MASK(WT_CURJOIN_ENTRY_BLOOM) !=
F_MASK(entry, WT_CURJOIN_ENTRY_BLOOM))
- WT_ERR_MSG(session, EINVAL,
+ WT_RET_MSG(session, EINVAL,
"join has incompatible strategy "
"values for the same index");
@@ -1063,19 +1468,20 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
for (i = 0; i < entry->ends_next; i++) {
end = &entry->ends[i];
range_eq = (range == WT_CURJOIN_END_EQ);
+ endrange = WT_CURJOIN_END_RANGE(end);
if ((F_ISSET(end, WT_CURJOIN_END_GT) &&
((range & WT_CURJOIN_END_GT) != 0 || range_eq)) ||
(F_ISSET(end, WT_CURJOIN_END_LT) &&
((range & WT_CURJOIN_END_LT) != 0 || range_eq)) ||
- (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
+ (endrange == WT_CURJOIN_END_EQ &&
(range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT))
!= 0))
- WT_ERR_MSG(session, EINVAL,
+ WT_RET_MSG(session, EINVAL,
"join has overlapping ranges");
if (range == WT_CURJOIN_END_EQ &&
- WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
+ endrange == WT_CURJOIN_END_EQ &&
!F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))
- WT_ERR_MSG(session, EINVAL,
+ WT_RET_MSG(session, EINVAL,
"compare=eq can only be combined "
"using operation=or");
@@ -1086,6 +1492,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
if (!hasins &&
((range & WT_CURJOIN_END_GT) != 0 ||
(range == WT_CURJOIN_END_EQ &&
+ endrange != WT_CURJOIN_END_EQ &&
!F_ISSET(end, WT_CURJOIN_END_GT)))) {
ins = i;
hasins = true;
@@ -1098,70 +1505,35 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
entry->bloom_hash_count =
WT_MAX(entry->bloom_hash_count, bloom_hash_count);
}
- WT_ERR(__curjoin_insert_endpoint(session, entry,
- hasins ? ins : entry->ends_next, &end));
- end->cursor = ref_cursor;
- F_SET(end, range);
-
- /* Open the main file with a projection of the indexed columns. */
- if (entry->main == NULL && idx != NULL) {
- newsize = strlen(cjoin->table->name) + idx->colconf.len + 1;
- WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
- snprintf(main_uri, newsize, "%s%.*s",
- cjoin->table->name, (int)idx->colconf.len,
- idx->colconf.str);
- WT_ERR(__wt_open_cursor(session, main_uri,
- (WT_CURSOR *)cjoin, raw_cfg, &entry->main));
- if (idx->extractor == NULL) {
+ if (nested) {
+ child = (WT_CURSOR_JOIN *)ref_cursor;
+ entry->subjoin = child;
+ child->parent = cjoin;
+ } else {
+ WT_RET(__curjoin_insert_endpoint(session, entry,
+ hasins ? ins : entry->ends_next, &end));
+ end->cursor = ref_cursor;
+ F_SET(end, range);
+
+ if (entry->main == NULL && idx != NULL) {
/*
- * Add no-op padding so trailing 'u' formats are not
- * transformed to 'U'. This matches what happens in
- * the index. We don't do this when we have an
- * extractor, extractors already use the padding
- * byte trick.
+ * Open the main file with a projection of the
+ * indexed columns.
*/
- len = strlen(entry->main->value_format) + 3;
- WT_ERR(__wt_calloc(session, len, 1, &newformat));
- snprintf(newformat, len, "%s0x",
- entry->main->value_format);
- __wt_free(session, entry->main->value_format);
- entry->main->value_format = newformat;
- }
+ WT_RET(__curjoin_open_main(session, cjoin, entry));
- /*
- * When we are repacking index keys to remove the primary
- * key, we never want to transform trailing 'u'. Use no-op
- * padding to force this.
- */
- cindex = (WT_CURSOR_INDEX *)ref_cursor;
- len = strlen(cindex->iface.key_format) + 3;
- WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format));
- snprintf(entry->repack_format, len, "%s0x",
- cindex->iface.key_format);
+ /*
+ * When we are repacking index keys to remove the
+ * primary key, we never want to transform trailing
+ * 'u'. Use no-op padding to force this.
+ */
+ cindex = (WT_CURSOR_INDEX *)ref_cursor;
+ len = strlen(cindex->iface.key_format) + 3;
+ WT_RET(__wt_calloc(session, len, 1,
+ &entry->repack_format));
+ snprintf(entry->repack_format, len, "%s0x",
+ cindex->iface.key_format);
+ }
}
-
-err: __wt_free(session, main_uri);
- return (ret);
-}
-
-/*
- * __curjoin_insert_endpoint --
- * Insert a new entry into the endpoint array for the join entry.
- */
-static int
-__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
- u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp)
-{
- WT_CURSOR_JOIN_ENDPOINT *newend;
-
- WT_RET(__wt_realloc_def(session, &entry->ends_allocated,
- entry->ends_next + 1, &entry->ends));
- newend = &entry->ends[pos];
- memmove(newend + 1, newend,
- (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT));
- memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT));
- entry->ends_next++;
- *newendp = newend;
-
return (0);
}