diff options
author | Michael Cahill <michael.cahill@mongodb.com> | 2016-05-05 15:38:12 +1000 |
---|---|---|
committer | Michael Cahill <michael.cahill@mongodb.com> | 2016-05-05 15:38:12 +1000 |
commit | 636a7b25ef3eca6b98009330f4d35337d4f35717 (patch) | |
tree | 7cc2e03ad96e206cbe73343feef10197023a37da /src/cursor/cur_join.c | |
parent | eaa7b5f0fcc62f356c33a2c56f45b609a73ca5dd (diff) | |
parent | 75c22bc0c662622c14e5c47d99ff262cede2c6bf (diff) | |
download | mongodb-3.3.6.tar.gz |
Merge branch 'develop' into mongodb-3.4mongodb-3.3.6
Diffstat (limited to 'src/cursor/cur_join.c')
-rw-r--r-- | src/cursor/cur_join.c | 1498 |
1 files changed, 935 insertions, 563 deletions
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 38a83217933..fd7de53c981 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -8,159 +8,299 @@ #include "wt_internal.h" +static int __curjoin_entries_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN *, + WT_ITEM *, WT_CURSOR_JOIN_ITER *); +static int __curjoin_entry_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *, + WT_ITEM *, WT_CURSOR_JOIN_ITER *); +static int __curjoin_entry_member(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *, + WT_ITEM *, WT_CURSOR_JOIN_ITER *); static int __curjoin_insert_endpoint(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *, u_int, WT_CURSOR_JOIN_ENDPOINT **); +static int __curjoin_iter_close(WT_CURSOR_JOIN_ITER *); +static int __curjoin_iter_close_all(WT_CURSOR_JOIN_ITER *); +static bool __curjoin_iter_ready(WT_CURSOR_JOIN_ITER *); +static int __curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *, u_int); +static int __curjoin_pack_recno(WT_SESSION_IMPL *, uint64_t, uint8_t *, + size_t, WT_ITEM *); +static int __curjoin_split_key(WT_SESSION_IMPL *, WT_CURSOR_JOIN *, WT_ITEM *, + WT_CURSOR *, WT_CURSOR *, const char *, bool); + +#define WT_CURJOIN_ITER_CONSUMED(iter) \ + ((iter)->entry_pos >= (iter)->entry_count) /* - * __curjoin_entry_iter_init -- + * __wt_curjoin_joined -- + * Produce an error that this cursor is being used in a join call. + */ +int +__wt_curjoin_joined(WT_CURSOR *cursor) +{ + WT_SESSION_IMPL *session; + + session = (WT_SESSION_IMPL *)cursor->session; + __wt_errx(session, "cursor is being used in a join"); + return (ENOTSUP); +} + +/* + * __curjoin_iter_init -- * Initialize an iteration for the index managed by a join entry. * */ static int -__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp) +__curjoin_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ITER **iterp) { - WT_CURSOR *to_dup; - WT_DECL_RET; - const char *raw_cfg[] = { WT_CONFIG_BASE( - session, WT_SESSION_open_cursor), "raw", NULL }; - const char *def_cfg[] = { WT_CONFIG_BASE( - session, WT_SESSION_open_cursor), NULL }; - const char *urimain, **config; - char *mainbuf, *uri; WT_CURSOR_JOIN_ITER *iter; - size_t size; - - iter = NULL; - mainbuf = uri = NULL; - to_dup = entry->ends[0].cursor; - - if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW)) - config = &raw_cfg[0]; - else - config = &def_cfg[0]; - - size = strlen(to_dup->internal_uri) + 3; - WT_ERR(__wt_calloc(session, size, 1, &uri)); - snprintf(uri, size, "%s()", to_dup->internal_uri); - urimain = cjoin->table->name; - if (cjoin->projection != NULL) { - size = strlen(urimain) + strlen(cjoin->projection) + 1; - WT_ERR(__wt_calloc(session, size, 1, &mainbuf)); - snprintf(mainbuf, size, "%s%s", urimain, cjoin->projection); - urimain = mainbuf; - } - WT_ERR(__wt_calloc_one(session, &iter)); - WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config, - &iter->cursor)); - WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor)); - WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config, - &iter->main)); + *iterp = NULL; + WT_RET(__wt_calloc_one(session, iterp)); + iter = *iterp; iter->cjoin = cjoin; iter->session = session; - iter->entry = entry; - iter->positioned = false; - iter->isequal = (entry->ends_next == 1 && - WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ); - *iterp = iter; + cjoin->iter = iter; + WT_RET(__curjoin_iter_set_entry(iter, 0)); + return (0); +} - if (0) { -err: __wt_free(session, iter); - } - __wt_free(session, mainbuf); - __wt_free(session, uri); +/* + * __curjoin_iter_close -- + * Close the iteration, release resources. + * + */ +static int +__curjoin_iter_close(WT_CURSOR_JOIN_ITER *iter) +{ + WT_DECL_RET; + + if (iter->cursor != NULL) + WT_TRET(iter->cursor->close(iter->cursor)); + __wt_free(iter->session, iter); return (ret); } /* - * __curjoin_pack_recno -- - * Pack the given recno into a buffer; prepare an item referencing it. + * __curjoin_iter_close_all -- + * Free the iterator and all of its children recursively. * */ static int -__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, - size_t bufsize, WT_ITEM *item) +__curjoin_iter_close_all(WT_CURSOR_JOIN_ITER *iter) { - WT_SESSION *wtsession; - size_t sz; + WT_CURSOR_JOIN *parent; + WT_DECL_RET; - wtsession = (WT_SESSION *)session; - WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r)); - WT_ASSERT(session, sz < bufsize); - WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); - item->size = sz; - item->data = buf; + if (iter->child) + WT_TRET(__curjoin_iter_close_all(iter->child)); + iter->child = NULL; + WT_ASSERT(iter->session, iter->cjoin->parent == NULL || + iter->cjoin->parent->iter->child == iter); + if ((parent = iter->cjoin->parent) != NULL) + parent->iter->child = NULL; + iter->cjoin->iter = NULL; + WT_TRET(__curjoin_iter_close(iter)); + return (ret); +} + +/* + * __curjoin_iter_reset -- + * Reset an iteration to the starting point. + * + */ +static int +__curjoin_iter_reset(WT_CURSOR_JOIN_ITER *iter) +{ + if (iter->child != NULL) + WT_RET(__curjoin_iter_close_all(iter->child)); + WT_RET(__curjoin_iter_set_entry(iter, 0)); + iter->positioned = false; return (0); } /* - * __curjoin_split_key -- - * Copy the primary key from a cursor (either main table or index) - * to another cursor. When copying from an index file, the index - * key is also returned. + * __curjoin_iter_ready -- + * Check the positioned flag for all nested iterators. + * + */ +static bool +__curjoin_iter_ready(WT_CURSOR_JOIN_ITER *iter) +{ + while (iter != NULL) { + if (!iter->positioned) + return (false); + iter = iter->child; + } + return (true); +} + +/* + * __curjoin_iter_set_entry -- + * Set the current entry for an iterator. * */ static int -__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur, - const char *repack_fmt, bool isindex) +__curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *iter, u_int entry_pos) { - WT_CURSOR *firstcg_cur; - WT_CURSOR_INDEX *cindex; - WT_ITEM *keyp; - const uint8_t *p; + WT_CURSOR *c, *to_dup; + WT_CURSOR_JOIN *cjoin, *topjoin; + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_SESSION_IMPL *session; + char *uri; + const char *raw_cfg[] = { WT_CONFIG_BASE( + iter->session, WT_SESSION_open_cursor), "raw", NULL }; + const char *def_cfg[] = { WT_CONFIG_BASE( + iter->session, WT_SESSION_open_cursor), NULL }; + const char **config; + size_t size; - if (isindex) { - cindex = ((WT_CURSOR_INDEX *)fromcur); - /* - * Repack tells us where the index key ends; advance past - * that to get where the raw primary key starts. - */ - WT_RET(__wt_struct_repack(session, cindex->child->key_format, - repack_fmt != NULL ? repack_fmt : cindex->iface.key_format, - &cindex->child->key, idxkey)); - WT_ASSERT(session, cindex->child->key.size > idxkey->size); - tocur->key.data = (uint8_t *)idxkey->data + idxkey->size; - tocur->key.size = cindex->child->key.size - idxkey->size; - if (WT_CURSOR_RECNO(tocur)) { - p = (const uint8_t *)tocur->key.data; - WT_RET(__wt_vunpack_uint(&p, tocur->key.size, - &tocur->recno)); - } else - tocur->recno = 0; - } else { - firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0]; - keyp = &firstcg_cur->key; - if (WT_CURSOR_RECNO(tocur)) { - WT_ASSERT(session, keyp->size == sizeof(uint64_t)); - tocur->recno = *(uint64_t *)keyp->data; - WT_RET(__curjoin_pack_recno(session, tocur->recno, - cjoin->recno_buf, sizeof(cjoin->recno_buf), - &tocur->key)); - } else { - WT_ITEM_SET(tocur->key, *keyp); - tocur->recno = 0; + session = iter->session; + cjoin = iter->cjoin; + uri = NULL; + entry = iter->entry = &cjoin->entries[entry_pos]; + iter->positioned = false; + iter->entry_pos = entry_pos; + iter->end_pos = 0; + + iter->is_equal = (entry->ends_next == 1 && + WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ); + iter->end_skip = (entry->ends_next > 0 && + WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_GE) ? 1 : 0; + + iter->end_count = WT_MIN(1, entry->ends_next); + if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) { + iter->entry_count = cjoin->entries_next; + if (iter->is_equal) + iter->end_count = entry->ends_next; + } else + iter->entry_count = 1; + WT_ASSERT(iter->session, iter->entry_pos < iter->entry_count); + + entry->stats.actual_count = 0; + + if (entry->subjoin == NULL) { + for (topjoin = iter->cjoin; topjoin->parent != NULL; + topjoin = topjoin->parent) + ; + to_dup = entry->ends[0].cursor; + + if (F_ISSET((WT_CURSOR *)topjoin, WT_CURSTD_RAW)) + config = &raw_cfg[0]; + else + config = &def_cfg[0]; + + size = strlen(to_dup->internal_uri) + 3; + WT_ERR(__wt_calloc(session, size, 1, &uri)); + snprintf(uri, size, "%s()", to_dup->internal_uri); + if ((c = iter->cursor) == NULL || !WT_STREQ(c->uri, uri)) { + iter->cursor = NULL; + if (c != NULL) + WT_ERR(c->close(c)); + WT_ERR(__wt_open_cursor(session, uri, + (WT_CURSOR *)topjoin, config, &iter->cursor)); } - idxkey->data = NULL; - idxkey->size = 0; + WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor)); + } else if (iter->cursor != NULL) { + WT_ERR(iter->cursor->close(iter->cursor)); + iter->cursor = NULL; + } + +err: __wt_free(session, uri); + return (ret); +} + +/* + * __curjoin_iter_bump -- + * Called to advance the iterator to the next endpoint, which may in turn + * advance to the next entry. + */ +static int +__curjoin_iter_bump(WT_CURSOR_JOIN_ITER *iter) +{ + WT_CURSOR_JOIN_ENTRY *entry; + WT_SESSION_IMPL *session; + + session = iter->session; + iter->positioned = false; + entry = iter->entry; + if (entry->subjoin == NULL && iter->is_equal && + ++iter->end_pos < iter->end_count) { + WT_RET(__wt_cursor_dup_position( + entry->ends[iter->end_pos].cursor, iter->cursor)); + return (0); } + iter->end_pos = iter->end_count = iter->end_skip = 0; + if (entry->subjoin != NULL && entry->subjoin->iter != NULL) + WT_RET(__curjoin_iter_close_all(entry->subjoin->iter)); + + if (++iter->entry_pos >= iter->entry_count) { + iter->entry = NULL; + return (0); + } + iter->entry = ++entry; + if (entry->subjoin != NULL) { + WT_RET(__curjoin_iter_init(session, entry->subjoin, + &iter->child)); + return (0); + } + WT_RET(__curjoin_iter_set_entry(iter, iter->entry_pos)); return (0); } /* - * __curjoin_entry_iter_next -- + * __curjoin_iter_next -- * Get the next item in an iteration. * */ static int -__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor) +__curjoin_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor) { - if (iter->positioned) - WT_RET(iter->cursor->next(iter->cursor)); - else + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + session = iter->session; + + if (WT_CURJOIN_ITER_CONSUMED(iter)) + return (WT_NOTFOUND); +again: + entry = iter->entry; + if (entry->subjoin != NULL) { + if (iter->child == NULL) + WT_RET(__curjoin_iter_init(session, + entry->subjoin, &iter->child)); + ret = __curjoin_iter_next(iter->child, cursor); + if (ret == 0) { + /* The child did the work, we're done. */ + iter->curkey = &cursor->key; + iter->positioned = true; + return (ret); + } + else if (ret == WT_NOTFOUND) { + WT_RET(__curjoin_iter_close_all(iter->child)); + entry->subjoin->iter = NULL; + iter->child = NULL; + WT_RET(__curjoin_iter_bump(iter)); + ret = 0; + } + } else if (iter->positioned) { + ret = iter->cursor->next(iter->cursor); + if (ret == WT_NOTFOUND) { + WT_RET(__curjoin_iter_bump(iter)); + ret = 0; + } else + WT_RET(ret); + } else iter->positioned = true; + if (WT_CURJOIN_ITER_CONSUMED(iter)) + return (WT_NOTFOUND); + + if (!__curjoin_iter_ready(iter)) + goto again; + + WT_RET(ret); + /* * Set our key to the primary key, we'll also need this * to check membership. @@ -175,51 +315,380 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor) } /* - * __curjoin_entry_iter_reset -- - * Reset an iteration to the starting point. - * + * __curjoin_close -- + * WT_CURSOR::close for join cursors. */ static int -__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) +__curjoin_close(WT_CURSOR *cursor) { - if (iter->positioned) { - WT_RET(iter->cursor->reset(iter->cursor)); - WT_RET(iter->main->reset(iter->main)); - WT_RET(__wt_cursor_dup_position( - iter->cjoin->entries[0].ends[0].cursor, iter->cursor)); - iter->positioned = false; - iter->entry->stats.actual_count = 0; + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ENDPOINT *end; + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_SESSION_IMPL *session; + u_int i; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); + + __wt_schema_release_table(session, cjoin->table); + /* These are owned by the table */ + cursor->internal_uri = NULL; + cursor->key_format = NULL; + if (cjoin->projection != NULL) { + __wt_free(session, cjoin->projection); + __wt_free(session, cursor->value_format); + } + + for (entry = cjoin->entries, i = 0; i < cjoin->entries_next; + entry++, i++) { + if (entry->subjoin != NULL) { + F_CLR(&entry->subjoin->iface, WT_CURSTD_JOINED); + entry->subjoin->parent = NULL; + } + if (entry->main != NULL) + WT_TRET(entry->main->close(entry->main)); + if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) + WT_TRET(__wt_bloom_close(entry->bloom)); + for (end = &entry->ends[0]; + end < &entry->ends[entry->ends_next]; end++) { + F_CLR(end->cursor, WT_CURSTD_JOINED); + if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR)) + WT_TRET(end->cursor->close(end->cursor)); + } + __wt_free(session, entry->ends); + __wt_free(session, entry->repack_format); + } + + if (cjoin->iter != NULL) + WT_TRET(__curjoin_iter_close_all(cjoin->iter)); + if (cjoin->main != NULL) + WT_TRET(cjoin->main->close(cjoin->main)); + + __wt_free(session, cjoin->entries); + WT_TRET(__wt_cursor_close(cursor)); + +err: API_END_RET(session, ret); +} + +/* + * __curjoin_endpoint_init_key -- + * Set the key in the reference endpoint. + */ +static int +__curjoin_endpoint_init_key(WT_SESSION_IMPL *session, + WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint) +{ + WT_CURSOR *cursor; + WT_CURSOR_INDEX *cindex; + WT_ITEM *k; + uint64_t r; + + if ((cursor = endpoint->cursor) != NULL) { + if (entry->index != NULL) { + /* Extract and save the index's logical key. */ + cindex = (WT_CURSOR_INDEX *)endpoint->cursor; + WT_RET(__wt_struct_repack(session, + cindex->child->key_format, + (entry->repack_format != NULL ? + entry->repack_format : cindex->iface.key_format), + &cindex->child->key, &endpoint->key)); + } else { + k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; + if (WT_CURSOR_RECNO(cursor)) { + r = *(uint64_t *)k->data; + WT_RET(__curjoin_pack_recno(session, r, + endpoint->recno_buf, + sizeof(endpoint->recno_buf), + &endpoint->key)); + } + else + endpoint->key = *k; + } } return (0); } /* - * __curjoin_entry_iter_ready -- - * The iterator is positioned. - * + * __curjoin_entries_in_range -- + * Check if a key is in the range specified by the remaining entries, + * returning WT_NOTFOUND if not. */ -static bool -__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter) +static int +__curjoin_entries_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iterarg) { - return (iter->positioned); + WT_CURSOR_JOIN_ENTRY *entry; + WT_CURSOR_JOIN_ITER *iter; + WT_DECL_RET; + int fastret, slowret; + u_int pos; + + iter = iterarg; + if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) { + fastret = 0; + slowret = WT_NOTFOUND; + } else { + fastret = WT_NOTFOUND; + slowret = 0; + } + pos = iter == NULL ? 0 : iter->entry_pos; + for (entry = &cjoin->entries[pos]; pos < cjoin->entries_next; + entry++, pos++) { + ret = __curjoin_entry_member(session, entry, curkey, iter); + if (ret == fastret) + return (fastret); + if (ret != slowret) + break; + iter = NULL; + } + + return (ret == 0 ? slowret : ret); } /* - * __curjoin_entry_iter_close -- - * Close the iteration, release resources. - * + * __curjoin_entry_in_range -- + * Check if a key is in the range specified by the entry, returning + * WT_NOTFOUND if not. */ static int -__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter) +__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iter) { + WT_COLLATOR *collator; + WT_CURSOR_JOIN_ENDPOINT *end, *endmax; + bool disjunction, passed; + int cmp; + u_int pos; + + collator = (entry->index != NULL) ? entry->index->collator : NULL; + endmax = &entry->ends[entry->ends_next]; + disjunction = F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION); + passed = false; + + /* + * The iterator may have already satisfied some endpoint conditions. + * If so and we're a disjunction, we're done. If so and we're a + * conjunction, we can start past the satisfied conditions. + */ + if (iter == NULL) + pos = 0; + else { + if (disjunction && iter->end_skip) + return (0); + pos = iter->end_pos + iter->end_skip; + } + + for (end = &entry->ends[pos]; end < endmax; end++) { + WT_RET(__wt_compare(session, collator, curkey, &end->key, + &cmp)); + switch (WT_CURJOIN_END_RANGE(end)) { + case WT_CURJOIN_END_EQ: + passed = (cmp == 0); + break; + + case WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ: + passed = (cmp >= 0); + WT_ASSERT(session, iter == NULL); + break; + + case WT_CURJOIN_END_GT: + passed = (cmp > 0); + if (passed && iter != NULL && pos == 0) + iter->end_skip = 1; + break; + + case WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ: + passed = (cmp <= 0); + break; + + case WT_CURJOIN_END_LT: + passed = (cmp < 0); + break; + + default: + WT_RET(__wt_illegal_value(session, NULL)); + break; + } + + if (!passed) { + if (iter != NULL && + (iter->is_equal || + F_ISSET(end, WT_CURJOIN_END_LT))) { + WT_RET(__curjoin_iter_bump(iter)); + return (WT_NOTFOUND); + } + if (!disjunction) + return (WT_NOTFOUND); + iter = NULL; + } else if (disjunction) + break; + } + if (disjunction && end == endmax) + return (WT_NOTFOUND); + else + return (0); +} + +typedef struct { + WT_CURSOR iface; + WT_CURSOR_JOIN_ENTRY *entry; + bool ismember; +} WT_CURJOIN_EXTRACTOR; + +/* + * __curjoin_extract_insert -- + * Handle a key produced by a custom extractor. + */ +static int +__curjoin_extract_insert(WT_CURSOR *cursor) { + WT_CURJOIN_EXTRACTOR *cextract; WT_DECL_RET; + WT_ITEM ikey; + WT_SESSION_IMPL *session; - if (iter->cursor != NULL) - WT_TRET(iter->cursor->close(iter->cursor)); - if (iter->main != NULL) - WT_TRET(iter->main->close(iter->main)); - __wt_free(iter->session, iter); + cextract = (WT_CURJOIN_EXTRACTOR *)cursor; + /* + * This insert method may be called multiple times during a single + * extraction. If we already have a definitive answer to the + * membership question, exit early. + */ + if (cextract->ismember) + return (0); + + session = (WT_SESSION_IMPL *)cursor->session; + + WT_ITEM_SET(ikey, cursor->key); + /* + * We appended a padding byte to the key to avoid rewriting the last + * column. Strip that away here. + */ + WT_ASSERT(session, ikey.size > 0); + --ikey.size; + + ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); + if (ret == WT_NOTFOUND) + ret = 0; + else if (ret == 0) + cextract->ismember = true; + + return (ret); +} + +/* + * __curjoin_entry_member -- + * Do a membership check for a particular index that was joined, + * if not a member, returns WT_NOTFOUND. + */ +static int +__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + WT_ITEM *key, WT_CURSOR_JOIN_ITER *iter) +{ + WT_CURJOIN_EXTRACTOR extract_cursor; + WT_CURSOR *c; + WT_CURSOR_STATIC_INIT(iface, + __wt_cursor_get_key, /* get-key */ + __wt_cursor_get_value, /* get-value */ + __wt_cursor_set_key, /* set-key */ + __wt_cursor_set_value, /* set-value */ + __wt_cursor_compare_notsup, /* compare */ + __wt_cursor_equals_notsup, /* equals */ + __wt_cursor_notsup, /* next */ + __wt_cursor_notsup, /* prev */ + __wt_cursor_notsup, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_search_near_notsup, /* search-near */ + __curjoin_extract_insert, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_reconfigure_notsup, /* reconfigure */ + __wt_cursor_notsup); /* close */ + WT_DECL_RET; + WT_INDEX *idx; + WT_ITEM v; + bool bloom_found; + + if (entry->subjoin == NULL && iter != NULL && + (iter->end_pos + iter->end_skip >= entry->ends_next || + (iter->end_skip > 0 && + F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)))) + return (0); /* no checks to make */ + + entry->stats.accesses++; + bloom_found = false; + + if (entry->bloom != NULL) { + /* + * If we don't own the Bloom filter, we must be sharing one + * in a previous entry. So the shared filter has already + * been checked and passed. + */ + if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) + return (0); + + /* + * If the item is not in the Bloom filter, we return + * immediately, otherwise, we still need to check the + * long way. + */ + WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); + bloom_found = true; + } + if (entry->subjoin != NULL) { + WT_ASSERT(session, + iter == NULL || entry->subjoin == iter->child->cjoin); + ret = __curjoin_entries_in_range(session, entry->subjoin, + key, iter == NULL ? NULL : iter->child); + if (iter != NULL && + WT_CURJOIN_ITER_CONSUMED(iter->child)) { + WT_ERR(__curjoin_iter_bump(iter)); + ret = WT_NOTFOUND; + } + return (ret); + } + if (entry->index != NULL) { + /* + * If this entry is used by the iterator, then we already + * have the index key, and we won't have to do any + * extraction either. + */ + if (iter != NULL && entry == iter->entry) + WT_ITEM_SET(v, iter->idxkey); + else { + memset(&v, 0, sizeof(v)); /* Keep lint quiet. */ + c = entry->main; + c->set_key(c, key); + if ((ret = c->search(c)) == 0) + ret = c->get_value(c, &v); + else if (ret == WT_NOTFOUND) + WT_ERR_MSG(session, WT_ERROR, + "main table for join is missing entry"); + WT_TRET(c->reset(c)); + WT_ERR(ret); + } + } else + WT_ITEM_SET(v, *key); + + if ((idx = entry->index) != NULL && idx->extractor != NULL && + (iter == NULL || entry != iter->entry)) { + WT_CLEAR(extract_cursor); + extract_cursor.iface = iface; + extract_cursor.iface.session = &session->iface; + extract_cursor.iface.key_format = idx->exkey_format; + extract_cursor.ismember = false; + extract_cursor.entry = entry; + WT_ERR(idx->extractor->extract(idx->extractor, + &session->iface, key, &v, &extract_cursor.iface)); + if (!extract_cursor.ismember) + WT_ERR(WT_NOTFOUND); + } else + WT_ERR(__curjoin_entry_in_range(session, entry, &v, iter)); + if (0) { +err: if (ret == WT_NOTFOUND && bloom_found) + entry->stats.bloom_false_positive++; + } return (ret); } @@ -238,10 +707,10 @@ __curjoin_get_key(WT_CURSOR *cursor, ...) cjoin = (WT_CURSOR_JOIN *)cursor; va_start(ap, cursor); - CURSOR_API_CALL(cursor, session, get_key, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, get_key, NULL); if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) || - !__curjoin_entry_iter_ready(cjoin->iter)) + !cjoin->iter->positioned) WT_ERR_MSG(session, EINVAL, "join cursor must be advanced with next()"); WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap)); @@ -258,23 +727,21 @@ static int __curjoin_get_value(WT_CURSOR *cursor, ...) { WT_CURSOR_JOIN *cjoin; - WT_CURSOR_JOIN_ITER *iter; WT_DECL_RET; WT_SESSION_IMPL *session; va_list ap; cjoin = (WT_CURSOR_JOIN *)cursor; - iter = cjoin->iter; va_start(ap, cursor); - CURSOR_API_CALL(cursor, session, get_value, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) || - !__curjoin_entry_iter_ready(iter)) + !cjoin->iter->positioned) WT_ERR_MSG(session, EINVAL, "join cursor must be advanced with next()"); - WT_ERR(__wt_curtable_get_valuev(iter->main, ap)); + WT_ERR(__wt_curtable_get_valuev(cjoin->main, ap)); err: va_end(ap); API_END_RET(session, ret); @@ -298,7 +765,8 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, session, WT_SESSION_open_cursor), "raw", NULL }; const char *uri; size_t size; - int cmp, skip; + u_int skip; + int cmp; c = NULL; skip = 0; @@ -354,7 +822,34 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, for (end = &entry->ends[skip]; end < endmax; end++) { WT_ERR(__wt_compare(session, collator, &curkey, &end->key, &cmp)); - if (!F_ISSET(end, WT_CURJOIN_END_LT)) { + if (F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) { + /* if condition satisfied, insert immediately */ + switch (WT_CURJOIN_END_RANGE(end)) { + case WT_CURJOIN_END_EQ: + if (cmp == 0) + goto insert; + break; + case WT_CURJOIN_END_GT: + if (cmp > 0) { + /* skip this check next time */ + skip = entry->ends_next; + goto insert; + } + break; + case WT_CURJOIN_END_GE: + if (cmp >= 0) + goto insert; + break; + case WT_CURJOIN_END_LT: + if (cmp < 0) + goto insert; + break; + case WT_CURJOIN_END_LE: + if (cmp <= 0) + goto insert; + break; + } + } else if (!F_ISSET(end, WT_CURJOIN_END_LT)) { if (cmp < 0 || (cmp == 0 && !F_ISSET(end, WT_CURJOIN_END_EQ))) goto advance; @@ -370,6 +865,14 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, goto done; } } + /* + * Either it's a disjunction that hasn't satisfied any + * condition, or it's a conjunction that has satisfied all + * conditions. + */ + if (F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) + goto advance; +insert: if (entry->index != NULL) { curvalue.data = (unsigned char *)curkey.data + curkey.size; @@ -394,107 +897,86 @@ err: if (c != NULL) } /* - * __curjoin_endpoint_init_key -- - * Set the key in the reference endpoint. - */ -static int -__curjoin_endpoint_init_key(WT_SESSION_IMPL *session, - WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint) -{ - WT_CURSOR *cursor; - WT_CURSOR_INDEX *cindex; - WT_ITEM *k; - uint64_t r; - - if ((cursor = endpoint->cursor) != NULL) { - if (entry->index != NULL) { - /* Extract and save the index's logical key. */ - cindex = (WT_CURSOR_INDEX *)endpoint->cursor; - WT_RET(__wt_struct_repack(session, - cindex->child->key_format, - (entry->repack_format != NULL ? - entry->repack_format : cindex->iface.key_format), - &cindex->child->key, &endpoint->key)); - } else { - k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; - if (WT_CURSOR_RECNO(cursor)) { - r = *(uint64_t *)k->data; - WT_RET(__curjoin_pack_recno(session, r, - endpoint->recno_buf, - sizeof(endpoint->recno_buf), - &endpoint->key)); - } - else - endpoint->key = *k; - } - } - return (0); -} - -/* - * __curjoin_init_iter -- - * Initialize before any iteration. + * __curjoin_init_next -- + * Initialize the cursor join when the next function is first called. */ static int -__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) +__curjoin_init_next(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + bool iterable) { WT_BLOOM *bloom; WT_DECL_RET; WT_CURSOR *origcur; WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2; WT_CURSOR_JOIN_ENDPOINT *end; + char *mainbuf; const char *def_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), NULL }; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; + const char **config, *proj, *urimain; + size_t size; uint32_t f, k; + mainbuf = NULL; if (cjoin->entries_next == 0) WT_RET_MSG(session, EINVAL, "join cursor has not yet been joined with any other " "cursors"); - je = &cjoin->entries[0]; - jeend = &cjoin->entries[cjoin->entries_next]; - - /* - * For a single compare=le endpoint in the first iterated entry, - * construct a companion compare=ge endpoint that will actually - * be iterated. - */ - if (((je = cjoin->entries) != jeend) && - je->ends_next == 1 && F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) { - origcur = je->ends[0].cursor; - WT_RET(__curjoin_insert_endpoint(session, je, 0, &end)); - WT_RET(__wt_open_cursor(session, origcur->uri, - (WT_CURSOR *)cjoin, - F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg, - &end->cursor)); - WT_RET(end->cursor->next(end->cursor)); - end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ | - WT_CURJOIN_END_OWN_CURSOR; + if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW)) + config = &raw_cfg[0]; + else + config = &def_cfg[0]; + urimain = cjoin->table->name; + if ((proj = cjoin->projection) != NULL) { + size = strlen(urimain) + strlen(proj) + 1; + WT_ERR(__wt_calloc(session, size, 1, &mainbuf)); + snprintf(mainbuf, size, "%s%s", urimain, proj); + urimain = mainbuf; } - WT_RET(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter)); + WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config, + &cjoin->main)); + jeend = &cjoin->entries[cjoin->entries_next]; for (je = cjoin->entries; je < jeend; je++) { + if (je->subjoin != NULL) { + WT_ERR(__curjoin_init_next(session, je->subjoin, + iterable)); + continue; + } __wt_stat_join_init_single(&je->stats); + /* + * For a single compare=le/lt endpoint in any entry that may + * be iterated, construct a companion compare=ge endpoint + * that will actually be iterated. + */ + if (iterable && je->ends_next == 1 && + F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) { + origcur = je->ends[0].cursor; + WT_ERR(__curjoin_insert_endpoint(session, je, 0, &end)); + WT_ERR(__wt_open_cursor(session, origcur->uri, + (WT_CURSOR *)cjoin, + F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg, + &end->cursor)); + end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ | + WT_CURJOIN_END_OWN_CURSOR; + WT_ERR(end->cursor->next(end->cursor)); + F_CLR(je, WT_CURJOIN_ENTRY_DISJUNCTION); + } for (end = &je->ends[0]; end < &je->ends[je->ends_next]; end++) - WT_RET(__curjoin_endpoint_init_key(session, je, end)); + WT_ERR(__curjoin_endpoint_init_key(session, je, end)); /* - * The first entry is iterated as the 'outermost' cursor. - * For the common GE case, we don't have to test against - * the left reference key, we know it will be true since - * the btree is ordered. + * Do any needed Bloom filter initialization. Ignore Bloom + * filters for entries that will be iterated. They won't + * help since these entries either don't need an inclusion + * check or are doing any needed check during the iteration. */ - if (je == cjoin->entries && je->ends[0].flags == - (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ)) - F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); - - if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) { + if (!iterable && F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) { if (session->txn.isolation == WT_ISO_READ_UNCOMMITTED) - WT_RET_MSG(session, EINVAL, + WT_ERR_MSG(session, EINVAL, "join cursors with Bloom filters cannot be " "used with read-uncommitted isolation"); if (je->bloom == NULL) { @@ -516,10 +998,10 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) } je->bloom_bit_count = f; je->bloom_hash_count = k; - WT_RET(__wt_bloom_create(session, NULL, + WT_ERR(__wt_bloom_create(session, NULL, NULL, je->count, f, k, &je->bloom)); F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM); - WT_RET(__curjoin_init_bloom(session, cjoin, + WT_ERR(__curjoin_init_bloom(session, cjoin, je, je->bloom)); /* * Share the Bloom filter, making all @@ -541,201 +1023,45 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) * merge into the shared one. The Bloom * parameters of the two filters must match. */ - WT_RET(__wt_bloom_create(session, NULL, + WT_ERR(__wt_bloom_create(session, NULL, NULL, je->count, je->bloom_bit_count, je->bloom_hash_count, &bloom)); - WT_RET(__curjoin_init_bloom(session, cjoin, + WT_ERR(__curjoin_init_bloom(session, cjoin, je, bloom)); - WT_RET(__wt_bloom_intersection(je->bloom, + WT_ERR(__wt_bloom_intersection(je->bloom, bloom)); - WT_RET(__wt_bloom_close(bloom)); + WT_ERR(__wt_bloom_close(bloom)); } } + if (!F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) + iterable = false; } - F_SET(cjoin, WT_CURJOIN_INITIALIZED); - return (ret); -} - -/* - * __curjoin_entry_in_range -- - * Check if a key is in the range specified by the entry, returning - * WT_NOTFOUND if not. - */ -static int -__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, - WT_ITEM *curkey, bool skip_left) -{ - WT_COLLATOR *collator; - WT_CURSOR_JOIN_ENDPOINT *end, *endmax; - int cmp; - - collator = (entry->index != NULL) ? entry->index->collator : NULL; - endmax = &entry->ends[entry->ends_next]; - for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) { - WT_RET(__wt_compare(session, collator, curkey, &end->key, - &cmp)); - if (!F_ISSET(end, WT_CURJOIN_END_LT)) { - if (cmp < 0 || - (cmp == 0 && - !F_ISSET(end, WT_CURJOIN_END_EQ)) || - (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT))) - WT_RET(WT_NOTFOUND); - } else { - if (cmp > 0 || - (cmp == 0 && - !F_ISSET(end, WT_CURJOIN_END_EQ)) || - (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT))) - WT_RET(WT_NOTFOUND); - } - } - return (0); -} - -typedef struct { - WT_CURSOR iface; - WT_CURSOR_JOIN_ENTRY *entry; - bool ismember; -} WT_CURJOIN_EXTRACTOR; - -/* - * __curjoin_extract_insert -- - * Handle a key produced by a custom extractor. - */ -static int -__curjoin_extract_insert(WT_CURSOR *cursor) { - WT_CURJOIN_EXTRACTOR *cextract; - WT_DECL_RET; - WT_ITEM ikey; - WT_SESSION_IMPL *session; - - cextract = (WT_CURJOIN_EXTRACTOR *)cursor; - /* - * This insert method may be called multiple times during a single - * extraction. If we already have a definitive answer to the - * membership question, exit early. - */ - if (cextract->ismember) - return (0); - - session = (WT_SESSION_IMPL *)cursor->session; - - WT_ITEM_SET(ikey, cursor->key); - /* - * We appended a padding byte to the key to avoid rewriting the last - * column. Strip that away here. - */ - WT_ASSERT(session, ikey.size > 0); - --ikey.size; - - ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); - if (ret == WT_NOTFOUND) - ret = 0; - else if (ret == 0) - cextract->ismember = true; +err: __wt_free(session, mainbuf); return (ret); } /* - * __curjoin_entry_member -- - * Do a membership check for a particular index that was joined, - * if not a member, returns WT_NOTFOUND. + * __curjoin_insert_endpoint -- + * Insert a new entry into the endpoint array for the join entry. */ static int -__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_CURSOR_JOIN_ENTRY *entry, bool skip_left) +__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp) { - WT_CURJOIN_EXTRACTOR extract_cursor; - WT_CURSOR *c; - WT_CURSOR_STATIC_INIT(iface, - __wt_cursor_get_key, /* get-key */ - __wt_cursor_get_value, /* get-value */ - __wt_cursor_set_key, /* set-key */ - __wt_cursor_set_value, /* set-value */ - __wt_cursor_compare_notsup, /* compare */ - __wt_cursor_equals_notsup, /* equals */ - __wt_cursor_notsup, /* next */ - __wt_cursor_notsup, /* prev */ - __wt_cursor_notsup, /* reset */ - __wt_cursor_notsup, /* search */ - __wt_cursor_search_near_notsup, /* search-near */ - __curjoin_extract_insert, /* insert */ - __wt_cursor_notsup, /* update */ - __wt_cursor_notsup, /* remove */ - __wt_cursor_reconfigure_notsup, /* reconfigure */ - __wt_cursor_notsup); /* close */ - WT_DECL_RET; - WT_INDEX *idx; - WT_ITEM *key, v; - bool bloom_found; - - if (skip_left && entry->ends_next == 1) - return (0); /* no checks to make */ - key = cjoin->iter->curkey; - entry->stats.accesses++; - bloom_found = false; - - if (entry->bloom != NULL) { - /* - * If we don't own the Bloom filter, we must be sharing one - * in a previous entry. So the shared filter has already - * been checked and passed. - */ - if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) - return (0); - - /* - * If the item is not in the Bloom filter, we return - * immediately, otherwise, we still need to check the - * long way. - */ - WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); - bloom_found = true; - } - if (entry->index != NULL) { - /* - * If this entry is used by the iterator, then we already - * have the index key, and we won't have to do any extraction - * either. - */ - if (entry == cjoin->iter->entry) - WT_ITEM_SET(v, cjoin->iter->idxkey); - else { - memset(&v, 0, sizeof(v)); /* Keep lint quiet. */ - c = entry->main; - c->set_key(c, key); - if ((ret = c->search(c)) == 0) - ret = c->get_value(c, &v); - else if (ret == WT_NOTFOUND) - WT_ERR_MSG(session, WT_ERROR, - "main table for join is missing entry"); - WT_TRET(c->reset(c)); - WT_ERR(ret); - } - } else - WT_ITEM_SET(v, *key); + WT_CURSOR_JOIN_ENDPOINT *newend; - if ((idx = entry->index) != NULL && idx->extractor != NULL && - entry != cjoin->iter->entry) { - WT_CLEAR(extract_cursor); - extract_cursor.iface = iface; - extract_cursor.iface.session = &session->iface; - extract_cursor.iface.key_format = idx->exkey_format; - extract_cursor.ismember = false; - extract_cursor.entry = entry; - WT_ERR(idx->extractor->extract(idx->extractor, - &session->iface, key, &v, &extract_cursor.iface)); - if (!extract_cursor.ismember) - WT_ERR(WT_NOTFOUND); - } else - WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left)); + WT_RET(__wt_realloc_def(session, &entry->ends_allocated, + entry->ends_next + 1, &entry->ends)); + newend = &entry->ends[pos]; + memmove(newend + 1, newend, + (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); + memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); + entry->ends_next++; + *newendp = newend; - if (0) { -err: if (ret == WT_NOTFOUND && bloom_found) - entry->stats.bloom_false_positive++; - } - return (ret); + return (0); } /* @@ -750,61 +1076,52 @@ __curjoin_next(WT_CURSOR *cursor) WT_CURSOR_JOIN_ITER *iter; WT_DECL_RET; WT_SESSION_IMPL *session; - bool skip_left; - u_int i; + int tret; cjoin = (WT_CURSOR_JOIN *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); if (F_ISSET(cjoin, WT_CURJOIN_ERROR)) WT_ERR_MSG(session, WT_ERROR, "join cursor encountered previous error"); if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) - WT_ERR(__curjoin_init_iter(session, cjoin)); - - F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + WT_ERR(__curjoin_init_next(session, cjoin, true)); + if (cjoin->iter == NULL) + WT_ERR(__curjoin_iter_init(session, cjoin, &cjoin->iter)); iter = cjoin->iter; + F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); -nextkey: - if ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) { - F_SET(cursor, WT_CURSTD_KEY_EXT); + while ((ret = __curjoin_iter_next(iter, cursor)) == 0) { + if ((ret = __curjoin_entries_in_range(session, cjoin, + iter->curkey, iter)) != WT_NOTFOUND) + break; + } + iter->positioned = (ret == 0); + if (ret != 0 && ret != WT_NOTFOUND) + WT_ERR(ret); + if (ret == 0) { /* - * We may have already established membership for the - * 'left' case for the first entry, since we're - * using that in our iteration. + * Position the 'main' cursor, this will be used to retrieve + * values from the cursor join. The key we have is raw, but + * the main cursor may not be raw. */ - skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); - for (i = 0; i < cjoin->entries_next; i++) { - ret = __curjoin_entry_member(session, cjoin, - &cjoin->entries[i], skip_left); - if (ret == WT_NOTFOUND) { - /* - * If this is compare=eq on our outer iterator, - * and we've moved past it, we're done. - */ - if (iter->isequal && i == 0) - break; - goto nextkey; - } - skip_left = false; - WT_ERR(ret); - } - } else if (ret != WT_NOTFOUND) - WT_ERR(ret); + c = cjoin->main; + __wt_cursor_set_raw_key(c, iter->curkey); - if (ret == 0) { /* - * Position the 'main' cursor, this will be used to - * retrieve values from the cursor join. + * A failed search is not expected, convert WT_NOTFOUND into a + * generic error. */ - c = iter->main; - c->set_key(c, iter->curkey); - if ((ret = c->search(c)) != 0) - WT_ERR(c->search(c)); + if ((ret = c->search(c)) == WT_NOTFOUND) + ret = WT_ERROR; + WT_ERR(ret); + F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); - } + } else if (ret == WT_NOTFOUND && + (tret = __curjoin_iter_close_all(iter)) != 0) + WT_ERR(tret); if (0) { err: F_SET(cjoin, WT_CURJOIN_ERROR); @@ -813,78 +1130,148 @@ err: F_SET(cjoin, WT_CURJOIN_ERROR); } /* - * __curjoin_reset -- - * WT_CURSOR::reset for join cursors. + * __curjoin_open_main -- + * For the given index, open the main file with a projection + * that is the index keys. */ static int -__curjoin_reset(WT_CURSOR *cursor) +__curjoin_open_main(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry) { - WT_CURSOR_JOIN *cjoin; WT_DECL_RET; - WT_SESSION_IMPL *session; + WT_INDEX *idx; + char *main_uri, *newformat; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + size_t len, newsize; - cjoin = (WT_CURSOR_JOIN *)cursor; + main_uri = NULL; + idx = entry->index; + + newsize = strlen(cjoin->table->name) + idx->colconf.len + 1; + WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); + snprintf(main_uri, newsize, "%s%.*s", + cjoin->table->name, (int)idx->colconf.len, + idx->colconf.str); + WT_ERR(__wt_open_cursor(session, main_uri, + (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); + if (idx->extractor == NULL) { + /* + * Add no-op padding so trailing 'u' formats are not + * transformed to 'U'. This matches what happens in + * the index. We don't do this when we have an + * extractor, extractors already use the padding + * byte trick. + */ + len = strlen(entry->main->value_format) + 3; + WT_ERR(__wt_calloc(session, len, 1, &newformat)); + snprintf(newformat, len, "%s0x", + entry->main->value_format); + __wt_free(session, entry->main->value_format); + entry->main->value_format = newformat; + } - CURSOR_API_CALL(cursor, session, reset, NULL); +err: __wt_free(session, main_uri); + return (ret); +} - if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) - WT_ERR(__curjoin_entry_iter_reset(cjoin->iter)); +/* + * __curjoin_pack_recno -- + * Pack the given recno into a buffer; prepare an item referencing it. + * + */ +static int +__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, + size_t bufsize, WT_ITEM *item) +{ + WT_SESSION *wtsession; + size_t sz; -err: API_END_RET(session, ret); + wtsession = (WT_SESSION *)session; + WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r)); + WT_ASSERT(session, sz < bufsize); + WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); + item->size = sz; + item->data = buf; + return (0); } /* - * __curjoin_close -- - * WT_CURSOR::close for join cursors. + * __curjoin_reset -- + * WT_CURSOR::reset for join cursors. */ static int -__curjoin_close(WT_CURSOR *cursor) +__curjoin_reset(WT_CURSOR *cursor) { WT_CURSOR_JOIN *cjoin; - WT_CURSOR_JOIN_ENDPOINT *end; - WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; WT_SESSION_IMPL *session; - u_int i; cjoin = (WT_CURSOR_JOIN *)cursor; - CURSOR_API_CALL(cursor, session, close, NULL); - - __wt_schema_release_table(session, cjoin->table); - /* These are owned by the table */ - cursor->internal_uri = NULL; - cursor->key_format = NULL; - if (cjoin->projection != NULL) { - __wt_free(session, cjoin->projection); - __wt_free(session, cursor->value_format); - } - - for (entry = cjoin->entries, i = 0; i < cjoin->entries_next; - entry++, i++) { - if (entry->main != NULL) - WT_TRET(entry->main->close(entry->main)); - if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) - WT_TRET(__wt_bloom_close(entry->bloom)); - for (end = &entry->ends[0]; - end < &entry->ends[entry->ends_next]; end++) { - F_CLR(end->cursor, WT_CURSTD_JOINED); - if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR)) - WT_TRET(end->cursor->close(end->cursor)); - } - __wt_free(session, entry->ends); - __wt_free(session, entry->repack_format); - } + JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL); if (cjoin->iter != NULL) - WT_TRET(__curjoin_entry_iter_close(cjoin->iter)); - __wt_free(session, cjoin->entries); - WT_TRET(__wt_cursor_close(cursor)); + WT_ERR(__curjoin_iter_reset(cjoin->iter)); err: API_END_RET(session, ret); } /* + * __curjoin_split_key -- + * Copy the primary key from a cursor (either main table or index) + * to another cursor. When copying from an index file, the index + * key is also returned. + * + */ +static int +__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur, + const char *repack_fmt, bool isindex) +{ + WT_CURSOR *firstcg_cur; + WT_CURSOR_INDEX *cindex; + WT_ITEM *keyp; + const uint8_t *p; + + if (isindex) { + cindex = ((WT_CURSOR_INDEX *)fromcur); + /* + * Repack tells us where the index key ends; advance past + * that to get where the raw primary key starts. + */ + WT_RET(__wt_struct_repack(session, cindex->child->key_format, + repack_fmt != NULL ? repack_fmt : cindex->iface.key_format, + &cindex->child->key, idxkey)); + WT_ASSERT(session, cindex->child->key.size > idxkey->size); + tocur->key.data = (uint8_t *)idxkey->data + idxkey->size; + tocur->key.size = cindex->child->key.size - idxkey->size; + if (WT_CURSOR_RECNO(tocur)) { + p = (const uint8_t *)tocur->key.data; + WT_RET(__wt_vunpack_uint(&p, tocur->key.size, + &tocur->recno)); + } else + tocur->recno = 0; + } else { + firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0]; + keyp = &firstcg_cur->key; + if (WT_CURSOR_RECNO(tocur)) { + WT_ASSERT(session, keyp->size == sizeof(uint64_t)); + tocur->recno = *(uint64_t *)keyp->data; + WT_RET(__curjoin_pack_recno(session, tocur->recno, + cjoin->recno_buf, sizeof(cjoin->recno_buf), + &tocur->key)); + } else { + WT_ITEM_SET(tocur->key, *keyp); + tocur->recno = 0; + } + idxkey->data = NULL; + idxkey->size = 0; + } + return (0); +} + +/* * __wt_curjoin_open -- * Initialize a join cursor. * @@ -979,33 +1366,51 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURSOR_INDEX *cindex; WT_CURSOR_JOIN_ENDPOINT *end; WT_CURSOR_JOIN_ENTRY *entry; - WT_DECL_RET; - bool hasins, needbloom, range_eq; - char *main_uri, *newformat; - const char *raw_cfg[] = { WT_CONFIG_BASE( - session, WT_SESSION_open_cursor), "raw", NULL }; - size_t len, newsize; + WT_CURSOR_JOIN *child; + bool hasins, needbloom, nested, range_eq; + size_t len; u_int i, ins, nonbloom; + uint8_t endrange; entry = NULL; hasins = needbloom = false; - ins = 0; /* -Wuninitialized */ - main_uri = NULL; - nonbloom = 0; /* -Wuninitialized */ + ins = nonbloom = 0; /* -Wuninitialized */ - for (i = 0; i < cjoin->entries_next; i++) { - if (cjoin->entries[i].index == idx) { - entry = &cjoin->entries[i]; - break; - } - if (!needbloom && i > 0 && - !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) { - needbloom = true; - nonbloom = i; + if (cjoin->entries_next == 0) { + if (LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION)) + F_SET(cjoin, WT_CURJOIN_DISJUNCTION); + } else if (LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION) && + !F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) + WT_RET_MSG(session, EINVAL, + "operation=or does not match previous operation=and"); + else if (!LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION) && + F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) + WT_RET_MSG(session, EINVAL, + "operation=and does not match previous operation=or"); + + nested = WT_PREFIX_MATCH(ref_cursor->uri, "join:"); + if (!nested) + for (i = 0; i < cjoin->entries_next; i++) { + if (cjoin->entries[i].index == idx && + cjoin->entries[i].subjoin == NULL) { + entry = &cjoin->entries[i]; + break; + } + if (!needbloom && i > 0 && + !F_ISSET(&cjoin->entries[i], + WT_CURJOIN_ENTRY_BLOOM)) { + needbloom = true; + nonbloom = i; + } } + else { + if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM)) + WT_RET_MSG(session, EINVAL, + "Bloom filters cannot be used with subjoins"); } + if (entry == NULL) { - WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated, + WT_RET(__wt_realloc_def(session, &cjoin->entries_allocated, cjoin->entries_next + 1, &cjoin->entries)); if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && needbloom) { /* @@ -1034,13 +1439,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } else { /* Merge the join into an existing entry for this index */ if (count != 0 && entry->count != 0 && entry->count != count) - WT_ERR_MSG(session, EINVAL, + WT_RET_MSG(session, EINVAL, "count=%" PRIu64 " does not match " "previous count=%" PRIu64 " for this index", count, entry->count); if (LF_MASK(WT_CURJOIN_ENTRY_BLOOM) != F_MASK(entry, WT_CURJOIN_ENTRY_BLOOM)) - WT_ERR_MSG(session, EINVAL, + WT_RET_MSG(session, EINVAL, "join has incompatible strategy " "values for the same index"); @@ -1063,19 +1468,20 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, for (i = 0; i < entry->ends_next; i++) { end = &entry->ends[i]; range_eq = (range == WT_CURJOIN_END_EQ); + endrange = WT_CURJOIN_END_RANGE(end); if ((F_ISSET(end, WT_CURJOIN_END_GT) && ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) || (F_ISSET(end, WT_CURJOIN_END_LT) && ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) || - (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ && + (endrange == WT_CURJOIN_END_EQ && (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT)) != 0)) - WT_ERR_MSG(session, EINVAL, + WT_RET_MSG(session, EINVAL, "join has overlapping ranges"); if (range == WT_CURJOIN_END_EQ && - WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ && + endrange == WT_CURJOIN_END_EQ && !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) - WT_ERR_MSG(session, EINVAL, + WT_RET_MSG(session, EINVAL, "compare=eq can only be combined " "using operation=or"); @@ -1086,6 +1492,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, if (!hasins && ((range & WT_CURJOIN_END_GT) != 0 || (range == WT_CURJOIN_END_EQ && + endrange != WT_CURJOIN_END_EQ && !F_ISSET(end, WT_CURJOIN_END_GT)))) { ins = i; hasins = true; @@ -1098,70 +1505,35 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, entry->bloom_hash_count = WT_MAX(entry->bloom_hash_count, bloom_hash_count); } - WT_ERR(__curjoin_insert_endpoint(session, entry, - hasins ? ins : entry->ends_next, &end)); - end->cursor = ref_cursor; - F_SET(end, range); - - /* Open the main file with a projection of the indexed columns. */ - if (entry->main == NULL && idx != NULL) { - newsize = strlen(cjoin->table->name) + idx->colconf.len + 1; - WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); - snprintf(main_uri, newsize, "%s%.*s", - cjoin->table->name, (int)idx->colconf.len, - idx->colconf.str); - WT_ERR(__wt_open_cursor(session, main_uri, - (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); - if (idx->extractor == NULL) { + if (nested) { + child = (WT_CURSOR_JOIN *)ref_cursor; + entry->subjoin = child; + child->parent = cjoin; + } else { + WT_RET(__curjoin_insert_endpoint(session, entry, + hasins ? ins : entry->ends_next, &end)); + end->cursor = ref_cursor; + F_SET(end, range); + + if (entry->main == NULL && idx != NULL) { /* - * Add no-op padding so trailing 'u' formats are not - * transformed to 'U'. This matches what happens in - * the index. We don't do this when we have an - * extractor, extractors already use the padding - * byte trick. + * Open the main file with a projection of the + * indexed columns. */ - len = strlen(entry->main->value_format) + 3; - WT_ERR(__wt_calloc(session, len, 1, &newformat)); - snprintf(newformat, len, "%s0x", - entry->main->value_format); - __wt_free(session, entry->main->value_format); - entry->main->value_format = newformat; - } + WT_RET(__curjoin_open_main(session, cjoin, entry)); - /* - * When we are repacking index keys to remove the primary - * key, we never want to transform trailing 'u'. Use no-op - * padding to force this. - */ - cindex = (WT_CURSOR_INDEX *)ref_cursor; - len = strlen(cindex->iface.key_format) + 3; - WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format)); - snprintf(entry->repack_format, len, "%s0x", - cindex->iface.key_format); + /* + * When we are repacking index keys to remove the + * primary key, we never want to transform trailing + * 'u'. Use no-op padding to force this. + */ + cindex = (WT_CURSOR_INDEX *)ref_cursor; + len = strlen(cindex->iface.key_format) + 3; + WT_RET(__wt_calloc(session, len, 1, + &entry->repack_format)); + snprintf(entry->repack_format, len, "%s0x", + cindex->iface.key_format); + } } - -err: __wt_free(session, main_uri); - return (ret); -} - -/* - * __curjoin_insert_endpoint -- - * Insert a new entry into the endpoint array for the join entry. - */ -static int -__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, - u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp) -{ - WT_CURSOR_JOIN_ENDPOINT *newend; - - WT_RET(__wt_realloc_def(session, &entry->ends_allocated, - entry->ends_next + 1, &entry->ends)); - newend = &entry->ends[pos]; - memmove(newend + 1, newend, - (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); - memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); - entry->ends_next++; - *newendp = newend; - return (0); } |