diff options
Diffstat (limited to 'src/cursor/cur_join.c')
-rw-r--r-- | src/cursor/cur_join.c | 538 |
1 files changed, 331 insertions, 207 deletions
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 2cbefa68c5e..38a83217933 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -8,6 +8,9 @@ #include "wt_internal.h" +static int __curjoin_insert_endpoint(WT_SESSION_IMPL *, + WT_CURSOR_JOIN_ENTRY *, u_int, WT_CURSOR_JOIN_ENDPOINT **); + /* * __curjoin_entry_iter_init -- * Initialize an iteration for the index managed by a join entry. @@ -17,49 +20,56 @@ static int __curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp) { - WT_CURSOR *newcur; WT_CURSOR *to_dup; WT_DECL_RET; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; const char *def_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), NULL }; - const char *uri, **config; - char *uribuf; + const char *urimain, **config; + char *mainbuf, *uri; WT_CURSOR_JOIN_ITER *iter; size_t size; iter = NULL; - uribuf = NULL; + mainbuf = uri = NULL; to_dup = entry->ends[0].cursor; - uri = to_dup->uri; if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW)) config = &raw_cfg[0]; else config = &def_cfg[0]; + size = strlen(to_dup->internal_uri) + 3; + WT_ERR(__wt_calloc(session, size, 1, &uri)); + snprintf(uri, size, "%s()", to_dup->internal_uri); + urimain = cjoin->table->name; if (cjoin->projection != NULL) { - size = strlen(uri) + strlen(cjoin->projection) + 1; - WT_ERR(__wt_calloc(session, size, 1, &uribuf)); - snprintf(uribuf, size, "%s%s", uri, cjoin->projection); - uri = uribuf; + size = strlen(urimain) + strlen(cjoin->projection) + 1; + WT_ERR(__wt_calloc(session, size, 1, &mainbuf)); + snprintf(mainbuf, size, "%s%s", urimain, cjoin->projection); + urimain = mainbuf; } - WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config, - &newcur)); - WT_ERR(__wt_cursor_dup_position(to_dup, newcur)); + WT_ERR(__wt_calloc_one(session, &iter)); + WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config, + &iter->cursor)); + WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor)); + WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config, + &iter->main)); iter->cjoin = cjoin; iter->session = session; iter->entry = entry; - iter->cursor = newcur; - iter->advance = false; + iter->positioned = false; + iter->isequal = (entry->ends_next == 1 && + WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ); *iterp = iter; if (0) { err: __wt_free(session, iter); } - __wt_free(session, uribuf); + __wt_free(session, mainbuf); + __wt_free(session, uri); return (ret); } @@ -72,18 +82,70 @@ static int __curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, size_t bufsize, WT_ITEM *item) { - WT_DECL_RET; WT_SESSION *wtsession; size_t sz; wtsession = (WT_SESSION *)session; - WT_ERR(wiredtiger_struct_size(wtsession, &sz, "r", r)); + WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r)); WT_ASSERT(session, sz < bufsize); - WT_ERR(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); + WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); item->size = sz; item->data = buf; + return (0); +} + +/* + * __curjoin_split_key -- + * Copy the primary key from a cursor (either main table or index) + * to another cursor. When copying from an index file, the index + * key is also returned. + * + */ +static int +__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur, + const char *repack_fmt, bool isindex) +{ + WT_CURSOR *firstcg_cur; + WT_CURSOR_INDEX *cindex; + WT_ITEM *keyp; + const uint8_t *p; -err: return (ret); + if (isindex) { + cindex = ((WT_CURSOR_INDEX *)fromcur); + /* + * Repack tells us where the index key ends; advance past + * that to get where the raw primary key starts. + */ + WT_RET(__wt_struct_repack(session, cindex->child->key_format, + repack_fmt != NULL ? repack_fmt : cindex->iface.key_format, + &cindex->child->key, idxkey)); + WT_ASSERT(session, cindex->child->key.size > idxkey->size); + tocur->key.data = (uint8_t *)idxkey->data + idxkey->size; + tocur->key.size = cindex->child->key.size - idxkey->size; + if (WT_CURSOR_RECNO(tocur)) { + p = (const uint8_t *)tocur->key.data; + WT_RET(__wt_vunpack_uint(&p, tocur->key.size, + &tocur->recno)); + } else + tocur->recno = 0; + } else { + firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0]; + keyp = &firstcg_cur->key; + if (WT_CURSOR_RECNO(tocur)) { + WT_ASSERT(session, keyp->size == sizeof(uint64_t)); + tocur->recno = *(uint64_t *)keyp->data; + WT_RET(__curjoin_pack_recno(session, tocur->recno, + cjoin->recno_buf, sizeof(cjoin->recno_buf), + &tocur->key)); + } else { + WT_ITEM_SET(tocur->key, *keyp); + tocur->recno = 0; + } + idxkey->data = NULL; + idxkey->size = 0; + } + return (0); } /* @@ -92,45 +154,24 @@ err: return (ret); * */ static int -__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, - uint64_t *rp) +__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor) { - WT_CURSOR *firstcg_cur; - WT_CURSOR_JOIN *cjoin; - WT_DECL_RET; - WT_SESSION_IMPL *session; - uint64_t r; - - if (iter->advance) - WT_ERR(iter->cursor->next(iter->cursor)); + if (iter->positioned) + WT_RET(iter->cursor->next(iter->cursor)); else - iter->advance = true; - - session = iter->session; - cjoin = iter->cjoin; + iter->positioned = true; /* * Set our key to the primary key, we'll also need this * to check membership. */ - if (iter->entry->index != NULL) - firstcg_cur = ((WT_CURSOR_INDEX *)iter->cursor)->cg_cursors[0]; - else - firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0]; - if (WT_CURSOR_RECNO(&cjoin->iface)) { - r = *(uint64_t *)firstcg_cur->key.data; - WT_ERR(__curjoin_pack_recno(session, r, cjoin->recno_buf, - sizeof(cjoin->recno_buf), primkey)); - *rp = r; - } else { - WT_ITEM_SET(*primkey, firstcg_cur->key); - *rp = 0; - } - iter->curkey = primkey; + WT_RET(__curjoin_split_key(iter->session, iter->cjoin, &iter->idxkey, + cursor, iter->cursor, iter->entry->repack_format, + iter->entry->index != NULL)); + iter->curkey = &cursor->key; iter->entry->stats.actual_count++; iter->entry->stats.accesses++; - -err: return (ret); + return (0); } /* @@ -141,17 +182,15 @@ err: return (ret); static int __curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) { - WT_DECL_RET; - - if (iter->advance) { - WT_ERR(iter->cursor->reset(iter->cursor)); - WT_ERR(__wt_cursor_dup_position( + if (iter->positioned) { + WT_RET(iter->cursor->reset(iter->cursor)); + WT_RET(iter->main->reset(iter->main)); + WT_RET(__wt_cursor_dup_position( iter->cjoin->entries[0].ends[0].cursor, iter->cursor)); - iter->advance = false; + iter->positioned = false; iter->entry->stats.actual_count = 0; } - -err: return (ret); + return (0); } /* @@ -162,7 +201,7 @@ err: return (ret); static bool __curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter) { - return (iter->advance); + return (iter->positioned); } /* @@ -177,6 +216,8 @@ __curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter) if (iter->cursor != NULL) WT_TRET(iter->cursor->close(iter->cursor)); + if (iter->main != NULL) + WT_TRET(iter->main->close(iter->main)); __wt_free(iter->session, iter); return (ret); @@ -232,10 +273,8 @@ __curjoin_get_value(WT_CURSOR *cursor, ...) !__curjoin_entry_iter_ready(iter)) WT_ERR_MSG(session, EINVAL, "join cursor must be advanced with next()"); - if (iter->entry->index != NULL) - WT_ERR(__wt_curindex_get_valuev(iter->cursor, ap)); - else - WT_ERR(__wt_curtable_get_valuev(iter->cursor, ap)); + + WT_ERR(__wt_curtable_get_valuev(iter->main, ap)); err: va_end(ap); API_END_RET(session, ret); @@ -251,43 +290,26 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, { WT_COLLATOR *collator; WT_CURSOR *c; - WT_CURSOR_INDEX *cindex; WT_CURSOR_JOIN_ENDPOINT *end, *endmax; WT_DECL_RET; WT_DECL_ITEM(uribuf); - WT_ITEM curkey, curvalue, *k; - WT_TABLE *maintable; + WT_ITEM curkey, curvalue; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; - const char *mainkey_str, *p; - void *allocbuf; - size_t mainkey_len, size; - u_int i; + const char *uri; + size_t size; int cmp, skip; c = NULL; - allocbuf = NULL; skip = 0; - if (entry->index != NULL) { + if (entry->index != NULL) /* - * Open a cursor having a projection of the keys of the - * index we're comparing against. Open it raw, we're - * going to compare it to the raw keys of the - * reference cursors. + * Open the raw index. We're avoiding any references + * to the main table, they may be expensive. */ - maintable = ((WT_CURSOR_TABLE *)entry->main)->table; - mainkey_str = maintable->colconf.str + 1; - for (p = mainkey_str, i = 0; - p != NULL && i < maintable->nkey_columns; i++) - p = strchr(p + 1, ','); - WT_ASSERT(session, p != 0); - mainkey_len = WT_PTRDIFF(p, mainkey_str); - size = strlen(entry->index->name) + mainkey_len + 3; - WT_ERR(__wt_scr_alloc(session, size, &uribuf)); - WT_ERR(__wt_buf_fmt(session, uribuf, "%s(%.*s)", - entry->index->name, (int)mainkey_len, mainkey_str)); - } else { + uri = entry->index->source; + else { /* * For joins on the main table, we just need the primary * key for comparison, we don't need any values. @@ -296,35 +318,38 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_ERR(__wt_scr_alloc(session, size, &uribuf)); WT_ERR(__wt_buf_fmt(session, uribuf, "%s()", cjoin->table->name)); + uri = uribuf->data; } - WT_ERR(__wt_open_cursor( - session, uribuf->data, &cjoin->iface, raw_cfg, &c)); + WT_ERR(__wt_open_cursor(session, uri, &cjoin->iface, raw_cfg, &c)); /* Initially position the cursor if necessary. */ endmax = &entry->ends[entry->ends_next]; - if ((end = &entry->ends[0]) < endmax && - F_ISSET(end, WT_CURJOIN_END_GE)) { - WT_ERR(__wt_cursor_dup_position(end->cursor, c)); - if (end->flags == WT_CURJOIN_END_GE) - skip = 1; + if ((end = &entry->ends[0]) < endmax) { + if (F_ISSET(end, WT_CURJOIN_END_GT) || + WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ) { + WT_ERR(__wt_cursor_dup_position(end->cursor, c)); + if (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_GE) + skip = 1; + } else if (F_ISSET(end, WT_CURJOIN_END_LT)) { + if ((ret = c->next(c)) == WT_NOTFOUND) + goto done; + WT_ERR(ret); + } else + WT_ERR(__wt_illegal_value(session, NULL)); } collator = (entry->index == NULL) ? NULL : entry->index->collator; while (ret == 0) { WT_ERR(c->get_key(c, &curkey)); if (entry->index != NULL) { - cindex = (WT_CURSOR_INDEX *)c; - if (cindex->index->extractor == NULL) { - /* - * Repack so it's comparable to the - * reference endpoints. - */ - k = &cindex->child->key; - WT_ERR(__wt_struct_repack(session, - cindex->child->key_format, - entry->main->value_format, k, &curkey, - &allocbuf)); - } else - curkey = cindex->child->key; + /* + * Repack so it's comparable to the + * reference endpoints. + */ + WT_ERR(__wt_struct_repack(session, + c->key_format, + (entry->repack_format != NULL ? + entry->repack_format : entry->index->idxkey_format), + &c->key, &curkey)); } for (end = &entry->ends[skip]; end < endmax; end++) { WT_ERR(__wt_compare(session, collator, &curkey, @@ -345,8 +370,12 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, goto done; } } - if (entry->index != NULL) - WT_ERR(c->get_value(c, &curvalue)); + if (entry->index != NULL) { + curvalue.data = + (unsigned char *)curkey.data + curkey.size; + WT_ASSERT(session, c->key.size > curkey.size); + curvalue.size = c->key.size - curkey.size; + } else WT_ERR(c->get_key(c, &curvalue)); WT_ERR(__wt_bloom_insert(bloom, &curvalue)); @@ -361,7 +390,6 @@ done: err: if (c != NULL) WT_TRET(c->close(c)); __wt_scr_free(session, &uribuf); - __wt_free(session, allocbuf); return (ret); } @@ -375,27 +403,23 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session, { WT_CURSOR *cursor; WT_CURSOR_INDEX *cindex; - WT_DECL_RET; WT_ITEM *k; uint64_t r; - void *allocbuf; - allocbuf = NULL; if ((cursor = endpoint->cursor) != NULL) { if (entry->index != NULL) { /* Extract and save the index's logical key. */ cindex = (WT_CURSOR_INDEX *)endpoint->cursor; - WT_ERR(__wt_struct_repack(session, + WT_RET(__wt_struct_repack(session, cindex->child->key_format, - cindex->iface.key_format, - &cindex->child->key, &endpoint->key, &allocbuf)); - if (allocbuf != NULL) - F_SET(endpoint, WT_CURJOIN_END_OWN_KEY); + (entry->repack_format != NULL ? + entry->repack_format : cindex->iface.key_format), + &cindex->child->key, &endpoint->key)); } else { k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; if (WT_CURSOR_RECNO(cursor)) { r = *(uint64_t *)k->data; - WT_ERR(__curjoin_pack_recno(session, r, + WT_RET(__curjoin_pack_recno(session, r, endpoint->recno_buf, sizeof(endpoint->recno_buf), &endpoint->key)); @@ -404,10 +428,7 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session, endpoint->key = *k; } } - if (0) { -err: __wt_free(session, allocbuf); - } - return (ret); + return (0); } /* @@ -419,8 +440,13 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) { WT_BLOOM *bloom; WT_DECL_RET; + WT_CURSOR *origcur; WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2; WT_CURSOR_JOIN_ENDPOINT *end; + const char *def_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), NULL }; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; uint32_t f, k; if (cjoin->entries_next == 0) @@ -429,9 +455,27 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) "cursors"); je = &cjoin->entries[0]; + jeend = &cjoin->entries[cjoin->entries_next]; + + /* + * For a single compare=le endpoint in the first iterated entry, + * construct a companion compare=ge endpoint that will actually + * be iterated. + */ + if (((je = cjoin->entries) != jeend) && + je->ends_next == 1 && F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) { + origcur = je->ends[0].cursor; + WT_RET(__curjoin_insert_endpoint(session, je, 0, &end)); + WT_RET(__wt_open_cursor(session, origcur->uri, + (WT_CURSOR *)cjoin, + F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg, + &end->cursor)); + WT_RET(end->cursor->next(end->cursor)); + end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ | + WT_CURJOIN_END_OWN_CURSOR; + } WT_RET(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter)); - jeend = &cjoin->entries[cjoin->entries_next]; for (je = cjoin->entries; je < jeend; je++) { __wt_stat_join_init_single(&je->stats); for (end = &je->ends[0]; end < &je->ends[je->ends_next]; @@ -449,6 +493,10 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) { + if (session->txn.isolation == WT_ISO_READ_UNCOMMITTED) + WT_RET_MSG(session, EINVAL, + "join cursors with Bloom filters cannot be " + "used with read-uncommitted isolation"); if (je->bloom == NULL) { /* * Look for compatible filters to be shared, @@ -520,35 +568,34 @@ __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, { WT_COLLATOR *collator; WT_CURSOR_JOIN_ENDPOINT *end, *endmax; - WT_DECL_RET; int cmp; collator = (entry->index != NULL) ? entry->index->collator : NULL; endmax = &entry->ends[entry->ends_next]; for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) { - WT_ERR(__wt_compare(session, collator, curkey, &end->key, + WT_RET(__wt_compare(session, collator, curkey, &end->key, &cmp)); if (!F_ISSET(end, WT_CURJOIN_END_LT)) { if (cmp < 0 || (cmp == 0 && !F_ISSET(end, WT_CURJOIN_END_EQ)) || (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT))) - WT_ERR(WT_NOTFOUND); + WT_RET(WT_NOTFOUND); } else { if (cmp > 0 || (cmp == 0 && !F_ISSET(end, WT_CURJOIN_END_EQ)) || (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT))) - WT_ERR(WT_NOTFOUND); + WT_RET(WT_NOTFOUND); } } -err: return (ret); + return (0); } typedef struct { WT_CURSOR iface; WT_CURSOR_JOIN_ENTRY *entry; - int ismember; + bool ismember; } WT_CURJOIN_EXTRACTOR; /* @@ -584,8 +631,8 @@ __curjoin_extract_insert(WT_CURSOR *cursor) { ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); if (ret == WT_NOTFOUND) ret = 0; - else - cextract->ismember = 1; + else if (ret == 0) + cextract->ismember = true; return (ret); } @@ -602,27 +649,29 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURJOIN_EXTRACTOR extract_cursor; WT_CURSOR *c; WT_CURSOR_STATIC_INIT(iface, - __wt_cursor_get_key, /* get-key */ - __wt_cursor_get_value, /* get-value */ - __wt_cursor_set_key, /* set-key */ - __wt_cursor_set_value, /* set-value */ - __wt_cursor_notsup, /* compare */ - __wt_cursor_notsup, /* equals */ - __wt_cursor_notsup, /* next */ - __wt_cursor_notsup, /* prev */ - __wt_cursor_notsup, /* reset */ - __wt_cursor_notsup, /* search */ - __wt_cursor_notsup, /* search-near */ - __curjoin_extract_insert, /* insert */ - __wt_cursor_notsup, /* update */ - __wt_cursor_notsup, /* reconfigure */ - __wt_cursor_notsup, /* remove */ - __wt_cursor_notsup); /* close */ + __wt_cursor_get_key, /* get-key */ + __wt_cursor_get_value, /* get-value */ + __wt_cursor_set_key, /* set-key */ + __wt_cursor_set_value, /* set-value */ + __wt_cursor_compare_notsup, /* compare */ + __wt_cursor_equals_notsup, /* equals */ + __wt_cursor_notsup, /* next */ + __wt_cursor_notsup, /* prev */ + __wt_cursor_notsup, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_search_near_notsup, /* search-near */ + __curjoin_extract_insert, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_reconfigure_notsup, /* reconfigure */ + __wt_cursor_notsup); /* close */ WT_DECL_RET; WT_INDEX *idx; WT_ITEM *key, v; bool bloom_found; + if (skip_left && entry->ends_next == 1) + return (0); /* no checks to make */ key = cjoin->iter->curkey; entry->stats.accesses++; bloom_found = false; @@ -645,24 +694,35 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, bloom_found = true; } if (entry->index != NULL) { - memset(&v, 0, sizeof(v)); /* Keep lint quiet. */ - c = entry->main; - c->set_key(c, key); - if ((ret = c->search(c)) == 0) - ret = c->get_value(c, &v); - else if (ret == WT_NOTFOUND) - WT_ERR_MSG(session, WT_ERROR, - "main table for join is missing entry."); - WT_TRET(c->reset(c)); - WT_ERR(ret); + /* + * If this entry is used by the iterator, then we already + * have the index key, and we won't have to do any extraction + * either. + */ + if (entry == cjoin->iter->entry) + WT_ITEM_SET(v, cjoin->iter->idxkey); + else { + memset(&v, 0, sizeof(v)); /* Keep lint quiet. */ + c = entry->main; + c->set_key(c, key); + if ((ret = c->search(c)) == 0) + ret = c->get_value(c, &v); + else if (ret == WT_NOTFOUND) + WT_ERR_MSG(session, WT_ERROR, + "main table for join is missing entry"); + WT_TRET(c->reset(c)); + WT_ERR(ret); + } } else - v = *key; + WT_ITEM_SET(v, *key); - if ((idx = entry->index) != NULL && idx->extractor != NULL) { + if ((idx = entry->index) != NULL && idx->extractor != NULL && + entry != cjoin->iter->entry) { + WT_CLEAR(extract_cursor); extract_cursor.iface = iface; extract_cursor.iface.session = &session->iface; extract_cursor.iface.key_format = idx->exkey_format; - extract_cursor.ismember = 0; + extract_cursor.ismember = false; extract_cursor.entry = entry; WT_ERR(idx->extractor->extract(idx->extractor, &session->iface, key, &v, &extract_cursor.iface)); @@ -685,7 +745,9 @@ err: if (ret == WT_NOTFOUND && bloom_found) static int __curjoin_next(WT_CURSOR *cursor) { + WT_CURSOR *c; WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ITER *iter; WT_DECL_RET; WT_SESSION_IMPL *session; bool skip_left; @@ -701,9 +763,11 @@ __curjoin_next(WT_CURSOR *cursor) if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) WT_ERR(__curjoin_init_iter(session, cjoin)); + F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + iter = cjoin->iter; + nextkey: - if ((ret = __curjoin_entry_iter_next(cjoin->iter, &cursor->key, - &cursor->recno)) == 0) { + if ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) { F_SET(cursor, WT_CURSTD_KEY_EXT); /* @@ -715,11 +779,31 @@ nextkey: for (i = 0; i < cjoin->entries_next; i++) { ret = __curjoin_entry_member(session, cjoin, &cjoin->entries[i], skip_left); - if (ret == WT_NOTFOUND) + if (ret == WT_NOTFOUND) { + /* + * If this is compare=eq on our outer iterator, + * and we've moved past it, we're done. + */ + if (iter->isequal && i == 0) + break; goto nextkey; + } skip_left = false; WT_ERR(ret); } + } else if (ret != WT_NOTFOUND) + WT_ERR(ret); + + if (ret == 0) { + /* + * Position the 'main' cursor, this will be used to + * retrieve values from the cursor join. + */ + c = iter->main; + c->set_key(c, iter->curkey); + if ((ret = c->search(c)) != 0) + WT_ERR(c->search(c)); + F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); } if (0) { @@ -785,10 +869,11 @@ __curjoin_close(WT_CURSOR *cursor) for (end = &entry->ends[0]; end < &entry->ends[entry->ends_next]; end++) { F_CLR(end->cursor, WT_CURSTD_JOINED); - if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY)) - __wt_free(session, end->key.data); + if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR)) + WT_TRET(end->cursor->close(end->cursor)); } __wt_free(session, entry->ends); + __wt_free(session, entry->repack_format); } if (cjoin->iter != NULL) @@ -810,22 +895,22 @@ __wt_curjoin_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) { WT_CURSOR_STATIC_INIT(iface, - __curjoin_get_key, /* get-key */ - __curjoin_get_value, /* get-value */ - __wt_cursor_notsup, /* set-key */ - __wt_cursor_notsup, /* set-value */ - __wt_cursor_notsup, /* compare */ - __wt_cursor_notsup, /* equals */ - __curjoin_next, /* next */ - __wt_cursor_notsup, /* prev */ - __curjoin_reset, /* reset */ - __wt_cursor_notsup, /* search */ - __wt_cursor_notsup, /* search-near */ - __wt_cursor_notsup, /* insert */ - __wt_cursor_notsup, /* update */ - __wt_cursor_notsup, /* remove */ - __wt_cursor_notsup, /* reconfigure */ - __curjoin_close); /* close */ + __curjoin_get_key, /* get-key */ + __curjoin_get_value, /* get-value */ + __wt_cursor_set_key_notsup, /* set-key */ + __wt_cursor_set_value_notsup, /* set-value */ + __wt_cursor_compare_notsup, /* compare */ + __wt_cursor_equals_notsup, /* equals */ + __curjoin_next, /* next */ + __wt_cursor_notsup, /* prev */ + __curjoin_reset, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_search_near_notsup, /* search-near */ + __wt_cursor_notsup, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_reconfigure_notsup, /* reconfigure */ + __curjoin_close); /* close */ WT_CURSOR *cursor; WT_CURSOR_JOIN *cjoin; WT_DECL_ITEM(tmp); @@ -891,22 +976,22 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *idx, WT_CURSOR *ref_cursor, uint8_t flags, uint8_t range, uint64_t count, uint32_t bloom_bit_count, uint32_t bloom_hash_count) { + WT_CURSOR_INDEX *cindex; + WT_CURSOR_JOIN_ENDPOINT *end; WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; - WT_CURSOR_JOIN_ENDPOINT *end, *newend; bool hasins, needbloom, range_eq; - u_int i, ins, nonbloom; + char *main_uri, *newformat; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; - char *main_uri; - size_t namesize, newsize; + size_t len, newsize; + u_int i, ins, nonbloom; entry = NULL; hasins = needbloom = false; ins = 0; /* -Wuninitialized */ main_uri = NULL; nonbloom = 0; /* -Wuninitialized */ - namesize = strlen(cjoin->table->name); for (i = 0; i < cjoin->entries_next; i++) { if (cjoin->entries[i].index == idx) { @@ -982,13 +1067,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) || (F_ISSET(end, WT_CURJOIN_END_LT) && ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) || - (end->flags == WT_CURJOIN_END_EQ && + (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ && (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT)) != 0)) WT_ERR_MSG(session, EINVAL, "join has overlapping ranges"); if (range == WT_CURJOIN_END_EQ && - end->flags == WT_CURJOIN_END_EQ && + WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ && !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) WT_ERR_MSG(session, EINVAL, "compare=eq can only be combined " @@ -1013,31 +1098,70 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, entry->bloom_hash_count = WT_MAX(entry->bloom_hash_count, bloom_hash_count); } - WT_ERR(__wt_realloc_def(session, &entry->ends_allocated, - entry->ends_next + 1, &entry->ends)); - if (!hasins) - ins = entry->ends_next; - newend = &entry->ends[ins]; - memmove(newend + 1, newend, - (entry->ends_next - ins) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); - memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); - entry->ends_next++; - newend->cursor = ref_cursor; - F_SET(newend, range); + WT_ERR(__curjoin_insert_endpoint(session, entry, + hasins ? ins : entry->ends_next, &end)); + end->cursor = ref_cursor; + F_SET(end, range); /* Open the main file with a projection of the indexed columns. */ - if (entry->main == NULL && entry->index != NULL) { - namesize = strlen(cjoin->table->name); - newsize = namesize + entry->index->colconf.len + 1; + if (entry->main == NULL && idx != NULL) { + newsize = strlen(cjoin->table->name) + idx->colconf.len + 1; WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); snprintf(main_uri, newsize, "%s%.*s", - cjoin->table->name, (int)entry->index->colconf.len, - entry->index->colconf.str); + cjoin->table->name, (int)idx->colconf.len, + idx->colconf.str); WT_ERR(__wt_open_cursor(session, main_uri, (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); + if (idx->extractor == NULL) { + /* + * Add no-op padding so trailing 'u' formats are not + * transformed to 'U'. This matches what happens in + * the index. We don't do this when we have an + * extractor, extractors already use the padding + * byte trick. + */ + len = strlen(entry->main->value_format) + 3; + WT_ERR(__wt_calloc(session, len, 1, &newformat)); + snprintf(newformat, len, "%s0x", + entry->main->value_format); + __wt_free(session, entry->main->value_format); + entry->main->value_format = newformat; + } + + /* + * When we are repacking index keys to remove the primary + * key, we never want to transform trailing 'u'. Use no-op + * padding to force this. + */ + cindex = (WT_CURSOR_INDEX *)ref_cursor; + len = strlen(cindex->iface.key_format) + 3; + WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format)); + snprintf(entry->repack_format, len, "%s0x", + cindex->iface.key_format); } -err: if (main_uri != NULL) - __wt_free(session, main_uri); +err: __wt_free(session, main_uri); return (ret); } + +/* + * __curjoin_insert_endpoint -- + * Insert a new entry into the endpoint array for the join entry. + */ +static int +__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp) +{ + WT_CURSOR_JOIN_ENDPOINT *newend; + + WT_RET(__wt_realloc_def(session, &entry->ends_allocated, + entry->ends_next + 1, &entry->ends)); + newend = &entry->ends[pos]; + memmove(newend + 1, newend, + (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); + memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); + entry->ends_next++; + *newendp = newend; + + return (0); +} |