summaryrefslogtreecommitdiff
path: root/src/cursor/cur_join.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cursor/cur_join.c')
-rw-r--r--src/cursor/cur_join.c538
1 files changed, 331 insertions, 207 deletions
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c
index 2cbefa68c5e..38a83217933 100644
--- a/src/cursor/cur_join.c
+++ b/src/cursor/cur_join.c
@@ -8,6 +8,9 @@
#include "wt_internal.h"
+static int __curjoin_insert_endpoint(WT_SESSION_IMPL *,
+ WT_CURSOR_JOIN_ENTRY *, u_int, WT_CURSOR_JOIN_ENDPOINT **);
+
/*
* __curjoin_entry_iter_init --
* Initialize an iteration for the index managed by a join entry.
@@ -17,49 +20,56 @@ static int
__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp)
{
- WT_CURSOR *newcur;
WT_CURSOR *to_dup;
WT_DECL_RET;
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
const char *def_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), NULL };
- const char *uri, **config;
- char *uribuf;
+ const char *urimain, **config;
+ char *mainbuf, *uri;
WT_CURSOR_JOIN_ITER *iter;
size_t size;
iter = NULL;
- uribuf = NULL;
+ mainbuf = uri = NULL;
to_dup = entry->ends[0].cursor;
- uri = to_dup->uri;
if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
config = &raw_cfg[0];
else
config = &def_cfg[0];
+ size = strlen(to_dup->internal_uri) + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s()", to_dup->internal_uri);
+ urimain = cjoin->table->name;
if (cjoin->projection != NULL) {
- size = strlen(uri) + strlen(cjoin->projection) + 1;
- WT_ERR(__wt_calloc(session, size, 1, &uribuf));
- snprintf(uribuf, size, "%s%s", uri, cjoin->projection);
- uri = uribuf;
+ size = strlen(urimain) + strlen(cjoin->projection) + 1;
+ WT_ERR(__wt_calloc(session, size, 1, &mainbuf));
+ snprintf(mainbuf, size, "%s%s", urimain, cjoin->projection);
+ urimain = mainbuf;
}
- WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config,
- &newcur));
- WT_ERR(__wt_cursor_dup_position(to_dup, newcur));
+
WT_ERR(__wt_calloc_one(session, &iter));
+ WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config,
+ &iter->cursor));
+ WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor));
+ WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config,
+ &iter->main));
iter->cjoin = cjoin;
iter->session = session;
iter->entry = entry;
- iter->cursor = newcur;
- iter->advance = false;
+ iter->positioned = false;
+ iter->isequal = (entry->ends_next == 1 &&
+ WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ);
*iterp = iter;
if (0) {
err: __wt_free(session, iter);
}
- __wt_free(session, uribuf);
+ __wt_free(session, mainbuf);
+ __wt_free(session, uri);
return (ret);
}
@@ -72,18 +82,70 @@ static int
__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf,
size_t bufsize, WT_ITEM *item)
{
- WT_DECL_RET;
WT_SESSION *wtsession;
size_t sz;
wtsession = (WT_SESSION *)session;
- WT_ERR(wiredtiger_struct_size(wtsession, &sz, "r", r));
+ WT_RET(wiredtiger_struct_size(wtsession, &sz, "r", r));
WT_ASSERT(session, sz < bufsize);
- WT_ERR(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r));
+ WT_RET(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r));
item->size = sz;
item->data = buf;
+ return (0);
+}
+
+/*
+ * __curjoin_split_key --
+ * Copy the primary key from a cursor (either main table or index)
+ * to another cursor. When copying from an index file, the index
+ * key is also returned.
+ *
+ */
+static int
+__curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_ITEM *idxkey, WT_CURSOR *tocur, WT_CURSOR *fromcur,
+ const char *repack_fmt, bool isindex)
+{
+ WT_CURSOR *firstcg_cur;
+ WT_CURSOR_INDEX *cindex;
+ WT_ITEM *keyp;
+ const uint8_t *p;
-err: return (ret);
+ if (isindex) {
+ cindex = ((WT_CURSOR_INDEX *)fromcur);
+ /*
+ * Repack tells us where the index key ends; advance past
+ * that to get where the raw primary key starts.
+ */
+ WT_RET(__wt_struct_repack(session, cindex->child->key_format,
+ repack_fmt != NULL ? repack_fmt : cindex->iface.key_format,
+ &cindex->child->key, idxkey));
+ WT_ASSERT(session, cindex->child->key.size > idxkey->size);
+ tocur->key.data = (uint8_t *)idxkey->data + idxkey->size;
+ tocur->key.size = cindex->child->key.size - idxkey->size;
+ if (WT_CURSOR_RECNO(tocur)) {
+ p = (const uint8_t *)tocur->key.data;
+ WT_RET(__wt_vunpack_uint(&p, tocur->key.size,
+ &tocur->recno));
+ } else
+ tocur->recno = 0;
+ } else {
+ firstcg_cur = ((WT_CURSOR_TABLE *)fromcur)->cg_cursors[0];
+ keyp = &firstcg_cur->key;
+ if (WT_CURSOR_RECNO(tocur)) {
+ WT_ASSERT(session, keyp->size == sizeof(uint64_t));
+ tocur->recno = *(uint64_t *)keyp->data;
+ WT_RET(__curjoin_pack_recno(session, tocur->recno,
+ cjoin->recno_buf, sizeof(cjoin->recno_buf),
+ &tocur->key));
+ } else {
+ WT_ITEM_SET(tocur->key, *keyp);
+ tocur->recno = 0;
+ }
+ idxkey->data = NULL;
+ idxkey->size = 0;
+ }
+ return (0);
}
/*
@@ -92,45 +154,24 @@ err: return (ret);
*
*/
static int
-__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey,
- uint64_t *rp)
+__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor)
{
- WT_CURSOR *firstcg_cur;
- WT_CURSOR_JOIN *cjoin;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
- uint64_t r;
-
- if (iter->advance)
- WT_ERR(iter->cursor->next(iter->cursor));
+ if (iter->positioned)
+ WT_RET(iter->cursor->next(iter->cursor));
else
- iter->advance = true;
-
- session = iter->session;
- cjoin = iter->cjoin;
+ iter->positioned = true;
/*
* Set our key to the primary key, we'll also need this
* to check membership.
*/
- if (iter->entry->index != NULL)
- firstcg_cur = ((WT_CURSOR_INDEX *)iter->cursor)->cg_cursors[0];
- else
- firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0];
- if (WT_CURSOR_RECNO(&cjoin->iface)) {
- r = *(uint64_t *)firstcg_cur->key.data;
- WT_ERR(__curjoin_pack_recno(session, r, cjoin->recno_buf,
- sizeof(cjoin->recno_buf), primkey));
- *rp = r;
- } else {
- WT_ITEM_SET(*primkey, firstcg_cur->key);
- *rp = 0;
- }
- iter->curkey = primkey;
+ WT_RET(__curjoin_split_key(iter->session, iter->cjoin, &iter->idxkey,
+ cursor, iter->cursor, iter->entry->repack_format,
+ iter->entry->index != NULL));
+ iter->curkey = &cursor->key;
iter->entry->stats.actual_count++;
iter->entry->stats.accesses++;
-
-err: return (ret);
+ return (0);
}
/*
@@ -141,17 +182,15 @@ err: return (ret);
static int
__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
{
- WT_DECL_RET;
-
- if (iter->advance) {
- WT_ERR(iter->cursor->reset(iter->cursor));
- WT_ERR(__wt_cursor_dup_position(
+ if (iter->positioned) {
+ WT_RET(iter->cursor->reset(iter->cursor));
+ WT_RET(iter->main->reset(iter->main));
+ WT_RET(__wt_cursor_dup_position(
iter->cjoin->entries[0].ends[0].cursor, iter->cursor));
- iter->advance = false;
+ iter->positioned = false;
iter->entry->stats.actual_count = 0;
}
-
-err: return (ret);
+ return (0);
}
/*
@@ -162,7 +201,7 @@ err: return (ret);
static bool
__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
{
- return (iter->advance);
+ return (iter->positioned);
}
/*
@@ -177,6 +216,8 @@ __curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter)
if (iter->cursor != NULL)
WT_TRET(iter->cursor->close(iter->cursor));
+ if (iter->main != NULL)
+ WT_TRET(iter->main->close(iter->main));
__wt_free(iter->session, iter);
return (ret);
@@ -232,10 +273,8 @@ __curjoin_get_value(WT_CURSOR *cursor, ...)
!__curjoin_entry_iter_ready(iter))
WT_ERR_MSG(session, EINVAL,
"join cursor must be advanced with next()");
- if (iter->entry->index != NULL)
- WT_ERR(__wt_curindex_get_valuev(iter->cursor, ap));
- else
- WT_ERR(__wt_curtable_get_valuev(iter->cursor, ap));
+
+ WT_ERR(__wt_curtable_get_valuev(iter->main, ap));
err: va_end(ap);
API_END_RET(session, ret);
@@ -251,43 +290,26 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
{
WT_COLLATOR *collator;
WT_CURSOR *c;
- WT_CURSOR_INDEX *cindex;
WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
WT_DECL_RET;
WT_DECL_ITEM(uribuf);
- WT_ITEM curkey, curvalue, *k;
- WT_TABLE *maintable;
+ WT_ITEM curkey, curvalue;
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
- const char *mainkey_str, *p;
- void *allocbuf;
- size_t mainkey_len, size;
- u_int i;
+ const char *uri;
+ size_t size;
int cmp, skip;
c = NULL;
- allocbuf = NULL;
skip = 0;
- if (entry->index != NULL) {
+ if (entry->index != NULL)
/*
- * Open a cursor having a projection of the keys of the
- * index we're comparing against. Open it raw, we're
- * going to compare it to the raw keys of the
- * reference cursors.
+ * Open the raw index. We're avoiding any references
+ * to the main table, they may be expensive.
*/
- maintable = ((WT_CURSOR_TABLE *)entry->main)->table;
- mainkey_str = maintable->colconf.str + 1;
- for (p = mainkey_str, i = 0;
- p != NULL && i < maintable->nkey_columns; i++)
- p = strchr(p + 1, ',');
- WT_ASSERT(session, p != 0);
- mainkey_len = WT_PTRDIFF(p, mainkey_str);
- size = strlen(entry->index->name) + mainkey_len + 3;
- WT_ERR(__wt_scr_alloc(session, size, &uribuf));
- WT_ERR(__wt_buf_fmt(session, uribuf, "%s(%.*s)",
- entry->index->name, (int)mainkey_len, mainkey_str));
- } else {
+ uri = entry->index->source;
+ else {
/*
* For joins on the main table, we just need the primary
* key for comparison, we don't need any values.
@@ -296,35 +318,38 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_ERR(__wt_scr_alloc(session, size, &uribuf));
WT_ERR(__wt_buf_fmt(session, uribuf, "%s()",
cjoin->table->name));
+ uri = uribuf->data;
}
- WT_ERR(__wt_open_cursor(
- session, uribuf->data, &cjoin->iface, raw_cfg, &c));
+ WT_ERR(__wt_open_cursor(session, uri, &cjoin->iface, raw_cfg, &c));
/* Initially position the cursor if necessary. */
endmax = &entry->ends[entry->ends_next];
- if ((end = &entry->ends[0]) < endmax &&
- F_ISSET(end, WT_CURJOIN_END_GE)) {
- WT_ERR(__wt_cursor_dup_position(end->cursor, c));
- if (end->flags == WT_CURJOIN_END_GE)
- skip = 1;
+ if ((end = &entry->ends[0]) < endmax) {
+ if (F_ISSET(end, WT_CURJOIN_END_GT) ||
+ WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ) {
+ WT_ERR(__wt_cursor_dup_position(end->cursor, c));
+ if (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_GE)
+ skip = 1;
+ } else if (F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if ((ret = c->next(c)) == WT_NOTFOUND)
+ goto done;
+ WT_ERR(ret);
+ } else
+ WT_ERR(__wt_illegal_value(session, NULL));
}
collator = (entry->index == NULL) ? NULL : entry->index->collator;
while (ret == 0) {
WT_ERR(c->get_key(c, &curkey));
if (entry->index != NULL) {
- cindex = (WT_CURSOR_INDEX *)c;
- if (cindex->index->extractor == NULL) {
- /*
- * Repack so it's comparable to the
- * reference endpoints.
- */
- k = &cindex->child->key;
- WT_ERR(__wt_struct_repack(session,
- cindex->child->key_format,
- entry->main->value_format, k, &curkey,
- &allocbuf));
- } else
- curkey = cindex->child->key;
+ /*
+ * Repack so it's comparable to the
+ * reference endpoints.
+ */
+ WT_ERR(__wt_struct_repack(session,
+ c->key_format,
+ (entry->repack_format != NULL ?
+ entry->repack_format : entry->index->idxkey_format),
+ &c->key, &curkey));
}
for (end = &entry->ends[skip]; end < endmax; end++) {
WT_ERR(__wt_compare(session, collator, &curkey,
@@ -345,8 +370,12 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
goto done;
}
}
- if (entry->index != NULL)
- WT_ERR(c->get_value(c, &curvalue));
+ if (entry->index != NULL) {
+ curvalue.data =
+ (unsigned char *)curkey.data + curkey.size;
+ WT_ASSERT(session, c->key.size > curkey.size);
+ curvalue.size = c->key.size - curkey.size;
+ }
else
WT_ERR(c->get_key(c, &curvalue));
WT_ERR(__wt_bloom_insert(bloom, &curvalue));
@@ -361,7 +390,6 @@ done:
err: if (c != NULL)
WT_TRET(c->close(c));
__wt_scr_free(session, &uribuf);
- __wt_free(session, allocbuf);
return (ret);
}
@@ -375,27 +403,23 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
{
WT_CURSOR *cursor;
WT_CURSOR_INDEX *cindex;
- WT_DECL_RET;
WT_ITEM *k;
uint64_t r;
- void *allocbuf;
- allocbuf = NULL;
if ((cursor = endpoint->cursor) != NULL) {
if (entry->index != NULL) {
/* Extract and save the index's logical key. */
cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
- WT_ERR(__wt_struct_repack(session,
+ WT_RET(__wt_struct_repack(session,
cindex->child->key_format,
- cindex->iface.key_format,
- &cindex->child->key, &endpoint->key, &allocbuf));
- if (allocbuf != NULL)
- F_SET(endpoint, WT_CURJOIN_END_OWN_KEY);
+ (entry->repack_format != NULL ?
+ entry->repack_format : cindex->iface.key_format),
+ &cindex->child->key, &endpoint->key));
} else {
k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
if (WT_CURSOR_RECNO(cursor)) {
r = *(uint64_t *)k->data;
- WT_ERR(__curjoin_pack_recno(session, r,
+ WT_RET(__curjoin_pack_recno(session, r,
endpoint->recno_buf,
sizeof(endpoint->recno_buf),
&endpoint->key));
@@ -404,10 +428,7 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
endpoint->key = *k;
}
}
- if (0) {
-err: __wt_free(session, allocbuf);
- }
- return (ret);
+ return (0);
}
/*
@@ -419,8 +440,13 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
{
WT_BLOOM *bloom;
WT_DECL_RET;
+ WT_CURSOR *origcur;
WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2;
WT_CURSOR_JOIN_ENDPOINT *end;
+ const char *def_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), NULL };
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
uint32_t f, k;
if (cjoin->entries_next == 0)
@@ -429,9 +455,27 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
"cursors");
je = &cjoin->entries[0];
+ jeend = &cjoin->entries[cjoin->entries_next];
+
+ /*
+ * For a single compare=le endpoint in the first iterated entry,
+ * construct a companion compare=ge endpoint that will actually
+ * be iterated.
+ */
+ if (((je = cjoin->entries) != jeend) &&
+ je->ends_next == 1 && F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) {
+ origcur = je->ends[0].cursor;
+ WT_RET(__curjoin_insert_endpoint(session, je, 0, &end));
+ WT_RET(__wt_open_cursor(session, origcur->uri,
+ (WT_CURSOR *)cjoin,
+ F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg,
+ &end->cursor));
+ WT_RET(end->cursor->next(end->cursor));
+ end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ |
+ WT_CURJOIN_END_OWN_CURSOR;
+ }
WT_RET(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter));
- jeend = &cjoin->entries[cjoin->entries_next];
for (je = cjoin->entries; je < jeend; je++) {
__wt_stat_join_init_single(&je->stats);
for (end = &je->ends[0]; end < &je->ends[je->ends_next];
@@ -449,6 +493,10 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
+ if (session->txn.isolation == WT_ISO_READ_UNCOMMITTED)
+ WT_RET_MSG(session, EINVAL,
+ "join cursors with Bloom filters cannot be "
+ "used with read-uncommitted isolation");
if (je->bloom == NULL) {
/*
* Look for compatible filters to be shared,
@@ -520,35 +568,34 @@ __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
{
WT_COLLATOR *collator;
WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
- WT_DECL_RET;
int cmp;
collator = (entry->index != NULL) ? entry->index->collator : NULL;
endmax = &entry->ends[entry->ends_next];
for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) {
- WT_ERR(__wt_compare(session, collator, curkey, &end->key,
+ WT_RET(__wt_compare(session, collator, curkey, &end->key,
&cmp));
if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
if (cmp < 0 ||
(cmp == 0 &&
!F_ISSET(end, WT_CURJOIN_END_EQ)) ||
(cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT)))
- WT_ERR(WT_NOTFOUND);
+ WT_RET(WT_NOTFOUND);
} else {
if (cmp > 0 ||
(cmp == 0 &&
!F_ISSET(end, WT_CURJOIN_END_EQ)) ||
(cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT)))
- WT_ERR(WT_NOTFOUND);
+ WT_RET(WT_NOTFOUND);
}
}
-err: return (ret);
+ return (0);
}
typedef struct {
WT_CURSOR iface;
WT_CURSOR_JOIN_ENTRY *entry;
- int ismember;
+ bool ismember;
} WT_CURJOIN_EXTRACTOR;
/*
@@ -584,8 +631,8 @@ __curjoin_extract_insert(WT_CURSOR *cursor) {
ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false);
if (ret == WT_NOTFOUND)
ret = 0;
- else
- cextract->ismember = 1;
+ else if (ret == 0)
+ cextract->ismember = true;
return (ret);
}
@@ -602,27 +649,29 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_CURJOIN_EXTRACTOR extract_cursor;
WT_CURSOR *c;
WT_CURSOR_STATIC_INIT(iface,
- __wt_cursor_get_key, /* get-key */
- __wt_cursor_get_value, /* get-value */
- __wt_cursor_set_key, /* set-key */
- __wt_cursor_set_value, /* set-value */
- __wt_cursor_notsup, /* compare */
- __wt_cursor_notsup, /* equals */
- __wt_cursor_notsup, /* next */
- __wt_cursor_notsup, /* prev */
- __wt_cursor_notsup, /* reset */
- __wt_cursor_notsup, /* search */
- __wt_cursor_notsup, /* search-near */
- __curjoin_extract_insert, /* insert */
- __wt_cursor_notsup, /* update */
- __wt_cursor_notsup, /* reconfigure */
- __wt_cursor_notsup, /* remove */
- __wt_cursor_notsup); /* close */
+ __wt_cursor_get_key, /* get-key */
+ __wt_cursor_get_value, /* get-value */
+ __wt_cursor_set_key, /* set-key */
+ __wt_cursor_set_value, /* set-value */
+ __wt_cursor_compare_notsup, /* compare */
+ __wt_cursor_equals_notsup, /* equals */
+ __wt_cursor_notsup, /* next */
+ __wt_cursor_notsup, /* prev */
+ __wt_cursor_notsup, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_search_near_notsup, /* search-near */
+ __curjoin_extract_insert, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_reconfigure_notsup, /* reconfigure */
+ __wt_cursor_notsup); /* close */
WT_DECL_RET;
WT_INDEX *idx;
WT_ITEM *key, v;
bool bloom_found;
+ if (skip_left && entry->ends_next == 1)
+ return (0); /* no checks to make */
key = cjoin->iter->curkey;
entry->stats.accesses++;
bloom_found = false;
@@ -645,24 +694,35 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
bloom_found = true;
}
if (entry->index != NULL) {
- memset(&v, 0, sizeof(v)); /* Keep lint quiet. */
- c = entry->main;
- c->set_key(c, key);
- if ((ret = c->search(c)) == 0)
- ret = c->get_value(c, &v);
- else if (ret == WT_NOTFOUND)
- WT_ERR_MSG(session, WT_ERROR,
- "main table for join is missing entry.");
- WT_TRET(c->reset(c));
- WT_ERR(ret);
+ /*
+ * If this entry is used by the iterator, then we already
+ * have the index key, and we won't have to do any extraction
+ * either.
+ */
+ if (entry == cjoin->iter->entry)
+ WT_ITEM_SET(v, cjoin->iter->idxkey);
+ else {
+ memset(&v, 0, sizeof(v)); /* Keep lint quiet. */
+ c = entry->main;
+ c->set_key(c, key);
+ if ((ret = c->search(c)) == 0)
+ ret = c->get_value(c, &v);
+ else if (ret == WT_NOTFOUND)
+ WT_ERR_MSG(session, WT_ERROR,
+ "main table for join is missing entry");
+ WT_TRET(c->reset(c));
+ WT_ERR(ret);
+ }
} else
- v = *key;
+ WT_ITEM_SET(v, *key);
- if ((idx = entry->index) != NULL && idx->extractor != NULL) {
+ if ((idx = entry->index) != NULL && idx->extractor != NULL &&
+ entry != cjoin->iter->entry) {
+ WT_CLEAR(extract_cursor);
extract_cursor.iface = iface;
extract_cursor.iface.session = &session->iface;
extract_cursor.iface.key_format = idx->exkey_format;
- extract_cursor.ismember = 0;
+ extract_cursor.ismember = false;
extract_cursor.entry = entry;
WT_ERR(idx->extractor->extract(idx->extractor,
&session->iface, key, &v, &extract_cursor.iface));
@@ -685,7 +745,9 @@ err: if (ret == WT_NOTFOUND && bloom_found)
static int
__curjoin_next(WT_CURSOR *cursor)
{
+ WT_CURSOR *c;
WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ITER *iter;
WT_DECL_RET;
WT_SESSION_IMPL *session;
bool skip_left;
@@ -701,9 +763,11 @@ __curjoin_next(WT_CURSOR *cursor)
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
WT_ERR(__curjoin_init_iter(session, cjoin));
+ F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ iter = cjoin->iter;
+
nextkey:
- if ((ret = __curjoin_entry_iter_next(cjoin->iter, &cursor->key,
- &cursor->recno)) == 0) {
+ if ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) {
F_SET(cursor, WT_CURSTD_KEY_EXT);
/*
@@ -715,11 +779,31 @@ nextkey:
for (i = 0; i < cjoin->entries_next; i++) {
ret = __curjoin_entry_member(session, cjoin,
&cjoin->entries[i], skip_left);
- if (ret == WT_NOTFOUND)
+ if (ret == WT_NOTFOUND) {
+ /*
+ * If this is compare=eq on our outer iterator,
+ * and we've moved past it, we're done.
+ */
+ if (iter->isequal && i == 0)
+ break;
goto nextkey;
+ }
skip_left = false;
WT_ERR(ret);
}
+ } else if (ret != WT_NOTFOUND)
+ WT_ERR(ret);
+
+ if (ret == 0) {
+ /*
+ * Position the 'main' cursor, this will be used to
+ * retrieve values from the cursor join.
+ */
+ c = iter->main;
+ c->set_key(c, iter->curkey);
+ if ((ret = c->search(c)) != 0)
+ WT_ERR(c->search(c));
+ F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
}
if (0) {
@@ -785,10 +869,11 @@ __curjoin_close(WT_CURSOR *cursor)
for (end = &entry->ends[0];
end < &entry->ends[entry->ends_next]; end++) {
F_CLR(end->cursor, WT_CURSTD_JOINED);
- if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY))
- __wt_free(session, end->key.data);
+ if (F_ISSET(end, WT_CURJOIN_END_OWN_CURSOR))
+ WT_TRET(end->cursor->close(end->cursor));
}
__wt_free(session, entry->ends);
+ __wt_free(session, entry->repack_format);
}
if (cjoin->iter != NULL)
@@ -810,22 +895,22 @@ __wt_curjoin_open(WT_SESSION_IMPL *session,
const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
{
WT_CURSOR_STATIC_INIT(iface,
- __curjoin_get_key, /* get-key */
- __curjoin_get_value, /* get-value */
- __wt_cursor_notsup, /* set-key */
- __wt_cursor_notsup, /* set-value */
- __wt_cursor_notsup, /* compare */
- __wt_cursor_notsup, /* equals */
- __curjoin_next, /* next */
- __wt_cursor_notsup, /* prev */
- __curjoin_reset, /* reset */
- __wt_cursor_notsup, /* search */
- __wt_cursor_notsup, /* search-near */
- __wt_cursor_notsup, /* insert */
- __wt_cursor_notsup, /* update */
- __wt_cursor_notsup, /* remove */
- __wt_cursor_notsup, /* reconfigure */
- __curjoin_close); /* close */
+ __curjoin_get_key, /* get-key */
+ __curjoin_get_value, /* get-value */
+ __wt_cursor_set_key_notsup, /* set-key */
+ __wt_cursor_set_value_notsup, /* set-value */
+ __wt_cursor_compare_notsup, /* compare */
+ __wt_cursor_equals_notsup, /* equals */
+ __curjoin_next, /* next */
+ __wt_cursor_notsup, /* prev */
+ __curjoin_reset, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_search_near_notsup, /* search-near */
+ __wt_cursor_notsup, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_reconfigure_notsup, /* reconfigure */
+ __curjoin_close); /* close */
WT_CURSOR *cursor;
WT_CURSOR_JOIN *cjoin;
WT_DECL_ITEM(tmp);
@@ -891,22 +976,22 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_INDEX *idx, WT_CURSOR *ref_cursor, uint8_t flags, uint8_t range,
uint64_t count, uint32_t bloom_bit_count, uint32_t bloom_hash_count)
{
+ WT_CURSOR_INDEX *cindex;
+ WT_CURSOR_JOIN_ENDPOINT *end;
WT_CURSOR_JOIN_ENTRY *entry;
WT_DECL_RET;
- WT_CURSOR_JOIN_ENDPOINT *end, *newend;
bool hasins, needbloom, range_eq;
- u_int i, ins, nonbloom;
+ char *main_uri, *newformat;
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
- char *main_uri;
- size_t namesize, newsize;
+ size_t len, newsize;
+ u_int i, ins, nonbloom;
entry = NULL;
hasins = needbloom = false;
ins = 0; /* -Wuninitialized */
main_uri = NULL;
nonbloom = 0; /* -Wuninitialized */
- namesize = strlen(cjoin->table->name);
for (i = 0; i < cjoin->entries_next; i++) {
if (cjoin->entries[i].index == idx) {
@@ -982,13 +1067,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
((range & WT_CURJOIN_END_GT) != 0 || range_eq)) ||
(F_ISSET(end, WT_CURJOIN_END_LT) &&
((range & WT_CURJOIN_END_LT) != 0 || range_eq)) ||
- (end->flags == WT_CURJOIN_END_EQ &&
+ (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
(range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT))
!= 0))
WT_ERR_MSG(session, EINVAL,
"join has overlapping ranges");
if (range == WT_CURJOIN_END_EQ &&
- end->flags == WT_CURJOIN_END_EQ &&
+ WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
!F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))
WT_ERR_MSG(session, EINVAL,
"compare=eq can only be combined "
@@ -1013,31 +1098,70 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
entry->bloom_hash_count =
WT_MAX(entry->bloom_hash_count, bloom_hash_count);
}
- WT_ERR(__wt_realloc_def(session, &entry->ends_allocated,
- entry->ends_next + 1, &entry->ends));
- if (!hasins)
- ins = entry->ends_next;
- newend = &entry->ends[ins];
- memmove(newend + 1, newend,
- (entry->ends_next - ins) * sizeof(WT_CURSOR_JOIN_ENDPOINT));
- memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT));
- entry->ends_next++;
- newend->cursor = ref_cursor;
- F_SET(newend, range);
+ WT_ERR(__curjoin_insert_endpoint(session, entry,
+ hasins ? ins : entry->ends_next, &end));
+ end->cursor = ref_cursor;
+ F_SET(end, range);
/* Open the main file with a projection of the indexed columns. */
- if (entry->main == NULL && entry->index != NULL) {
- namesize = strlen(cjoin->table->name);
- newsize = namesize + entry->index->colconf.len + 1;
+ if (entry->main == NULL && idx != NULL) {
+ newsize = strlen(cjoin->table->name) + idx->colconf.len + 1;
WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
snprintf(main_uri, newsize, "%s%.*s",
- cjoin->table->name, (int)entry->index->colconf.len,
- entry->index->colconf.str);
+ cjoin->table->name, (int)idx->colconf.len,
+ idx->colconf.str);
WT_ERR(__wt_open_cursor(session, main_uri,
(WT_CURSOR *)cjoin, raw_cfg, &entry->main));
+ if (idx->extractor == NULL) {
+ /*
+ * Add no-op padding so trailing 'u' formats are not
+ * transformed to 'U'. This matches what happens in
+ * the index. We don't do this when we have an
+ * extractor, extractors already use the padding
+ * byte trick.
+ */
+ len = strlen(entry->main->value_format) + 3;
+ WT_ERR(__wt_calloc(session, len, 1, &newformat));
+ snprintf(newformat, len, "%s0x",
+ entry->main->value_format);
+ __wt_free(session, entry->main->value_format);
+ entry->main->value_format = newformat;
+ }
+
+ /*
+ * When we are repacking index keys to remove the primary
+ * key, we never want to transform trailing 'u'. Use no-op
+ * padding to force this.
+ */
+ cindex = (WT_CURSOR_INDEX *)ref_cursor;
+ len = strlen(cindex->iface.key_format) + 3;
+ WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format));
+ snprintf(entry->repack_format, len, "%s0x",
+ cindex->iface.key_format);
}
-err: if (main_uri != NULL)
- __wt_free(session, main_uri);
+err: __wt_free(session, main_uri);
return (ret);
}
+
+/*
+ * __curjoin_insert_endpoint --
+ * Insert a new entry into the endpoint array for the join entry.
+ */
+static int
+__curjoin_insert_endpoint(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ u_int pos, WT_CURSOR_JOIN_ENDPOINT **newendp)
+{
+ WT_CURSOR_JOIN_ENDPOINT *newend;
+
+ WT_RET(__wt_realloc_def(session, &entry->ends_allocated,
+ entry->ends_next + 1, &entry->ends));
+ newend = &entry->ends[pos];
+ memmove(newend + 1, newend,
+ (entry->ends_next - pos) * sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ entry->ends_next++;
+ *newendp = newend;
+
+ return (0);
+}