diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cursor/cur_join.c | 1132 |
1 files changed, 568 insertions, 564 deletions
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 0388a05a34b..93c1711ef93 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -10,16 +10,20 @@ static int __curjoin_entries_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN *, WT_ITEM *, WT_CURSOR_JOIN_ITER *); -static int __curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *); -static int __curjoin_entry_iter_close_all(WT_CURSOR_JOIN_ITER *); -static bool __curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *); static int __curjoin_entry_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *, WT_ITEM *, WT_CURSOR_JOIN_ITER *); static int __curjoin_entry_member(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *, WT_ITEM *, WT_CURSOR_JOIN_ITER *); static int __curjoin_insert_endpoint(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *, u_int, WT_CURSOR_JOIN_ENDPOINT **); +static int __curjoin_iter_close(WT_CURSOR_JOIN_ITER *); +static int __curjoin_iter_close_all(WT_CURSOR_JOIN_ITER *); +static bool __curjoin_iter_ready(WT_CURSOR_JOIN_ITER *); static int __curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *, u_int); +static int __curjoin_pack_recno(WT_SESSION_IMPL *, uint64_t, uint8_t *, + size_t, WT_ITEM *); +static int __curjoin_split_key(WT_SESSION_IMPL *, WT_CURSOR_JOIN *, WT_ITEM *, + WT_CURSOR *, WT_CURSOR *, const char *, bool); #define WT_CURJOIN_ITER_CONSUMED(iter) \ ((iter)->entry_pos >= (iter)->entry_count) @@ -39,12 +43,12 @@ __wt_curjoin_joined(WT_CURSOR *cursor) } /* - * __curjoin_entry_iter_init -- + * __curjoin_iter_init -- * Initialize an iteration for the index managed by a join entry. * */ static int -__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, +__curjoin_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURSOR_JOIN_ITER **iterp) { WT_CURSOR_JOIN_ITER *iter; @@ -60,6 +64,76 @@ __curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } /* + * __curjoin_iter_close -- + * Close the iteration, release resources. + * + */ +static int +__curjoin_iter_close(WT_CURSOR_JOIN_ITER *iter) +{ + WT_DECL_RET; + + if (iter->cursor != NULL) + WT_TRET(iter->cursor->close(iter->cursor)); + __wt_free(iter->session, iter); + return (ret); +} + +/* + * __curjoin_iter_close_all -- + * Free the iterator and all of its children recursively. + * + */ +static int +__curjoin_iter_close_all(WT_CURSOR_JOIN_ITER *iter) +{ + WT_CURSOR_JOIN *parent; + WT_DECL_RET; + + if (iter->child) + WT_TRET(__curjoin_iter_close_all(iter->child)); + iter->child = NULL; + WT_ASSERT(iter->session, iter->cjoin->parent == NULL || + iter->cjoin->parent->iter->child == iter); + if ((parent = iter->cjoin->parent) != NULL) + parent->iter->child = NULL; + iter->cjoin->iter = NULL; + WT_TRET(__curjoin_iter_close(iter)); + return (ret); +} + +/* + * __curjoin_iter_reset -- + * Reset an iteration to the starting point. + * + */ +static int +__curjoin_iter_reset(WT_CURSOR_JOIN_ITER *iter) +{ + if (iter->child != NULL) + WT_RET(__curjoin_iter_close_all(iter->child)); + WT_RET(__curjoin_iter_set_entry(iter, 0)); + iter->positioned = false; + return (0); +} + +/* + * __curjoin_iter_ready -- + * Check the positioned flag for all nested iterators. + * + */ +static bool +__curjoin_iter_ready(WT_CURSOR_JOIN_ITER *iter) +{ + while (iter != NULL) { + if (!iter->positioned) + return (false); + iter = iter->child; + } + return (true); +} + +/* * __curjoin_iter_set_entry -- * Set the current entry for an iterator. * @@ -136,34 +210,13 @@ err: __wt_free(session, uri); } /* - * __curjoin_pack_recno -- - * Pack the given recno into a buffer; prepare an item referencing it. - * - */ -static int -__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, - size_t bufsize, WT_ITEM *item) -{ - WT_SESSION *wtsession; - size_t sz; - - wtsession = (WT_SESSION *)session; - WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r)); - WT_ASSERT(session, sz < bufsize); - WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); - item->size = sz; - item->data = buf; - return (0); -} - -/* - * __curjoin_entry_iter_bump -- + * __curjoin_iter_bump -- * Called to advance the iterator to the next endpoint, * which may in turn advance to the next entry. * */ static int -__curjoin_entry_iter_bump(WT_CURSOR_JOIN_ITER *iter) +__curjoin_iter_bump(WT_CURSOR_JOIN_ITER *iter) { WT_CURSOR_JOIN_ENTRY *entry; WT_SESSION_IMPL *session; @@ -179,7 +232,7 @@ __curjoin_entry_iter_bump(WT_CURSOR_JOIN_ITER *iter) } iter->end_pos = iter->end_count = iter->end_skip = 0; if (entry->subjoin != NULL && entry->subjoin->iter != NULL) - WT_RET(__curjoin_entry_iter_close_all(entry->subjoin->iter)); + WT_RET(__curjoin_iter_close_all(entry->subjoin->iter)); if (++iter->entry_pos >= iter->entry_count) { iter->entry = NULL; @@ -187,7 +240,7 @@ __curjoin_entry_iter_bump(WT_CURSOR_JOIN_ITER *iter) } iter->entry = ++entry; if (entry->subjoin != NULL) { - WT_RET(__curjoin_entry_iter_init(session, entry->subjoin, + WT_RET(__curjoin_iter_init(session, entry->subjoin, &iter->child)); return (0); } @@ -196,66 +249,12 @@ __curjoin_entry_iter_bump(WT_CURSOR_JOIN_ITER *iter) } /* - * __curjoin_split_key -- - * Copy the primary key from a cursor (either main table or index) - * to another cursor. When copying from an index file, the index - * key is also returned. - * - */ -static int -__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur, - const char *repack_fmt, bool isindex) -{ - WT_CURSOR *firstcg_cur; - WT_CURSOR_INDEX *cindex; - WT_ITEM *keyp; - const uint8_t *p; - - if (isindex) { - cindex = ((WT_CURSOR_INDEX *)fromcur); - /* - * Repack tells us where the index key ends; advance past - * that to get where the raw primary key starts. - */ - WT_RET(__wt_struct_repack(session, cindex->child->key_format, - repack_fmt != NULL ? repack_fmt : cindex->iface.key_format, - &cindex->child->key, idxkey)); - WT_ASSERT(session, cindex->child->key.size > idxkey->size); - tocur->key.data = (uint8_t *)idxkey->data + idxkey->size; - tocur->key.size = cindex->child->key.size - idxkey->size; - if (WT_CURSOR_RECNO(tocur)) { - p = (const uint8_t *)tocur->key.data; - WT_RET(__wt_vunpack_uint(&p, tocur->key.size, - &tocur->recno)); - } else - tocur->recno = 0; - } else { - firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0]; - keyp = &firstcg_cur->key; - if (WT_CURSOR_RECNO(tocur)) { - WT_ASSERT(session, keyp->size == sizeof(uint64_t)); - tocur->recno = *(uint64_t *)keyp->data; - WT_RET(__curjoin_pack_recno(session, tocur->recno, - cjoin->recno_buf, sizeof(cjoin->recno_buf), - &tocur->key)); - } else { - WT_ITEM_SET(tocur->key, *keyp); - tocur->recno = 0; - } - idxkey->data = NULL; - idxkey->size = 0; - } - return (0); -} - -/* - * __curjoin_entry_iter_next -- + * __curjoin_iter_next -- * Get the next item in an iteration. * */ static int -__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor) +__curjoin_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor) { WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; @@ -269,9 +268,9 @@ again: entry = iter->entry; if (entry->subjoin != NULL) { if (iter->child == NULL) - WT_RET(__curjoin_entry_iter_init(session, + WT_RET(__curjoin_iter_init(session, entry->subjoin, &iter->child)); - ret = __curjoin_entry_iter_next(iter->child, cursor); + ret = __curjoin_iter_next(iter->child, cursor); if (ret == 0) { /* The child did the work, we're done. */ iter->curkey = &cursor->key; @@ -279,16 +278,16 @@ again: return (ret); } else if (ret == WT_NOTFOUND) { - WT_RET(__curjoin_entry_iter_close_all(iter->child)); + WT_RET(__curjoin_iter_close_all(iter->child)); entry->subjoin->iter = NULL; iter->child = NULL; - WT_RET(__curjoin_entry_iter_bump(iter)); + WT_RET(__curjoin_iter_bump(iter)); ret = 0; } } else if (iter->positioned) { ret = iter->cursor->next(iter->cursor); if (ret == WT_NOTFOUND) { - WT_RET(__curjoin_entry_iter_bump(iter)); + WT_RET(__curjoin_iter_bump(iter)); ret = 0; } else WT_RET(ret); @@ -298,7 +297,7 @@ again: if (WT_CURJOIN_ITER_CONSUMED(iter)) return (WT_NOTFOUND); - if (!__curjoin_entry_iter_ready(iter)) + if (!__curjoin_iter_ready(iter)) goto again; WT_RET(ret); @@ -317,72 +316,380 @@ again: } /* - * __curjoin_entry_iter_reset -- - * Reset an iteration to the starting point. - * + * __curjoin_close -- + * WT_CURSOR::close for join cursors. */ static int -__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) +__curjoin_close(WT_CURSOR *cursor) { - if (iter->child != NULL) - WT_RET(__curjoin_entry_iter_close_all(iter->child)); - WT_RET(__curjoin_iter_set_entry(iter, 0)); - iter->positioned = false; + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ENDPOINT *end; + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_SESSION_IMPL *session; + u_int i; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); + + __wt_schema_release_table(session, cjoin->table); + /* These are owned by the table */ + cursor->internal_uri = NULL; + cursor->key_format = NULL; + if (cjoin->projection != NULL) { + __wt_free(session, cjoin->projection); + __wt_free(session, cursor->value_format); + } + + for (entry = cjoin->entries, i = 0; i < cjoin->entries_next; + entry++, i++) { + if (entry->subjoin != NULL) { + F_CLR(&entry->subjoin->iface, WT_CURSTD_JOINED); + entry->subjoin->parent = NULL; + } + if (entry->main != NULL) + WT_TRET(entry->main->close(entry->main)); + if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) + WT_TRET(__wt_bloom_close(entry->bloom)); + for (end = &entry->ends[0]; + end < &entry->ends[entry->ends_next]; end++) { + F_CLR(end->cursor, WT_CURSTD_JOINED); + if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR)) + WT_TRET(end->cursor->close(end->cursor)); + } + __wt_free(session, entry->ends); + __wt_free(session, entry->repack_format); + } + + if (cjoin->iter != NULL) + WT_TRET(__curjoin_iter_close_all(cjoin->iter)); + if (cjoin->main != NULL) + WT_TRET(cjoin->main->close(cjoin->main)); + + __wt_free(session, cjoin->entries); + WT_TRET(__wt_cursor_close(cursor)); + +err: API_END_RET(session, ret); +} + +/* + * __curjoin_endpoint_init_key -- + * Set the key in the reference endpoint. + */ +static int +__curjoin_endpoint_init_key(WT_SESSION_IMPL *session, + WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint) +{ + WT_CURSOR *cursor; + WT_CURSOR_INDEX *cindex; + WT_ITEM *k; + uint64_t r; + + if ((cursor = endpoint->cursor) != NULL) { + if (entry->index != NULL) { + /* Extract and save the index's logical key. */ + cindex = (WT_CURSOR_INDEX *)endpoint->cursor; + WT_RET(__wt_struct_repack(session, + cindex->child->key_format, + (entry->repack_format != NULL ? + entry->repack_format : cindex->iface.key_format), + &cindex->child->key, &endpoint->key)); + } else { + k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; + if (WT_CURSOR_RECNO(cursor)) { + r = *(uint64_t *)k->data; + WT_RET(__curjoin_pack_recno(session, r, + endpoint->recno_buf, + sizeof(endpoint->recno_buf), + &endpoint->key)); + } + else + endpoint->key = *k; + } + } return (0); } /* - * __curjoin_entry_iter_ready -- - * Check the positioned flag for all nested iterators. - * + * __curjoin_entries_in_range -- + * Check if a key is in the range specified by the remaining entries, + * returning WT_NOTFOUND if not. */ -static bool -__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter) +static int +__curjoin_entries_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iterarg) { - while (iter != NULL) { - if (!iter->positioned) - return (false); - iter = iter->child; + WT_CURSOR_JOIN_ENTRY *entry; + WT_CURSOR_JOIN_ITER *iter; + WT_DECL_RET; + int fastret, slowret; + u_int pos; + + iter = iterarg; + if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) { + fastret = 0; + slowret = WT_NOTFOUND; + } else { + fastret = WT_NOTFOUND; + slowret = 0; } - return (true); + pos = iter == NULL ? 0 : iter->entry_pos; + for (entry = &cjoin->entries[pos]; pos < cjoin->entries_next; + entry++, pos++) { + ret = __curjoin_entry_member(session, entry, curkey, iter); + if (ret == fastret) + return (fastret); + if (ret != slowret) + break; + iter = NULL; + } + + return (ret == 0 ? slowret : ret); } /* - * __curjoin_entry_iter_close -- - * Close the iteration, release resources. - * + * __curjoin_entry_in_range -- + * Check if a key is in the range specified by the entry, returning + * WT_NOTFOUND if not. */ static int -__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter) +__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iter) { + WT_COLLATOR *collator; + WT_CURSOR_JOIN_ENDPOINT *end, *endmax; + bool disjunction, passed; + int cmp; + u_int pos; + + collator = (entry->index != NULL) ? entry->index->collator : NULL; + endmax = &entry->ends[entry->ends_next]; + disjunction = F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION); + passed = false; + + /* + * The iterator may have already satisfied some endpoint conditions. + * If so and we're a disjunction, we're done. If so and we're a + * conjunction, we can start past the satisfied conditions. + */ + if (iter == NULL) + pos = 0; + else { + if (disjunction && iter->end_skip) + return (0); + pos = iter->end_pos + iter->end_skip; + } + + for (end = &entry->ends[pos]; end < endmax; end++) { + WT_RET(__wt_compare(session, collator, curkey, &end->key, + &cmp)); + switch (WT_CURJOIN_END_RANGE(end)) { + case WT_CURJOIN_END_EQ: + passed = (cmp == 0); + break; + + case WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ: + passed = (cmp >= 0); + WT_ASSERT(session, iter == NULL); + break; + + case WT_CURJOIN_END_GT: + passed = (cmp > 0); + if (passed && iter != NULL && pos == 0) + iter->end_skip = 1; + break; + + case WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ: + passed = (cmp <= 0); + break; + + case WT_CURJOIN_END_LT: + passed = (cmp < 0); + break; + + default: + WT_RET(__wt_illegal_value(session, NULL)); + break; + } + + if (!passed) { + if (iter != NULL && + (iter->is_equal || + F_ISSET(end, WT_CURJOIN_END_LT))) { + WT_RET(__curjoin_iter_bump(iter)); + return (WT_NOTFOUND); + } + if (!disjunction) + return (WT_NOTFOUND); + iter = NULL; + } else if (disjunction) + break; + } + if (disjunction && end == endmax) + return (WT_NOTFOUND); + else + return (0); +} + +typedef struct { + WT_CURSOR iface; + WT_CURSOR_JOIN_ENTRY *entry; + bool ismember; +} WT_CURJOIN_EXTRACTOR; + +/* + * __curjoin_extract_insert -- + * Handle a key produced by a custom extractor. + */ +static int +__curjoin_extract_insert(WT_CURSOR *cursor) { + WT_CURJOIN_EXTRACTOR *cextract; WT_DECL_RET; + WT_ITEM ikey; + WT_SESSION_IMPL *session; + + cextract = (WT_CURJOIN_EXTRACTOR *)cursor; + /* + * This insert method may be called multiple times during a single + * extraction. If we already have a definitive answer to the + * membership question, exit early. + */ + if (cextract->ismember) + return (0); + + session = (WT_SESSION_IMPL *)cursor->session; + + WT_ITEM_SET(ikey, cursor->key); + /* + * We appended a padding byte to the key to avoid rewriting the last + * column. Strip that away here. + */ + WT_ASSERT(session, ikey.size > 0); + --ikey.size; + + ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); + if (ret == WT_NOTFOUND) + ret = 0; + else if (ret == 0) + cextract->ismember = true; - if (iter->cursor != NULL) - WT_TRET(iter->cursor->close(iter->cursor)); - __wt_free(iter->session, iter); return (ret); } /* - * __curjoin_entry_iter_close_all -- - * Free the iterator and all of its children recursively. - * + * __curjoin_entry_member -- + * Do a membership check for a particular index that was joined, + * if not a member, returns WT_NOTFOUND. */ static int -__curjoin_entry_iter_close_all(WT_CURSOR_JOIN_ITER *iter) +__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + WT_ITEM *key, WT_CURSOR_JOIN_ITER *iter) { - WT_CURSOR_JOIN *parent; + WT_CURJOIN_EXTRACTOR extract_cursor; + WT_CURSOR *c; + WT_CURSOR_STATIC_INIT(iface, + __wt_cursor_get_key, /* get-key */ + __wt_cursor_get_value, /* get-value */ + __wt_cursor_set_key, /* set-key */ + __wt_cursor_set_value, /* set-value */ + __wt_cursor_compare_notsup, /* compare */ + __wt_cursor_equals_notsup, /* equals */ + __wt_cursor_notsup, /* next */ + __wt_cursor_notsup, /* prev */ + __wt_cursor_notsup, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_search_near_notsup, /* search-near */ + __curjoin_extract_insert, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_reconfigure_notsup, /* reconfigure */ + __wt_cursor_notsup); /* close */ WT_DECL_RET; + WT_INDEX *idx; + WT_ITEM v; + bool bloom_found; - if (iter->child) - WT_TRET(__curjoin_entry_iter_close_all(iter->child)); - iter->child = NULL; - WT_ASSERT(iter->session, iter->cjoin->parent == NULL || - iter->cjoin->parent->iter->child == iter); - if ((parent = iter->cjoin->parent) != NULL) - parent->iter->child = NULL; - iter->cjoin->iter = NULL; - WT_TRET(__curjoin_entry_iter_close(iter)); + if (entry->subjoin == NULL && iter != NULL && + (iter->end_pos + iter->end_skip >= entry->ends_next || + (iter->end_skip > 0 && + F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)))) + return (0); /* no checks to make */ + + entry->stats.accesses++; + bloom_found = false; + + if (entry->bloom != NULL) { + /* + * If we don't own the Bloom filter, we must be sharing one + * in a previous entry. So the shared filter has already + * been checked and passed. + */ + if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) + return (0); + + /* + * If the item is not in the Bloom filter, we return + * immediately, otherwise, we still need to check the + * long way. + */ + WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); + bloom_found = true; + } + if (entry->subjoin != NULL) { + WT_ASSERT(session, + iter == NULL || entry->subjoin == iter->child->cjoin); + ret = __curjoin_entries_in_range(session, entry->subjoin, + key, iter == NULL ? NULL : iter->child); + if (iter != NULL && + WT_CURJOIN_ITER_CONSUMED(iter->child)) { + WT_ERR(__curjoin_iter_bump(iter)); + ret = WT_NOTFOUND; + } + return (ret); + } + if (entry->index != NULL) { + /* + * If this entry is used by the iterator, then we already + * have the index key, and we won't have to do any + * extraction either. + */ + if (iter != NULL && entry == iter->entry) + WT_ITEM_SET(v, iter->idxkey); + else { + memset(&v, 0, sizeof(v)); /* Keep lint quiet. */ + c = entry->main; + c->set_key(c, key); + if ((ret = c->search(c)) == 0) + ret = c->get_value(c, &v); + else if (ret == WT_NOTFOUND) + WT_ERR_MSG(session, WT_ERROR, + "main table for join is missing entry"); + WT_TRET(c->reset(c)); + WT_ERR(ret); + } + } else + WT_ITEM_SET(v, *key); + + if ((idx = entry->index) != NULL && idx->extractor != NULL && + (iter == NULL || entry != iter->entry)) { + WT_CLEAR(extract_cursor); + extract_cursor.iface = iface; + extract_cursor.iface.session = &session->iface; + extract_cursor.iface.key_format = idx->exkey_format; + extract_cursor.ismember = false; + extract_cursor.entry = entry; + WT_ERR(idx->extractor->extract(idx->extractor, + &session->iface, key, &v, &extract_cursor.iface)); + if (!extract_cursor.ismember) + WT_ERR(WT_NOTFOUND); + } else + WT_ERR(__curjoin_entry_in_range(session, entry, &v, iter)); + + if (0) { +err: if (ret == WT_NOTFOUND && bloom_found) + entry->stats.bloom_false_positive++; + } return (ret); } @@ -591,44 +898,6 @@ err: if (c != NULL) } /* - * __curjoin_endpoint_init_key -- - * Set the key in the reference endpoint. - */ -static int -__curjoin_endpoint_init_key(WT_SESSION_IMPL *session, - WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint) -{ - WT_CURSOR *cursor; - WT_CURSOR_INDEX *cindex; - WT_ITEM *k; - uint64_t r; - - if ((cursor = endpoint->cursor) != NULL) { - if (entry->index != NULL) { - /* Extract and save the index's logical key. */ - cindex = (WT_CURSOR_INDEX *)endpoint->cursor; - WT_RET(__wt_struct_repack(session, - cindex->child->key_format, - (entry->repack_format != NULL ? - entry->repack_format : cindex->iface.key_format), - &cindex->child->key, &endpoint->key)); - } else { - k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; - if (WT_CURSOR_RECNO(cursor)) { - r = *(uint64_t *)k->data; - WT_RET(__curjoin_pack_recno(session, r, - endpoint->recno_buf, - sizeof(endpoint->recno_buf), - &endpoint->key)); - } - else - endpoint->key = *k; - } - } - return (0); -} - -/* * __curjoin_init_next -- * Initialize the cursor join when the next function is first called. */ @@ -775,285 +1044,25 @@ err: __wt_free(session, mainbuf); } /* - * __curjoin_entries_in_range -- - * Check if a key is in the range specified by the remaining entries, - * returning WT_NOTFOUND if not. - */ -static int -__curjoin_entries_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iterarg) -{ - WT_CURSOR_JOIN_ENTRY *entry; - WT_CURSOR_JOIN_ITER *iter; - WT_DECL_RET; - int fastret, slowret; - u_int pos; - - iter = iterarg; - if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) { - fastret = 0; - slowret = WT_NOTFOUND; - } else { - fastret = WT_NOTFOUND; - slowret = 0; - } - pos = iter == NULL ? 0 : iter->entry_pos; - for (entry = &cjoin->entries[pos]; pos < cjoin->entries_next; - entry++, pos++) { - ret = __curjoin_entry_member(session, entry, curkey, iter); - if (ret == fastret) - return (fastret); - if (ret != slowret) - break; - iter = NULL; - } - - return (ret == 0 ? slowret : ret); -} - -/* - * __curjoin_entry_in_range -- - * Check if a key is in the range specified by the entry, returning - * WT_NOTFOUND if not. - */ -static int -__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, - WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iter) -{ - WT_COLLATOR *collator; - WT_CURSOR_JOIN_ENDPOINT *end, *endmax; - bool disjunction, passed; - int cmp; - u_int pos; - - collator = (entry->index != NULL) ? entry->index->collator : NULL; - endmax = &entry->ends[entry->ends_next]; - disjunction = F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION); - passed = false; - - /* - * The iterator may have already satisfied some endpoint conditions. - * If so and we're a disjunction, we're done. If so and we're a - * conjunction, we can start past the satisfied conditions. - */ - if (iter == NULL) - pos = 0; - else { - if (disjunction && iter->end_skip) - return (0); - pos = iter->end_pos + iter->end_skip; - } - - for (end = &entry->ends[pos]; end < endmax; end++) { - WT_RET(__wt_compare(session, collator, curkey, &end->key, - &cmp)); - switch (WT_CURJOIN_END_RANGE(end)) { - case WT_CURJOIN_END_EQ: - passed = (cmp == 0); - break; - - case WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ: - passed = (cmp >= 0); - WT_ASSERT(session, iter == NULL); - break; - - case WT_CURJOIN_END_GT: - passed = (cmp > 0); - if (passed && iter != NULL && pos == 0) - iter->end_skip = 1; - break; - - case WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ: - passed = (cmp <= 0); - break; - - case WT_CURJOIN_END_LT: - passed = (cmp < 0); - break; - - default: - WT_RET(__wt_illegal_value(session, NULL)); - break; - } - - if (!passed) { - if (iter != NULL && - (iter->is_equal || - F_ISSET(end, WT_CURJOIN_END_LT))) { - WT_RET(__curjoin_entry_iter_bump(iter)); - return (WT_NOTFOUND); - } - if (!disjunction) - return (WT_NOTFOUND); - iter = NULL; - } else if (disjunction) - break; - } - if (disjunction && end == endmax) - return (WT_NOTFOUND); - else - return (0); -} - -typedef struct { - WT_CURSOR iface; - WT_CURSOR_JOIN_ENTRY *entry; - bool ismember; -} WT_CURJOIN_EXTRACTOR; - -/* - * __curjoin_extract_insert -- - * Handle a key produced by a custom extractor. - */ -static int -__curjoin_extract_insert(WT_CURSOR *cursor) { - WT_CURJOIN_EXTRACTOR *cextract; - WT_DECL_RET; - WT_ITEM ikey; - WT_SESSION_IMPL *session; - - cextract = (WT_CURJOIN_EXTRACTOR *)cursor; - /* - * This insert method may be called multiple times during a single - * extraction. If we already have a definitive answer to the - * membership question, exit early. - */ - if (cextract->ismember) - return (0); - - session = (WT_SESSION_IMPL *)cursor->session; - - WT_ITEM_SET(ikey, cursor->key); - /* - * We appended a padding byte to the key to avoid rewriting the last - * column. Strip that away here. - */ - WT_ASSERT(session, ikey.size > 0); - --ikey.size; - - ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); - if (ret == WT_NOTFOUND) - ret = 0; - else if (ret == 0) - cextract->ismember = true; - - return (ret); -} - -/* - * __curjoin_entry_member -- - * Do a membership check for a particular index that was joined, - * if not a member, returns WT_NOTFOUND. + * __curjoin_insert_endpoint -- + * Insert a new entry into the endpoint array for the join entry. */ static int -__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, - WT_ITEM *key, WT_CURSOR_JOIN_ITER *iter) +__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp) { - WT_CURJOIN_EXTRACTOR extract_cursor; - WT_CURSOR *c; - WT_CURSOR_STATIC_INIT(iface, - __wt_cursor_get_key, /* get-key */ - __wt_cursor_get_value, /* get-value */ - __wt_cursor_set_key, /* set-key */ - __wt_cursor_set_value, /* set-value */ - __wt_cursor_compare_notsup, /* compare */ - __wt_cursor_equals_notsup, /* equals */ - __wt_cursor_notsup, /* next */ - __wt_cursor_notsup, /* prev */ - __wt_cursor_notsup, /* reset */ - __wt_cursor_notsup, /* search */ - __wt_cursor_search_near_notsup, /* search-near */ - __curjoin_extract_insert, /* insert */ - __wt_cursor_notsup, /* update */ - __wt_cursor_notsup, /* remove */ - __wt_cursor_reconfigure_notsup, /* reconfigure */ - __wt_cursor_notsup); /* close */ - WT_DECL_RET; - WT_INDEX *idx; - WT_ITEM v; - bool bloom_found; - - if (entry->subjoin == NULL && iter != NULL && - (iter->end_pos + iter->end_skip >= entry->ends_next || - (iter->end_skip > 0 && - F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)))) - return (0); /* no checks to make */ - - entry->stats.accesses++; - bloom_found = false; - - if (entry->bloom != NULL) { - /* - * If we don't own the Bloom filter, we must be sharing one - * in a previous entry. So the shared filter has already - * been checked and passed. - */ - if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) - return (0); - - /* - * If the item is not in the Bloom filter, we return - * immediately, otherwise, we still need to check the - * long way. - */ - WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); - bloom_found = true; - } - if (entry->subjoin != NULL) { - WT_ASSERT(session, - iter == NULL || entry->subjoin == iter->child->cjoin); - ret = __curjoin_entries_in_range(session, entry->subjoin, - key, iter == NULL ? NULL : iter->child); - if (iter != NULL && - WT_CURJOIN_ITER_CONSUMED(iter->child)) { - WT_ERR(__curjoin_entry_iter_bump(iter)); - ret = WT_NOTFOUND; - } - return (ret); - } - if (entry->index != NULL) { - /* - * If this entry is used by the iterator, then we already - * have the index key, and we won't have to do any - * extraction either. - */ - if (iter != NULL && entry == iter->entry) - WT_ITEM_SET(v, iter->idxkey); - else { - memset(&v, 0, sizeof(v)); /* Keep lint quiet. */ - c = entry->main; - c->set_key(c, key); - if ((ret = c->search(c)) == 0) - ret = c->get_value(c, &v); - else if (ret == WT_NOTFOUND) - WT_ERR_MSG(session, WT_ERROR, - "main table for join is missing entry"); - WT_TRET(c->reset(c)); - WT_ERR(ret); - } - } else - WT_ITEM_SET(v, *key); + WT_CURSOR_JOIN_ENDPOINT *newend; - if ((idx = entry->index) != NULL && idx->extractor != NULL && - (iter == NULL || entry != iter->entry)) { - WT_CLEAR(extract_cursor); - extract_cursor.iface = iface; - extract_cursor.iface.session = &session->iface; - extract_cursor.iface.key_format = idx->exkey_format; - extract_cursor.ismember = false; - extract_cursor.entry = entry; - WT_ERR(idx->extractor->extract(idx->extractor, - &session->iface, key, &v, &extract_cursor.iface)); - if (!extract_cursor.ismember) - WT_ERR(WT_NOTFOUND); - } else - WT_ERR(__curjoin_entry_in_range(session, entry, &v, iter)); + WT_RET(__wt_realloc_def(session, &entry->ends_allocated, + entry->ends_next + 1, &entry->ends)); + newend = &entry->ends[pos]; + memmove(newend + 1, newend, + (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); + memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); + entry->ends_next++; + *newendp = newend; - if (0) { -err: if (ret == WT_NOTFOUND && bloom_found) - entry->stats.bloom_false_positive++; - } - return (ret); + return (0); } /* @@ -1081,11 +1090,11 @@ __curjoin_next(WT_CURSOR *cursor) if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) WT_ERR(__curjoin_init_next(session, cjoin, true)); if (cjoin->iter == NULL) - WT_ERR(__curjoin_entry_iter_init(session, cjoin, &cjoin->iter)); + WT_ERR(__curjoin_iter_init(session, cjoin, &cjoin->iter)); iter = cjoin->iter; F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); - while ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) { + while ((ret = __curjoin_iter_next(iter, cursor)) == 0) { if ((ret = __curjoin_entries_in_range(session, cjoin, iter->curkey, iter)) != WT_NOTFOUND) break; @@ -1111,7 +1120,7 @@ __curjoin_next(WT_CURSOR *cursor) WT_ERR(c->search(c)); F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); } else if (ret == WT_NOTFOUND && - (tret = __curjoin_entry_iter_close_all(iter)) != 0) + (tret = __curjoin_iter_close_all(iter)) != 0) WT_ERR(tret); if (0) { @@ -1121,6 +1130,73 @@ err: F_SET(cjoin, WT_CURJOIN_ERROR); } /* + * __curjoin_open_main -- + * For the given index, open the main file with a projection + * that is the index keys. + */ +static int +__curjoin_open_main(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry) +{ + WT_DECL_RET; + WT_INDEX *idx; + char *main_uri, *newformat; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + size_t len, newsize; + + main_uri = NULL; + idx = entry->index; + + newsize = strlen(cjoin->table->name) + idx->colconf.len + 1; + WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); + snprintf(main_uri, newsize, "%s%.*s", + cjoin->table->name, (int)idx->colconf.len, + idx->colconf.str); + WT_ERR(__wt_open_cursor(session, main_uri, + (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); + if (idx->extractor == NULL) { + /* + * Add no-op padding so trailing 'u' formats are not + * transformed to 'U'. This matches what happens in + * the index. We don't do this when we have an + * extractor, extractors already use the padding + * byte trick. + */ + len = strlen(entry->main->value_format) + 3; + WT_ERR(__wt_calloc(session, len, 1, &newformat)); + snprintf(newformat, len, "%s0x", + entry->main->value_format); + __wt_free(session, entry->main->value_format); + entry->main->value_format = newformat; + } + +err: __wt_free(session, main_uri); + return (ret); +} + +/* + * __curjoin_pack_recno -- + * Pack the given recno into a buffer; prepare an item referencing it. + * + */ +static int +__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, + size_t bufsize, WT_ITEM *item) +{ + WT_SESSION *wtsession; + size_t sz; + + wtsession = (WT_SESSION *)session; + WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r)); + WT_ASSERT(session, sz < bufsize); + WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); + item->size = sz; + item->data = buf; + return (0); +} + +/* * __curjoin_reset -- * WT_CURSOR::reset for join cursors. */ @@ -1136,67 +1212,63 @@ __curjoin_reset(WT_CURSOR *cursor) JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL); if (cjoin->iter != NULL) - WT_ERR(__curjoin_entry_iter_reset(cjoin->iter)); + WT_ERR(__curjoin_iter_reset(cjoin->iter)); err: API_END_RET(session, ret); } /* - * __curjoin_close -- - * WT_CURSOR::close for join cursors. + * __curjoin_split_key -- + * Copy the primary key from a cursor (either main table or index) + * to another cursor. When copying from an index file, the index + * key is also returned. + * */ static int -__curjoin_close(WT_CURSOR *cursor) +__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur, + const char *repack_fmt, bool isindex) { - WT_CURSOR_JOIN *cjoin; - WT_CURSOR_JOIN_ENDPOINT *end; - WT_CURSOR_JOIN_ENTRY *entry; - WT_DECL_RET; - WT_SESSION_IMPL *session; - u_int i; - - cjoin = (WT_CURSOR_JOIN *)cursor; - - JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); - - __wt_schema_release_table(session, cjoin->table); - /* These are owned by the table */ - cursor->internal_uri = NULL; - cursor->key_format = NULL; - if (cjoin->projection != NULL) { - __wt_free(session, cjoin->projection); - __wt_free(session, cursor->value_format); - } + WT_CURSOR *firstcg_cur; + WT_CURSOR_INDEX *cindex; + WT_ITEM *keyp; + const uint8_t *p; - for (entry = cjoin->entries, i = 0; i < cjoin->entries_next; - entry++, i++) { - if (entry->subjoin != NULL) { - F_CLR(&entry->subjoin->iface, WT_CURSTD_JOINED); - entry->subjoin->parent = NULL; - } - if (entry->main != NULL) - WT_TRET(entry->main->close(entry->main)); - if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) - WT_TRET(__wt_bloom_close(entry->bloom)); - for (end = &entry->ends[0]; - end < &entry->ends[entry->ends_next]; end++) { - F_CLR(end->cursor, WT_CURSTD_JOINED); - if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR)) - WT_TRET(end->cursor->close(end->cursor)); + if (isindex) { + cindex = ((WT_CURSOR_INDEX *)fromcur); + /* + * Repack tells us where the index key ends; advance past + * that to get where the raw primary key starts. + */ + WT_RET(__wt_struct_repack(session, cindex->child->key_format, + repack_fmt != NULL ? repack_fmt : cindex->iface.key_format, + &cindex->child->key, idxkey)); + WT_ASSERT(session, cindex->child->key.size > idxkey->size); + tocur->key.data = (uint8_t *)idxkey->data + idxkey->size; + tocur->key.size = cindex->child->key.size - idxkey->size; + if (WT_CURSOR_RECNO(tocur)) { + p = (const uint8_t *)tocur->key.data; + WT_RET(__wt_vunpack_uint(&p, tocur->key.size, + &tocur->recno)); + } else + tocur->recno = 0; + } else { + firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0]; + keyp = &firstcg_cur->key; + if (WT_CURSOR_RECNO(tocur)) { + WT_ASSERT(session, keyp->size == sizeof(uint64_t)); + tocur->recno = *(uint64_t *)keyp->data; + WT_RET(__curjoin_pack_recno(session, tocur->recno, + cjoin->recno_buf, sizeof(cjoin->recno_buf), + &tocur->key)); + } else { + WT_ITEM_SET(tocur->key, *keyp); + tocur->recno = 0; } - __wt_free(session, entry->ends); - __wt_free(session, entry->repack_format); + idxkey->data = NULL; + idxkey->size = 0; } - - if (cjoin->iter != NULL) - WT_TRET(__curjoin_entry_iter_close_all(cjoin->iter)); - if (cjoin->main != NULL) - WT_TRET(cjoin->main->close(cjoin->main)); - - __wt_free(session, cjoin->entries); - WT_TRET(__wt_cursor_close(cursor)); - -err: API_END_RET(session, ret); + return (0); } /* @@ -1283,52 +1355,6 @@ err: WT_TRET(__curjoin_close(cursor)); } /* - * __curjoin_open_main -- - * For the given index, open the main file with a projection - * that is the index keys. - */ -static int -__curjoin_open_main(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_CURSOR_JOIN_ENTRY *entry) -{ - WT_DECL_RET; - WT_INDEX *idx; - char *main_uri, *newformat; - const char *raw_cfg[] = { WT_CONFIG_BASE( - session, WT_SESSION_open_cursor), "raw", NULL }; - size_t len, newsize; - - main_uri = NULL; - idx = entry->index; - - newsize = strlen(cjoin->table->name) + idx->colconf.len + 1; - WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); - snprintf(main_uri, newsize, "%s%.*s", - cjoin->table->name, (int)idx->colconf.len, - idx->colconf.str); - WT_ERR(__wt_open_cursor(session, main_uri, - (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); - if (idx->extractor == NULL) { - /* - * Add no-op padding so trailing 'u' formats are not - * transformed to 'U'. This matches what happens in - * the index. We don't do this when we have an - * extractor, extractors already use the padding - * byte trick. - */ - len = strlen(entry->main->value_format) + 3; - WT_ERR(__wt_calloc(session, len, 1, &newformat)); - snprintf(newformat, len, "%s0x", - entry->main->value_format); - __wt_free(session, entry->main->value_format); - entry->main->value_format = newformat; - } - -err: __wt_free(session, main_uri); - return (ret); -} - -/* * __wt_curjoin_join -- * Add a new join to a join cursor. */ @@ -1511,25 +1537,3 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } return (0); } - -/* - * __curjoin_insert_endpoint -- - * Insert a new entry into the endpoint array for the join entry. - */ -static int -__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, - u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp) -{ - WT_CURSOR_JOIN_ENDPOINT *newend; - - WT_RET(__wt_realloc_def(session, &entry->ends_allocated, - entry->ends_next + 1, &entry->ends)); - newend = &entry->ends[pos]; - memmove(newend + 1, newend, - (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); - memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); - entry->ends_next++; - *newendp = newend; - - return (0); -} |