summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dist/api_data.py7
-rw-r--r--examples/c/ex_schema.c76
-rw-r--r--examples/java/com/wiredtiger/examples/ex_schema.java76
-rw-r--r--src/config/config_def.c7
-rw-r--r--src/cursor/cur_index.c14
-rw-r--r--src/cursor/cur_join.c817
-rw-r--r--src/docs/cursor-join.dox25
-rw-r--r--src/docs/spell.ok2
-rw-r--r--src/include/api.h2
-rw-r--r--src/include/cursor.h54
-rw-r--r--src/include/extern.h2
-rw-r--r--src/include/wiredtiger.in22
-rw-r--r--src/session/session_api.c40
-rw-r--r--test/suite/test_join01.py159
-rw-r--r--test/suite/test_join07.py548
15 files changed, 1528 insertions, 323 deletions
diff --git a/dist/api_data.py b/dist/api_data.py
index ab7be373099..44099ff6b1b 100644
--- a/dist/api_data.py
+++ b/dist/api_data.py
@@ -818,6 +818,13 @@ methods = {
Config('bloom_hash_count', '8', r'''
the number of hash values per item for the bloom filter''',
min='2', max='100'),
+ Config('operation', '"and"', r'''
+ the operation applied between this and other joined cursors.
+ When "operation=and" is specified, all the conditions implied by
+ joins must be satisfied for an entry to be returned by the join cursor;
+ when "operation=or" is specified, only one must be satisfied.
+ All cursors joined to a join cursor must have matching operations''',
+ choices=['and', 'or']),
Config('strategy', '', r'''
when set to bloom, a bloom filter is created and populated for
this index. This has an up front cost but may reduce the number
diff --git a/examples/c/ex_schema.c b/examples/c/ex_schema.c
index 70fc7eb2e62..155b982bbbe 100644
--- a/examples/c/ex_schema.c
+++ b/examples/c/ex_schema.c
@@ -69,7 +69,8 @@ main(void)
{
POP_RECORD *p;
WT_CONNECTION *conn;
- WT_CURSOR *cursor, *cursor2, *join_cursor, *stat_cursor;
+ WT_CURSOR *country_cursor, *country_cursor2, *cursor, *join_cursor,
+ *stat_cursor, *subjoin_cursor, *year_cursor;
WT_SESSION *session;
const char *country;
uint64_t recno, population;
@@ -336,18 +337,18 @@ main(void)
ret = session->open_cursor(session,
"join:table:poptable", NULL, NULL, &join_cursor);
ret = session->open_cursor(session,
- "index:poptable:country", NULL, NULL, &cursor);
+ "index:poptable:country", NULL, NULL, &country_cursor);
ret = session->open_cursor(session,
- "index:poptable:immutable_year", NULL, NULL, &cursor2);
+ "index:poptable:immutable_year", NULL, NULL, &year_cursor);
/* select values WHERE country == "AU" AND year > 1900 */
- cursor->set_key(cursor, "AU\0\0\0");
- ret = cursor->search(cursor);
- ret = session->join(session, join_cursor, cursor,
+ country_cursor->set_key(country_cursor, "AU\0\0\0");
+ ret = country_cursor->search(country_cursor);
+ ret = session->join(session, join_cursor, country_cursor,
"compare=eq,count=10");
- cursor2->set_key(cursor2, (uint16_t)1900);
- ret = cursor2->search(cursor2);
- ret = session->join(session, join_cursor, cursor2,
+ year_cursor->set_key(year_cursor, (uint16_t)1900);
+ ret = year_cursor->search(year_cursor);
+ ret = session->join(session, join_cursor, year_cursor,
"compare=gt,count=10,strategy=bloom");
/* List the values that are joined */
@@ -370,8 +371,61 @@ main(void)
ret = stat_cursor->close(stat_cursor);
ret = join_cursor->close(join_cursor);
- ret = cursor2->close(cursor2);
- ret = cursor->close(cursor);
+ ret = year_cursor->close(year_cursor);
+ ret = country_cursor->close(country_cursor);
+
+ /*! [Complex join cursors] */
+ /* Open cursors needed by the join. */
+ ret = session->open_cursor(session,
+ "join:table:poptable", NULL, NULL, &join_cursor);
+ ret = session->open_cursor(session,
+ "join:table:poptable", NULL, NULL, &subjoin_cursor);
+ ret = session->open_cursor(session,
+ "index:poptable:country", NULL, NULL, &country_cursor);
+ ret = session->open_cursor(session,
+ "index:poptable:country", NULL, NULL, &country_cursor2);
+ ret = session->open_cursor(session,
+ "index:poptable:immutable_year", NULL, NULL, &year_cursor);
+
+ /*
+ * select values WHERE (country == "AU" OR country == "UK")
+ * AND year > 1900
+ *
+ * First, set up the join representing the country clause.
+ */
+ country_cursor->set_key(country_cursor, "AU\0\0\0");
+ ret = country_cursor->search(country_cursor);
+ ret = session->join(session, subjoin_cursor, country_cursor,
+ "operation=or,compare=eq,count=10");
+ country_cursor2->set_key(country_cursor2, "UK\0\0\0");
+ ret = country_cursor2->search(country_cursor2);
+ ret = session->join(session, subjoin_cursor, country_cursor2,
+ "operation=or,compare=eq,count=10");
+
+ /* Join that to the top join, and add the year clause */
+ ret = session->join(session, join_cursor, subjoin_cursor, NULL);
+ year_cursor->set_key(year_cursor, (uint16_t)1900);
+ ret = year_cursor->search(year_cursor);
+ ret = session->join(session, join_cursor, year_cursor,
+ "compare=gt,count=10,strategy=bloom");
+
+ /* List the values that are joined */
+ while ((ret = join_cursor->next(join_cursor)) == 0) {
+ ret = join_cursor->get_key(join_cursor, &recno);
+ ret = join_cursor->get_value(join_cursor, &country, &year,
+ &population);
+ printf("ID %" PRIu64, recno);
+ printf(
+ ": country %s, year %" PRIu16 ", population %" PRIu64 "\n",
+ country, year, population);
+ }
+ /*! [Complex join cursors] */
+
+ ret = join_cursor->close(join_cursor);
+ ret = subjoin_cursor->close(subjoin_cursor);
+ ret = country_cursor->close(country_cursor);
+ ret = country_cursor2->close(country_cursor2);
+ ret = year_cursor->close(year_cursor);
ret = conn->close(conn, NULL);
diff --git a/examples/java/com/wiredtiger/examples/ex_schema.java b/examples/java/com/wiredtiger/examples/ex_schema.java
index 7cc26acb479..76bff66a688 100644
--- a/examples/java/com/wiredtiger/examples/ex_schema.java
+++ b/examples/java/com/wiredtiger/examples/ex_schema.java
@@ -76,7 +76,8 @@ public class ex_schema {
throws WiredTigerException
{
Connection conn;
- Cursor cursor, cursor2, join_cursor, stat_cursor;
+ Cursor country_cursor, country_cursor2, cursor, join_cursor,
+ stat_cursor, subjoin_cursor, year_cursor;
Session session;
String country;
long recno, population;
@@ -343,18 +344,18 @@ public class ex_schema {
/* Open cursors needed by the join. */
join_cursor = session.open_cursor(
"join:table:poptable", null, null);
- cursor = session.open_cursor(
+ country_cursor = session.open_cursor(
"index:poptable:country", null, null);
- cursor2 = session.open_cursor(
+ year_cursor = session.open_cursor(
"index:poptable:immutable_year", null, null);
/* select values WHERE country == "AU" AND year > 1900 */
- cursor.putKeyString("AU");
- ret = cursor.search();
- session.join(join_cursor, cursor, "compare=eq,count=10");
- cursor2.putKeyShort((short)1900);
- ret = cursor2.search();
- session.join(join_cursor, cursor2,
+ country_cursor.putKeyString("AU");
+ ret = country_cursor.search();
+ session.join(join_cursor, country_cursor, "compare=eq,count=10");
+ year_cursor.putKeyShort((short)1900);
+ ret = year_cursor.search();
+ session.join(join_cursor, year_cursor,
"compare=gt,count=10,strategy=bloom");
/* List the values that are joined */
@@ -376,8 +377,61 @@ public class ex_schema {
ret = stat_cursor.close();
ret = join_cursor.close();
- ret = cursor2.close();
- ret = cursor.close();
+ ret = year_cursor.close();
+ ret = country_cursor.close();
+
+ /*! [Complex join cursors] */
+ /* Open cursors needed by the join. */
+ join_cursor = session.open_cursor(
+ "join:table:poptable", null, null);
+ subjoin_cursor = session.open_cursor(
+ "join:table:poptable", null, null);
+ country_cursor = session.open_cursor(
+ "index:poptable:country", null, null);
+ country_cursor2 = session.open_cursor(
+ "index:poptable:country", null, null);
+ year_cursor = session.open_cursor(
+ "index:poptable:immutable_year", null, null);
+
+ /*
+ * select values WHERE (country == "AU" OR country == "UK")
+ * AND year > 1900
+ *
+ * First, set up the join representing the country clause.
+ */
+ country_cursor.putKeyString("AU");
+ ret = country_cursor.search();
+ ret = session.join(subjoin_cursor, country_cursor,
+ "operation=or,compare=eq,count=10");
+ country_cursor2.putKeyString("UK");
+ ret = country_cursor2.search();
+ ret = session.join(subjoin_cursor, country_cursor2,
+ "operation=or,compare=eq,count=10");
+
+ /* Join that to the top join, and add the year clause */
+ ret = session.join(join_cursor, subjoin_cursor, null);
+ year_cursor.putKeyShort((short)1900);
+ ret = year_cursor.search();
+ ret = session.join(join_cursor, year_cursor,
+ "compare=gt,count=10,strategy=bloom");
+
+ /* List the values that are joined */
+ while ((ret = join_cursor.next()) == 0) {
+ recno = join_cursor.getKeyRecord();
+ country = join_cursor.getValueString();
+ year = join_cursor.getValueShort();
+ population = join_cursor.getValueLong();
+ System.out.print("ID " + recno);
+ System.out.println( ": country " + country + ", year " + year +
+ ", population " + population);
+ }
+ /*! [Complex join cursors] */
+
+ ret = join_cursor.close();
+ ret = subjoin_cursor.close();
+ ret = year_cursor.close();
+ ret = country_cursor.close();
+ ret = country_cursor2.close();
ret = conn.close(null);
diff --git a/src/config/config_def.c b/src/config/config_def.c
index 5b6f0bac323..3c0940bfc4c 100644
--- a/src/config/config_def.c
+++ b/src/config/config_def.c
@@ -304,6 +304,9 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_join[] = {
NULL, "choices=[\"eq\",\"ge\",\"gt\",\"le\",\"lt\"]",
NULL, 0 },
{ "count", "int", NULL, NULL, NULL, 0 },
+ { "operation", "string",
+ NULL, "choices=[\"and\",\"or\"]",
+ NULL, 0 },
{ "strategy", "string",
NULL, "choices=[\"bloom\",\"default\"]",
NULL, 0 },
@@ -1031,8 +1034,8 @@ static const WT_CONFIG_ENTRY config_entries[] = {
},
{ "WT_SESSION.join",
"bloom_bit_count=16,bloom_hash_count=8,compare=\"eq\",count=,"
- "strategy=",
- confchk_WT_SESSION_join, 5
+ "operation=\"and\",strategy=",
+ confchk_WT_SESSION_join, 6
},
{ "WT_SESSION.log_flush",
"sync=on",
diff --git a/src/cursor/cur_index.c b/src/cursor/cur_index.c
index dbe8046ca21..5ec1728c8ab 100644
--- a/src/cursor/cur_index.c
+++ b/src/cursor/cur_index.c
@@ -8,20 +8,6 @@
#include "wt_internal.h"
- /*
- * __wt_curindex_joined --
- * Produce an error that this cursor is being used in a join call.
- */
-int
-__wt_curindex_joined(WT_CURSOR *cursor)
-{
- WT_SESSION_IMPL *session;
-
- session = (WT_SESSION_IMPL *)cursor->session;
- __wt_errx(session, "index cursor is being used in a join");
- return (ENOTSUP);
-}
-
/*
* __curindex_get_value --
* WT_CURSOR->get_value implementation for index cursors.
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c
index 1ee26716212..60d7825249c 100644
--- a/src/cursor/cur_join.c
+++ b/src/cursor/cur_join.c
@@ -8,8 +8,35 @@
#include "wt_internal.h"
+static int __curjoin_entries_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN *,
+ WT_ITEM *, WT_CURSOR_JOIN_ITER *);
+static int __curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *);
+static int __curjoin_entry_iter_close_all(WT_CURSOR_JOIN_ITER *);
+static bool __curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *);
+static int __curjoin_entry_in_range(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *,
+ WT_ITEM *, WT_CURSOR_JOIN_ITER *);
+static int __curjoin_entry_member(WT_SESSION_IMPL *, WT_CURSOR_JOIN_ENTRY *,
+ WT_ITEM *, WT_CURSOR_JOIN_ITER *);
static int __curjoin_insert_endpoint(WT_SESSION_IMPL *,
WT_CURSOR_JOIN_ENTRY *, u_int, WT_CURSOR_JOIN_ENDPOINT **);
+static int __curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *, u_int);
+
+#define WT_CURJOIN_ITER_CONSUMED(iter) \
+ ((iter)->entry_pos >= (iter)->entry_count)
+
+/*
+ * __wt_curjoin_joined --
+ * Produce an error that this cursor is being used in a join call.
+ */
+int
+__wt_curjoin_joined(WT_CURSOR *cursor)
+{
+ WT_SESSION_IMPL *session;
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+ __wt_errx(session, "cursor is being used in a join");
+ return (ENOTSUP);
+}
/*
* __curjoin_entry_iter_init --
@@ -18,58 +45,92 @@ static int __curjoin_insert_endpoint(WT_SESSION_IMPL *,
*/
static int
__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
- WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp)
+ WT_CURSOR_JOIN_ITER **iterp)
{
- WT_CURSOR *to_dup;
+ WT_CURSOR_JOIN_ITER *iter;
+
+ *iterp = NULL;
+ WT_RET(__wt_calloc_one(session, iterp));
+ iter = *iterp;
+ iter->cjoin = cjoin;
+ iter->session = session;
+ cjoin->iter = iter;
+ WT_RET(__curjoin_iter_set_entry(iter, 0));
+ return (0);
+}
+
+/*
+ * __curjoin_iter_set_entry --
+ * Set the current entry for an iterator.
+ *
+ */
+static int
+__curjoin_iter_set_entry(WT_CURSOR_JOIN_ITER *iter, u_int entry_pos)
+{
+ WT_CURSOR *c, *to_dup;
+ WT_CURSOR_JOIN *cjoin, *topjoin;
+ WT_CURSOR_JOIN_ENTRY *entry;
WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ char *uri;
const char *raw_cfg[] = { WT_CONFIG_BASE(
- session, WT_SESSION_open_cursor), "raw", NULL };
+ iter->session, WT_SESSION_open_cursor), "raw", NULL };
const char *def_cfg[] = { WT_CONFIG_BASE(
- session, WT_SESSION_open_cursor), NULL };
- const char *urimain, **config;
- char *mainbuf, *uri;
- WT_CURSOR_JOIN_ITER *iter;
+ iter->session, WT_SESSION_open_cursor), NULL };
+ const char **config;
size_t size;
- iter = NULL;
- mainbuf = uri = NULL;
- to_dup = entry->ends[0].cursor;
+ session = iter->session;
+ cjoin = iter->cjoin;
+ uri = NULL;
+ entry = iter->entry = &cjoin->entries[entry_pos];
+ iter->positioned = false;
+ iter->entry_pos = entry_pos;
+ iter->end_pos = 0;
- if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
- config = &raw_cfg[0];
- else
- config = &def_cfg[0];
+ iter->is_equal = (entry->ends_next == 1 &&
+ WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ);
+ iter->end_skip = (entry->ends_next > 0 &&
+ WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_GE) ? 1 : 0;
+
+ iter->end_count = WT_MIN(1, entry->ends_next);
+ if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) {
+ iter->entry_count = cjoin->entries_next;
+ if (iter->is_equal)
+ iter->end_count = entry->ends_next;
+ } else
+ iter->entry_count = 1;
+ WT_ASSERT(iter->session, iter->entry_pos < iter->entry_count);
- size = strlen(to_dup->internal_uri) + 3;
- WT_ERR(__wt_calloc(session, size, 1, &uri));
- snprintf(uri, size, "%s()", to_dup->internal_uri);
- urimain = cjoin->table->name;
- if (cjoin->projection != NULL) {
- size = strlen(urimain) + strlen(cjoin->projection) + 1;
- WT_ERR(__wt_calloc(session, size, 1, &mainbuf));
- snprintf(mainbuf, size, "%s%s", urimain, cjoin->projection);
- urimain = mainbuf;
- }
+ entry->stats.actual_count = 0;
- WT_ERR(__wt_calloc_one(session, &iter));
- WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config,
- &iter->cursor));
- WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor));
- WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config,
- &iter->main));
- iter->cjoin = cjoin;
- iter->session = session;
- iter->entry = entry;
- iter->positioned = false;
- iter->isequal = (entry->ends_next == 1 &&
- WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ);
- *iterp = iter;
+ if (entry->subjoin == NULL) {
+ for (topjoin = iter->cjoin; topjoin->parent != NULL;
+ topjoin = topjoin->parent)
+ ;
+ to_dup = entry->ends[0].cursor;
- if (0) {
-err: __wt_free(session, iter);
+ if (F_ISSET((WT_CURSOR *)topjoin, WT_CURSTD_RAW))
+ config = &raw_cfg[0];
+ else
+ config = &def_cfg[0];
+
+ size = strlen(to_dup->internal_uri) + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s()", to_dup->internal_uri);
+ if ((c = iter->cursor) == NULL || !WT_STREQ(c->uri, uri)) {
+ iter->cursor = NULL;
+ if (c != NULL)
+ WT_ERR(c->close(c));
+ WT_ERR(__wt_open_cursor(session, uri,
+ (WT_CURSOR *)topjoin, config, &iter->cursor));
+ }
+ WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor));
+ } else if (iter->cursor != NULL) {
+ iter->cursor->close(iter->cursor);
+ iter->cursor = NULL;
}
- __wt_free(session, mainbuf);
- __wt_free(session, uri);
+err: __wt_free(session, uri);
return (ret);
}
@@ -95,6 +156,45 @@ __curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf,
}
/*
+ * __curjoin_entry_iter_bump --
+ * Called to advance the iterator to the next endpoint,
+ * which may in turn advance to the next entry.
+ *
+ */
+static int
+__curjoin_entry_iter_bump(WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_SESSION_IMPL *session;
+
+ session = iter->session;
+ iter->positioned = false;
+ entry = iter->entry;
+ if (entry->subjoin == NULL && iter->is_equal &&
+ ++iter->end_pos < iter->end_count) {
+ WT_RET(__wt_cursor_dup_position(
+ entry->ends[iter->end_pos].cursor, iter->cursor));
+ return (0);
+ }
+ iter->end_pos = iter->end_count = iter->end_skip = 0;
+ if (entry->subjoin != NULL && entry->subjoin->iter != NULL)
+ WT_RET(__curjoin_entry_iter_close_all(entry->subjoin->iter));
+
+ if (++iter->entry_pos >= iter->entry_count) {
+ iter->entry = NULL;
+ return (0);
+ }
+ iter->entry = ++entry;
+ if (entry->subjoin != NULL) {
+ WT_RET(__curjoin_entry_iter_init(session, entry->subjoin,
+ &iter->child));
+ return (0);
+ }
+ WT_RET(__curjoin_iter_set_entry(iter, iter->entry_pos));
+ return (0);
+}
+
+/*
* __curjoin_split_key --
* Copy the primary key from a cursor (either main table or index)
* to another cursor. When copying from an index file, the index
@@ -156,11 +256,52 @@ __curjoin_split_key(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
static int
__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor)
{
- if (iter->positioned)
- WT_RET(iter->cursor->next(iter->cursor));
- else
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ session = iter->session;
+
+ if (WT_CURJOIN_ITER_CONSUMED(iter))
+ return (WT_NOTFOUND);
+again:
+ entry = iter->entry;
+ if (entry->subjoin != NULL) {
+ if (iter->child == NULL)
+ WT_RET(__curjoin_entry_iter_init(session,
+ entry->subjoin, &iter->child));
+ ret = __curjoin_entry_iter_next(iter->child, cursor);
+ if (ret == 0) {
+ /* The child did the work, we're done. */
+ iter->curkey = &cursor->key;
+ iter->positioned = true;
+ return (ret);
+ }
+ else if (ret == WT_NOTFOUND) {
+ WT_RET(__curjoin_entry_iter_close_all(iter->child));
+ entry->subjoin->iter = NULL;
+ iter->child = NULL;
+ WT_RET(__curjoin_entry_iter_bump(iter));
+ ret = 0;
+ }
+ } else if (iter->positioned) {
+ ret = iter->cursor->next(iter->cursor);
+ if (ret == WT_NOTFOUND) {
+ WT_RET(__curjoin_entry_iter_bump(iter));
+ ret = 0;
+ } else
+ WT_RET(ret);
+ } else
iter->positioned = true;
+ if (WT_CURJOIN_ITER_CONSUMED(iter))
+ return (WT_NOTFOUND);
+
+ if (!__curjoin_entry_iter_ready(iter))
+ goto again;
+
+ WT_RET(ret);
+
/*
* Set our key to the primary key, we'll also need this
* to check membership.
@@ -182,26 +323,27 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_CURSOR *cursor)
static int
__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
{
- if (iter->positioned) {
- WT_RET(iter->cursor->reset(iter->cursor));
- WT_RET(iter->main->reset(iter->main));
- WT_RET(__wt_cursor_dup_position(
- iter->cjoin->entries[0].ends[0].cursor, iter->cursor));
- iter->positioned = false;
- iter->entry->stats.actual_count = 0;
- }
+ if (iter->child != NULL)
+ WT_RET(__curjoin_entry_iter_close_all(iter->child));
+ WT_RET(__curjoin_iter_set_entry(iter, 0));
+ iter->positioned = false;
return (0);
}
/*
* __curjoin_entry_iter_ready --
- * The iterator is positioned.
+ * Check the positioned flag for all nested iterators.
*
*/
static bool
__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
{
- return (iter->positioned);
+ while (iter != NULL) {
+ if (!iter->positioned)
+ return (false);
+ iter = iter->child;
+ }
+ return (true);
}
/*
@@ -216,10 +358,30 @@ __curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter)
if (iter->cursor != NULL)
WT_TRET(iter->cursor->close(iter->cursor));
- if (iter->main != NULL)
- WT_TRET(iter->main->close(iter->main));
__wt_free(iter->session, iter);
+ return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_close_all --
+ * Free the iterator and all of its children recursively.
+ *
+ */
+static int
+__curjoin_entry_iter_close_all(WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_CURSOR_JOIN *parent;
+ WT_DECL_RET;
+ if (iter->child)
+ WT_TRET(__curjoin_entry_iter_close_all(iter->child));
+ iter->child = NULL;
+ WT_ASSERT(iter->session, iter->cjoin->parent == NULL ||
+ iter->cjoin->parent->iter->child == iter);
+ if ((parent = iter->cjoin->parent) != NULL)
+ parent->iter->child = NULL;
+ iter->cjoin->iter = NULL;
+ WT_TRET(__curjoin_entry_iter_close(iter));
return (ret);
}
@@ -238,10 +400,10 @@ __curjoin_get_key(WT_CURSOR *cursor, ...)
cjoin = (WT_CURSOR_JOIN *)cursor;
va_start(ap, cursor);
- CURSOR_API_CALL(cursor, session, get_key, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_key, NULL);
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
- !__curjoin_entry_iter_ready(cjoin->iter))
+ !cjoin->iter->positioned)
WT_ERR_MSG(session, EINVAL,
"join cursor must be advanced with next()");
WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap));
@@ -258,23 +420,21 @@ static int
__curjoin_get_value(WT_CURSOR *cursor, ...)
{
WT_CURSOR_JOIN *cjoin;
- WT_CURSOR_JOIN_ITER *iter;
WT_DECL_RET;
WT_SESSION_IMPL *session;
va_list ap;
cjoin = (WT_CURSOR_JOIN *)cursor;
- iter = cjoin->iter;
va_start(ap, cursor);
- CURSOR_API_CALL(cursor, session, get_value, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL);
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
- !__curjoin_entry_iter_ready(iter))
+ !cjoin->iter->positioned)
WT_ERR_MSG(session, EINVAL,
"join cursor must be advanced with next()");
- WT_ERR(__wt_curtable_get_valuev(iter->main, ap));
+ WT_ERR(__wt_curtable_get_valuev(cjoin->main, ap));
err: va_end(ap);
API_END_RET(session, ret);
@@ -298,7 +458,8 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
session, WT_SESSION_open_cursor), "raw", NULL };
const char *uri;
size_t size;
- int cmp, skip;
+ u_int skip;
+ int cmp;
c = NULL;
skip = 0;
@@ -354,7 +515,34 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
for (end = &entry->ends[skip]; end < endmax; end++) {
WT_ERR(__wt_compare(session, collator, &curkey,
&end->key, &cmp));
- if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) {
+ /* if condition satisfied, insert immediately */
+ switch (WT_CURJOIN_END_RANGE(end)) {
+ case WT_CURJOIN_END_EQ:
+ if (cmp == 0)
+ goto insert;
+ break;
+ case WT_CURJOIN_END_GT:
+ if (cmp > 0) {
+ /* skip this check next time */
+ skip = entry->ends_next;
+ goto insert;
+ }
+ break;
+ case WT_CURJOIN_END_GE:
+ if (cmp >= 0)
+ goto insert;
+ break;
+ case WT_CURJOIN_END_LT:
+ if (cmp < 0)
+ goto insert;
+ break;
+ case WT_CURJOIN_END_LE:
+ if (cmp <= 0)
+ goto insert;
+ break;
+ }
+ } else if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
if (cmp < 0 || (cmp == 0 &&
!F_ISSET(end, WT_CURJOIN_END_EQ)))
goto advance;
@@ -370,6 +558,14 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
goto done;
}
}
+ /*
+ * Either it's a disjunction that hasn't satisfied any
+ * condition, or it's a conjunction that has satisfied all
+ * conditions.
+ */
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))
+ goto advance;
+insert:
if (entry->index != NULL) {
curvalue.data =
(unsigned char *)curkey.data + curkey.size;
@@ -432,68 +628,85 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
}
/*
- * __curjoin_init_iter --
- * Initialize before any iteration.
+ * __curjoin_init_next --
+ * Initialize the cursor join when the next function is first called.
*/
static int
-__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
+__curjoin_init_next(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ bool iterable)
{
WT_BLOOM *bloom;
WT_DECL_RET;
WT_CURSOR *origcur;
WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2;
WT_CURSOR_JOIN_ENDPOINT *end;
+ char *mainbuf;
const char *def_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), NULL };
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
- uint32_t f, k;
+ const char **config, *proj, *urimain;
+ uint32_t f, k, size;
+ mainbuf = NULL;
if (cjoin->entries_next == 0)
WT_RET_MSG(session, EINVAL,
"join cursor has not yet been joined with any other "
"cursors");
- jeend = &cjoin->entries[cjoin->entries_next];
-
- /*
- * For a single compare=le endpoint in the first iterated entry,
- * construct a companion compare=ge endpoint that will actually
- * be iterated.
- */
- if (((je = cjoin->entries) != jeend) &&
- je->ends_next == 1 && F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) {
- origcur = je->ends[0].cursor;
- WT_RET(__curjoin_insert_endpoint(session, je, 0, &end));
- WT_RET(__wt_open_cursor(session, origcur->uri,
- (WT_CURSOR *)cjoin,
- F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg,
- &end->cursor));
- WT_RET(end->cursor->next(end->cursor));
- end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ |
- WT_CURJOIN_END_OWN_CURSOR;
+ if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
+ config = &raw_cfg[0];
+ else
+ config = &def_cfg[0];
+ urimain = cjoin->table->name;
+ if ((proj = cjoin->projection) != NULL) {
+ size = strlen(urimain) + strlen(proj) + 1;
+ WT_ERR(__wt_calloc(session, size, 1, &mainbuf));
+ snprintf(mainbuf, size, "%s%s", urimain, proj);
+ urimain = mainbuf;
}
- WT_RET(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter));
+ WT_ERR(__wt_open_cursor(session, urimain, (WT_CURSOR *)cjoin, config,
+ &cjoin->main));
+ jeend = &cjoin->entries[cjoin->entries_next];
for (je = cjoin->entries; je < jeend; je++) {
+ if (je->subjoin != NULL) {
+ WT_ERR(__curjoin_init_next(session, je->subjoin,
+ iterable));
+ continue;
+ }
__wt_stat_join_init_single(&je->stats);
+ /*
+ * For a single compare=le/lt endpoint in any entry that may
+ * be iterated, construct a companion compare=ge endpoint
+ * that will actually be iterated.
+ */
+ if (iterable && je->ends_next == 1 &&
+ F_ISSET(&je->ends[0], WT_CURJOIN_END_LT)) {
+ origcur = je->ends[0].cursor;
+ WT_ERR(__curjoin_insert_endpoint(session, je, 0, &end));
+ WT_ERR(__wt_open_cursor(session, origcur->uri,
+ (WT_CURSOR *)cjoin,
+ F_ISSET(origcur, WT_CURSTD_RAW) ? raw_cfg : def_cfg,
+ &end->cursor));
+ end->flags = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ |
+ WT_CURJOIN_END_OWN_CURSOR;
+ WT_ERR(end->cursor->next(end->cursor));
+ F_CLR(je, WT_CURJOIN_ENTRY_DISJUNCTION);
+ }
for (end = &je->ends[0]; end < &je->ends[je->ends_next];
end++)
- WT_RET(__curjoin_endpoint_init_key(session, je, end));
+ WT_ERR(__curjoin_endpoint_init_key(session, je, end));
/*
- * The first entry is iterated as the 'outermost' cursor.
- * For the common GE case, we don't have to test against
- * the left reference key, we know it will be true since
- * the btree is ordered.
+ * Do any needed Bloom filter initialization. Ignore Bloom
+ * filters for entries that will be iterated. They won't
+ * help since these entries either don't need an inclusion
+ * check or are doing any needed check during the iteration.
*/
- if (je == cjoin->entries && je->ends[0].flags ==
- (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ))
- F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
-
- if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
+ if (!iterable && F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
if (session->txn.isolation == WT_ISO_READ_UNCOMMITTED)
- WT_RET_MSG(session, EINVAL,
+ WT_ERR_MSG(session, EINVAL,
"join cursors with Bloom filters cannot be "
"used with read-uncommitted isolation");
if (je->bloom == NULL) {
@@ -515,10 +728,10 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
}
je->bloom_bit_count = f;
je->bloom_hash_count = k;
- WT_RET(__wt_bloom_create(session, NULL,
+ WT_ERR(__wt_bloom_create(session, NULL,
NULL, je->count, f, k, &je->bloom));
F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM);
- WT_RET(__curjoin_init_bloom(session, cjoin,
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
je, je->bloom));
/*
* Share the Bloom filter, making all
@@ -540,55 +753,143 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
* merge into the shared one. The Bloom
* parameters of the two filters must match.
*/
- WT_RET(__wt_bloom_create(session, NULL,
+ WT_ERR(__wt_bloom_create(session, NULL,
NULL, je->count, je->bloom_bit_count,
je->bloom_hash_count, &bloom));
- WT_RET(__curjoin_init_bloom(session, cjoin,
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
je, bloom));
- WT_RET(__wt_bloom_intersection(je->bloom,
+ WT_ERR(__wt_bloom_intersection(je->bloom,
bloom));
- WT_RET(__wt_bloom_close(bloom));
+ WT_ERR(__wt_bloom_close(bloom));
}
}
+ if (!F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION))
+ iterable = false;
}
-
F_SET(cjoin, WT_CURJOIN_INITIALIZED);
+
+err: __wt_free(session, mainbuf);
return (ret);
}
/*
+ * __curjoin_entries_in_range --
+ * Check if a key is in the range specified by the remaining entries,
+ * returning WT_NOTFOUND if not.
+ */
+static int
+__curjoin_entries_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iterarg)
+{
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_CURSOR_JOIN_ITER *iter;
+ WT_DECL_RET;
+ int fastret, slowret;
+ u_int pos;
+
+ iter = iterarg;
+ if (F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION)) {
+ fastret = 0;
+ slowret = WT_NOTFOUND;
+ } else {
+ fastret = WT_NOTFOUND;
+ slowret = 0;
+ }
+ pos = (iter == NULL ? 0 : iter->entry_pos);
+ for (entry = &cjoin->entries[pos]; pos < cjoin->entries_next;
+ entry++, pos++) {
+ ret = __curjoin_entry_member(session, entry, curkey, iter);
+ if (ret == fastret)
+ return (fastret);
+ if (ret != slowret)
+ WT_ERR(ret);
+ iter = NULL;
+ }
+err: return (ret == 0 ? slowret : ret);
+}
+
+/*
* __curjoin_entry_in_range --
* Check if a key is in the range specified by the entry, returning
* WT_NOTFOUND if not.
*/
static int
__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
- WT_ITEM *curkey, bool skip_left)
+ WT_ITEM *curkey, WT_CURSOR_JOIN_ITER *iter)
{
WT_COLLATOR *collator;
WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
+ bool disjunction, passed;
int cmp;
+ u_int pos;
collator = (entry->index != NULL) ? entry->index->collator : NULL;
endmax = &entry->ends[entry->ends_next];
- for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) {
+ disjunction = F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION);
+ passed = false;
+
+ /*
+ * The iterator may have already satisfied some endpoint conditions.
+ * If so and we're a disjunction, we're done. If so and we're a
+ * conjunction, we can start past the satisfied conditions.
+ */
+ if (iter == NULL)
+ pos = 0;
+ else {
+ if (disjunction && iter->end_skip)
+ return (0);
+ pos = iter->end_pos + iter->end_skip;
+ }
+
+ for (end = &entry->ends[pos]; end < endmax; end++) {
WT_RET(__wt_compare(session, collator, curkey, &end->key,
&cmp));
- if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
- if (cmp < 0 ||
- (cmp == 0 &&
- !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
- (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT)))
- WT_RET(WT_NOTFOUND);
- } else {
- if (cmp > 0 ||
- (cmp == 0 &&
- !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
- (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT)))
- WT_RET(WT_NOTFOUND);
+ switch (WT_CURJOIN_END_RANGE(end)) {
+ case WT_CURJOIN_END_EQ:
+ passed = (cmp == 0);
+ break;
+
+ case WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ:
+ passed = (cmp >= 0);
+ WT_ASSERT(session, iter == NULL);
+ break;
+
+ case WT_CURJOIN_END_GT:
+ passed = (cmp > 0);
+ if (passed && iter != NULL && pos == 0)
+ iter->end_skip = 1;
+ break;
+
+ case WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ:
+ passed = (cmp <= 0);
+ break;
+
+ case WT_CURJOIN_END_LT:
+ passed = (cmp < 0);
+ break;
+
+ default:
+ WT_RET(__wt_illegal_value(session, NULL));
+ break;
}
+
+ if (!passed) {
+ if (iter != NULL &&
+ (iter->is_equal ||
+ F_ISSET(end, WT_CURJOIN_END_LT))) {
+ WT_RET(__curjoin_entry_iter_bump(iter));
+ return (WT_NOTFOUND);
+ }
+ if (!disjunction)
+ return (WT_NOTFOUND);
+ iter = NULL;
+ } else if (disjunction)
+ break;
}
- return (0);
+ if (disjunction && end == endmax)
+ return (WT_NOTFOUND);
+ else
+ return (0);
}
typedef struct {
@@ -642,8 +943,8 @@ __curjoin_extract_insert(WT_CURSOR *cursor) {
* if not a member, returns WT_NOTFOUND.
*/
static int
-__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
- WT_CURSOR_JOIN_ENTRY *entry, bool skip_left)
+__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ WT_ITEM *key, WT_CURSOR_JOIN_ITER *iter)
{
WT_CURJOIN_EXTRACTOR extract_cursor;
WT_CURSOR *c;
@@ -666,12 +967,15 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
__wt_cursor_notsup); /* close */
WT_DECL_RET;
WT_INDEX *idx;
- WT_ITEM *key, v;
+ WT_ITEM v;
bool bloom_found;
- if (skip_left && entry->ends_next == 1)
+ if (entry->subjoin == NULL && iter != NULL &&
+ (iter->end_pos + iter->end_skip >= entry->ends_next ||
+ (iter->end_skip > 0 &&
+ F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))))
return (0); /* no checks to make */
- key = cjoin->iter->curkey;
+
entry->stats.accesses++;
bloom_found = false;
@@ -692,14 +996,26 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_ERR(__wt_bloom_inmem_get(entry->bloom, key));
bloom_found = true;
}
+ if (entry->subjoin != NULL) {
+ WT_ASSERT(session,
+ iter == NULL || entry->subjoin == iter->child->cjoin);
+ ret = __curjoin_entries_in_range(session, entry->subjoin,
+ key, iter == NULL ? NULL : iter->child);
+ if (iter != NULL &&
+ WT_CURJOIN_ITER_CONSUMED(iter->child)) {
+ WT_ERR(__curjoin_entry_iter_bump(iter));
+ ret = WT_NOTFOUND;
+ }
+ return (ret);
+ }
if (entry->index != NULL) {
/*
* If this entry is used by the iterator, then we already
- * have the index key, and we won't have to do any extraction
- * either.
+ * have the index key, and we won't have to do any
+ * extraction either.
*/
- if (entry == cjoin->iter->entry)
- WT_ITEM_SET(v, cjoin->iter->idxkey);
+ if (iter != NULL && entry == iter->entry)
+ WT_ITEM_SET(v, iter->idxkey);
else {
memset(&v, 0, sizeof(v)); /* Keep lint quiet. */
c = entry->main;
@@ -716,7 +1032,7 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_ITEM_SET(v, *key);
if ((idx = entry->index) != NULL && idx->extractor != NULL &&
- entry != cjoin->iter->entry) {
+ (iter == NULL || entry != iter->entry)) {
WT_CLEAR(extract_cursor);
extract_cursor.iface = iface;
extract_cursor.iface.session = &session->iface;
@@ -728,7 +1044,7 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
if (!extract_cursor.ismember)
WT_ERR(WT_NOTFOUND);
} else
- WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left));
+ WT_ERR(__curjoin_entry_in_range(session, entry, &v, iter));
if (0) {
err: if (ret == WT_NOTFOUND && bloom_found)
@@ -749,49 +1065,30 @@ __curjoin_next(WT_CURSOR *cursor)
WT_CURSOR_JOIN_ITER *iter;
WT_DECL_RET;
WT_SESSION_IMPL *session;
- bool skip_left;
- u_int i;
const uint8_t *p;
+ int tret;
cjoin = (WT_CURSOR_JOIN *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
if (F_ISSET(cjoin, WT_CURJOIN_ERROR))
WT_ERR_MSG(session, WT_ERROR,
"join cursor encountered previous error");
if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
- WT_ERR(__curjoin_init_iter(session, cjoin));
-
- F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ WT_ERR(__curjoin_init_next(session, cjoin, true));
+ if (cjoin->iter == NULL)
+ WT_ERR(__curjoin_entry_iter_init(session, cjoin, &cjoin->iter));
iter = cjoin->iter;
+ F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
-nextkey:
- if ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) {
- F_SET(cursor, WT_CURSTD_KEY_EXT);
-
- /*
- * We may have already established membership for the
- * 'left' case for the first entry, since we're
- * using that in our iteration.
- */
- skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
- for (i = 0; i < cjoin->entries_next; i++) {
- ret = __curjoin_entry_member(session, cjoin,
- &cjoin->entries[i], skip_left);
- if (ret == WT_NOTFOUND) {
- /*
- * If this is compare=eq on our outer iterator,
- * and we've moved past it, we're done.
- */
- if (iter->isequal && i == 0)
- break;
- goto nextkey;
- }
- skip_left = false;
- WT_ERR(ret);
- }
- } else if (ret != WT_NOTFOUND)
+ while ((ret = __curjoin_entry_iter_next(iter, cursor)) == 0) {
+ if ((ret = __curjoin_entries_in_range(session, cjoin,
+ iter->curkey, iter)) != WT_NOTFOUND)
+ break;
+ }
+ iter->positioned = (ret == 0);
+ if (ret != 0 && ret != WT_NOTFOUND)
WT_ERR(ret);
if (ret == 0) {
@@ -799,7 +1096,7 @@ nextkey:
* Position the 'main' cursor, this will be used to
* retrieve values from the cursor join.
*/
- c = iter->main;
+ c = cjoin->main;
if (WT_CURSOR_RECNO(cursor) &&
!F_ISSET(cursor, WT_CURSTD_RAW)) {
p = (const uint8_t *)iter->curkey->data;
@@ -808,10 +1105,11 @@ nextkey:
c->set_key(c, cjoin->iface.recno);
} else
c->set_key(c, iter->curkey);
- if ((ret = c->search(c)) != 0)
- WT_ERR(c->search(c));
+ WT_ERR(c->search(c));
F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
- }
+ } else if (ret == WT_NOTFOUND &&
+ (tret = __curjoin_entry_iter_close_all(iter)) != 0)
+ WT_ERR(tret);
if (0) {
err: F_SET(cjoin, WT_CURJOIN_ERROR);
@@ -832,9 +1130,9 @@ __curjoin_reset(WT_CURSOR *cursor)
cjoin = (WT_CURSOR_JOIN *)cursor;
- CURSOR_API_CALL(cursor, session, reset, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL);
- if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
+ if (cjoin->iter != NULL)
WT_ERR(__curjoin_entry_iter_reset(cjoin->iter));
err: API_END_RET(session, ret);
@@ -856,7 +1154,7 @@ __curjoin_close(WT_CURSOR *cursor)
cjoin = (WT_CURSOR_JOIN *)cursor;
- CURSOR_API_CALL(cursor, session, close, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL);
__wt_schema_release_table(session, cjoin->table);
/* These are owned by the table */
@@ -869,6 +1167,10 @@ __curjoin_close(WT_CURSOR *cursor)
for (entry = cjoin->entries, i = 0; i < cjoin->entries_next;
entry++, i++) {
+ if (entry->subjoin != NULL) {
+ F_CLR(&entry->subjoin->iface, WT_CURSTD_JOINED);
+ entry->subjoin->parent = NULL;
+ }
if (entry->main != NULL)
WT_TRET(entry->main->close(entry->main));
if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
@@ -884,7 +1186,10 @@ __curjoin_close(WT_CURSOR *cursor)
}
if (cjoin->iter != NULL)
- WT_TRET(__curjoin_entry_iter_close(cjoin->iter));
+ WT_TRET(__curjoin_entry_iter_close_all(cjoin->iter));
+ if (cjoin->main != NULL)
+ WT_TRET(cjoin->main->close(cjoin->main));
+
__wt_free(session, cjoin->entries);
WT_TRET(__wt_cursor_close(cursor));
@@ -975,6 +1280,52 @@ err: WT_TRET(__curjoin_close(cursor));
}
/*
+ * __curjoin_open_main --
+ * For the given index, open the main file with a projection
+ * that is the index keys.
+ */
+static int
+__curjoin_open_main(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry)
+{
+ WT_DECL_RET;
+ WT_INDEX *idx;
+ char *main_uri, *newformat;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ size_t len, newsize;
+
+ main_uri = NULL;
+ idx = entry->index;
+
+ newsize = strlen(cjoin->table->name) + idx->colconf.len + 1;
+ WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
+ snprintf(main_uri, newsize, "%s%.*s",
+ cjoin->table->name, (int)idx->colconf.len,
+ idx->colconf.str);
+ WT_ERR(__wt_open_cursor(session, main_uri,
+ (WT_CURSOR *)cjoin, raw_cfg, &entry->main));
+ if (idx->extractor == NULL) {
+ /*
+ * Add no-op padding so trailing 'u' formats are not
+ * transformed to 'U'. This matches what happens in
+ * the index. We don't do this when we have an
+ * extractor, extractors already use the padding
+ * byte trick.
+ */
+ len = strlen(entry->main->value_format) + 3;
+ WT_ERR(__wt_calloc(session, len, 1, &newformat));
+ snprintf(newformat, len, "%s0x",
+ entry->main->value_format);
+ __wt_free(session, entry->main->value_format);
+ entry->main->value_format = newformat;
+ }
+
+err: __wt_free(session, main_uri);
+ return (ret);
+}
+
+/*
* __wt_curjoin_join --
* Add a new join to a join cursor.
*/
@@ -986,31 +1337,51 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_CURSOR_INDEX *cindex;
WT_CURSOR_JOIN_ENDPOINT *end;
WT_CURSOR_JOIN_ENTRY *entry;
+ WT_CURSOR_JOIN *child;
WT_DECL_RET;
- bool hasins, needbloom, range_eq;
- char *main_uri, *newformat;
- const char *raw_cfg[] = { WT_CONFIG_BASE(
- session, WT_SESSION_open_cursor), "raw", NULL };
- size_t len, newsize;
+ bool hasins, needbloom, nested, range_eq;
+ size_t len;
u_int i, ins, nonbloom;
+ uint8_t endrange;
entry = NULL;
hasins = needbloom = false;
ins = 0; /* -Wuninitialized */
- main_uri = NULL;
nonbloom = 0; /* -Wuninitialized */
- for (i = 0; i < cjoin->entries_next; i++) {
- if (cjoin->entries[i].index == idx) {
- entry = &cjoin->entries[i];
- break;
- }
- if (!needbloom && i > 0 &&
- !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) {
- needbloom = true;
- nonbloom = i;
+ if (cjoin->entries_next == 0) {
+ if (LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION))
+ F_SET(cjoin, WT_CURJOIN_DISJUNCTION);
+ } else if (LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION) &&
+ !F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION))
+ WT_ERR_MSG(session, EINVAL,
+ "operation=or does not match previous operation=and");
+ else if (!LF_ISSET(WT_CURJOIN_ENTRY_DISJUNCTION) &&
+ F_ISSET(cjoin, WT_CURJOIN_DISJUNCTION))
+ WT_ERR_MSG(session, EINVAL,
+ "operation=and does not match previous operation=or");
+
+ nested = WT_PREFIX_MATCH(ref_cursor->uri, "join:");
+ if (!nested)
+ for (i = 0; i < cjoin->entries_next; i++) {
+ if (cjoin->entries[i].index == idx &&
+ cjoin->entries[i].subjoin == NULL) {
+ entry = &cjoin->entries[i];
+ break;
+ }
+ if (!needbloom && i > 0 &&
+ !F_ISSET(&cjoin->entries[i],
+ WT_CURJOIN_ENTRY_BLOOM)) {
+ needbloom = true;
+ nonbloom = i;
+ }
}
+ else {
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM))
+ WT_ERR_MSG(session, EINVAL,
+ "Bloom filters cannot be used with subjoins");
}
+
if (entry == NULL) {
WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated,
cjoin->entries_next + 1, &cjoin->entries));
@@ -1070,17 +1441,18 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
for (i = 0; i < entry->ends_next; i++) {
end = &entry->ends[i];
range_eq = (range == WT_CURJOIN_END_EQ);
+ endrange = WT_CURJOIN_END_RANGE(end);
if ((F_ISSET(end, WT_CURJOIN_END_GT) &&
((range & WT_CURJOIN_END_GT) != 0 || range_eq)) ||
(F_ISSET(end, WT_CURJOIN_END_LT) &&
((range & WT_CURJOIN_END_LT) != 0 || range_eq)) ||
- (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
+ (endrange == WT_CURJOIN_END_EQ &&
(range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT))
!= 0))
WT_ERR_MSG(session, EINVAL,
"join has overlapping ranges");
if (range == WT_CURJOIN_END_EQ &&
- WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
+ endrange == WT_CURJOIN_END_EQ &&
!F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))
WT_ERR_MSG(session, EINVAL,
"compare=eq can only be combined "
@@ -1093,6 +1465,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
if (!hasins &&
((range & WT_CURJOIN_END_GT) != 0 ||
(range == WT_CURJOIN_END_EQ &&
+ endrange != WT_CURJOIN_END_EQ &&
!F_ISSET(end, WT_CURJOIN_END_GT)))) {
ins = i;
hasins = true;
@@ -1105,50 +1478,38 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
entry->bloom_hash_count =
WT_MAX(entry->bloom_hash_count, bloom_hash_count);
}
- WT_ERR(__curjoin_insert_endpoint(session, entry,
- hasins ? ins : entry->ends_next, &end));
- end->cursor = ref_cursor;
- F_SET(end, range);
-
- /* Open the main file with a projection of the indexed columns. */
- if (entry->main == NULL && idx != NULL) {
- newsize = strlen(cjoin->table->name) + idx->colconf.len + 1;
- WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
- snprintf(main_uri, newsize, "%s%.*s",
- cjoin->table->name, (int)idx->colconf.len,
- idx->colconf.str);
- WT_ERR(__wt_open_cursor(session, main_uri,
- (WT_CURSOR *)cjoin, raw_cfg, &entry->main));
- if (idx->extractor == NULL) {
+ if (nested) {
+ child = (WT_CURSOR_JOIN *)ref_cursor;
+ entry->subjoin = child;
+ child->parent = cjoin;
+ } else {
+ WT_ERR(__curjoin_insert_endpoint(session, entry,
+ hasins ? ins : entry->ends_next, &end));
+ end->cursor = ref_cursor;
+ F_SET(end, range);
+
+ if (entry->main == NULL && idx != NULL) {
/*
- * Add no-op padding so trailing 'u' formats are not
- * transformed to 'U'. This matches what happens in
- * the index. We don't do this when we have an
- * extractor, extractors already use the padding
- * byte trick.
+ * Open the main file with a projection of the
+ * indexed columns.
*/
- len = strlen(entry->main->value_format) + 3;
- WT_ERR(__wt_calloc(session, len, 1, &newformat));
- snprintf(newformat, len, "%s0x",
- entry->main->value_format);
- __wt_free(session, entry->main->value_format);
- entry->main->value_format = newformat;
- }
+ WT_ERR(__curjoin_open_main(session, cjoin, entry));
- /*
- * When we are repacking index keys to remove the primary
- * key, we never want to transform trailing 'u'. Use no-op
- * padding to force this.
- */
- cindex = (WT_CURSOR_INDEX *)ref_cursor;
- len = strlen(cindex->iface.key_format) + 3;
- WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format));
- snprintf(entry->repack_format, len, "%s0x",
- cindex->iface.key_format);
+ /*
+ * When we are repacking index keys to remove the
+ * primary key, we never want to transform trailing
+ * 'u'. Use no-op padding to force this.
+ */
+ cindex = (WT_CURSOR_INDEX *)ref_cursor;
+ len = strlen(cindex->iface.key_format) + 3;
+ WT_ERR(__wt_calloc(session, len, 1,
+ &entry->repack_format));
+ snprintf(entry->repack_format, len, "%s0x",
+ cindex->iface.key_format);
+ }
}
-err: __wt_free(session, main_uri);
- return (ret);
+err: return (ret);
}
/*
diff --git a/src/docs/cursor-join.dox b/src/docs/cursor-join.dox
index 51da6b174bf..5ea064a250b 100644
--- a/src/docs/cursor-join.dox
+++ b/src/docs/cursor-join.dox
@@ -14,6 +14,31 @@ Here is an example using join cursors:
Joins support various comparison operators: \c "eq", \c "gt", \c "ge", \c "lt", \c "le". Ranges with lower and upper bounds can also be specified, by joining two cursors on the same index, for example, one with \c "compare=ge" and another \c "compare=lt". In addition to joining indices, the main table can be joined so that a range of primary keys can be specified.
+By default, a join cursor returns a conjunction, that is, all keys that
+satisfy all the joined comparisons. By specifying a configuration with \c
+"operation=or", a join cursor will return a disjunction, or all keys that
+satisfy at least one of the joined comparisons. More complex joins can be
+composed by specifying another join cursor as the reference cursor in a join
+call.
+
+Here is an example using these concepts to show a conjunction of a disjunction:
+
+@snippet ex_schema.c Complex join cursors
+
All the joins should be done on the join cursor before WT_CURSOR::next is called. Calling WT_CURSOR::next on a join cursor for the first time populates any bloom filters and performs other initialization. The join cursor's key is the primary key (the key for the main table), and its value is the entire set of values of the main table. A join cursor can be created with a projection by appending \c "(col1,col2,...)" to the URI if a different set of values is needed.
+Keys returned from the join cursor are ordered according to the
+first reference cursor joined. For example, if an index cursor was joined
+first, that index determines the order of results. If the join cursor
+uses disjunctions, then the ordering of all joins determines the order.
+The first join in a conjunctive join, or all joins in a disjunctive join,
+are distinctive in that they are iterated internally as the cursor join
+returns values in order. Any bloom filters specified on the
+joins that are used for iteration are not useful, and are silently ignored.
+
+When disjunctions are used where the sets of keys overlap on these 'iteration
+joins', a join cursor will return duplicates. A join cursor never returns
+duplicates unless \c "operation=or" is used in a join configuration, or unless
+the first joined cursor is itself a join cursor that would return duplicates.
+
*/
diff --git a/src/docs/spell.ok b/src/docs/spell.ok
index efc306568cd..965d28f2ec6 100644
--- a/src/docs/spell.ok
+++ b/src/docs/spell.ok
@@ -178,6 +178,8 @@ desc
destructor
destructors
dev
+disjunction
+disjunctions
distclean
dl
dll
diff --git a/src/include/api.h b/src/include/api.h
index c6a5af40698..50b2eab83b8 100644
--- a/src/include/api.h
+++ b/src/include/api.h
@@ -118,7 +118,7 @@
#define JOINABLE_CURSOR_CALL_CHECK(cur) \
if (F_ISSET(cur, WT_CURSTD_JOINED)) \
- WT_ERR(__wt_curindex_joined(cur))
+ WT_ERR(__wt_curjoin_joined(cur))
#define JOINABLE_CURSOR_API_CALL(cur, s, n, bt) \
CURSOR_API_CALL(cur, s, n, bt); \
diff --git a/src/include/cursor.h b/src/include/cursor.h
index c08cf54be70..6357523a03f 100644
--- a/src/include/cursor.h
+++ b/src/include/cursor.h
@@ -284,18 +284,50 @@ struct __wt_cursor_index {
uint8_t *cg_needvalue;
};
+/*
+ * A join iterator structure is used to generate candidate primary keys. It
+ * is the responsibility of the caller of the iterator to filter these
+ * primary key against the other conditions of the join before returning
+ * them the caller of WT_CURSOR::next.
+ *
+ * For a conjunction join (the default), entry_count will be 1, meaning that
+ * the iterator only consumes the first entry (WT_CURSOR_JOIN_ENTRY). That
+ * is, it successively returns primary keys from a cursor for the first
+ * index that was joined. When the values returned by that cursor are
+ * exhausted, the iterator has completed. For a disjunction join,
+ * exhausting a cursor just means that the iterator advances to the next
+ * entry. If the next entry represents an index, a new cursor is opened and
+ * primary keys from that index are then successively returned.
+ *
+ * When positioned on an entry that represents a nested join, a new child
+ * iterator is created that will be bound to the nested WT_CURSOR_JOIN.
+ * That iterator is then used to generate candidate primary keys. When its
+ * iteration is completed, that iterator is destroyed and the parent
+ * iterator advances to the next entry. Thus, depending on how deeply joins
+ * are nested, a similarly deep stack of iterators is created.
+ */
struct __wt_cursor_join_iter {
WT_SESSION_IMPL *session;
WT_CURSOR_JOIN *cjoin;
WT_CURSOR_JOIN_ENTRY *entry;
+ WT_CURSOR_JOIN_ITER *child;
WT_CURSOR *cursor; /* has null projection */
- WT_CURSOR *main; /* main table with projection */
WT_ITEM *curkey; /* primary key */
WT_ITEM idxkey;
+ u_int entry_pos; /* the current entry */
+ u_int entry_count; /* entries to walk */
+ u_int end_pos; /* the current endpoint */
+ u_int end_count; /* endpoints to walk */
+ u_int end_skip; /* when testing for inclusion */
+ /* can we skip current end? */
bool positioned;
- bool isequal; /* advancing means we're done */
+ bool is_equal;
};
+/*
+ * A join endpoint represents a positioned cursor that is 'captured' by a
+ * WT_SESSION::join call.
+ */
struct __wt_cursor_join_endpoint {
WT_ITEM key;
uint8_t recno_buf[10]; /* holds packed recno */
@@ -313,9 +345,17 @@ struct __wt_cursor_join_endpoint {
((endp)->flags & \
(WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ | WT_CURJOIN_END_LT))
+/*
+ * Each join entry typically represents an index's participation in a join.
+ * For example, if 'k' is an index, then "t.k > 10 && t.k < 20" would be
+ * represented by a single entry, with two endpoints. When the index and
+ * subjoin fields are NULL, the join is on the main table. When subjoin is
+ * non-NULL, there is a nested join clause.
+ */
struct __wt_cursor_join_entry {
WT_INDEX *index;
WT_CURSOR *main; /* raw main table cursor */
+ WT_CURSOR_JOIN *subjoin; /* a nested join clause */
WT_BLOOM *bloom; /* Bloom filter handle */
char *repack_format; /* target format for repack */
uint32_t bloom_bit_count; /* bits per item in bloom */
@@ -339,15 +379,17 @@ struct __wt_cursor_join {
WT_TABLE *table;
const char *projection;
- WT_CURSOR_JOIN_ITER *iter;
+ WT_CURSOR *main; /* main table with projection */
+ WT_CURSOR_JOIN *parent; /* parent of nested group */
+ WT_CURSOR_JOIN_ITER *iter; /* chain of iterators */
WT_CURSOR_JOIN_ENTRY *entries;
size_t entries_allocated;
u_int entries_next;
uint8_t recno_buf[10]; /* holds packed recno */
-#define WT_CURJOIN_ERROR 0x01 /* Error in initialization */
-#define WT_CURJOIN_INITIALIZED 0x02 /* Successful initialization */
-#define WT_CURJOIN_SKIP_FIRST_LEFT 0x04 /* First check not needed */
+#define WT_CURJOIN_DISJUNCTION 0x01 /* Entries are or-ed */
+#define WT_CURJOIN_ERROR 0x02 /* Error in initialization */
+#define WT_CURJOIN_INITIALIZED 0x04 /* Successful initialization */
uint8_t flags;
};
diff --git a/src/include/extern.h b/src/include/extern.h
index c7c4c2e3ff7..ae82424078d 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -283,8 +283,8 @@ extern int __wt_curdump_create(WT_CURSOR *child, WT_CURSOR *owner, WT_CURSOR **c
extern int __wt_curfile_update_check(WT_CURSOR *cursor);
extern int __wt_curfile_create(WT_SESSION_IMPL *session, WT_CURSOR *owner, const char *cfg[], bool bulk, bool bitmap, WT_CURSOR **cursorp);
extern int __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
-extern int __wt_curindex_joined(WT_CURSOR *cursor);
extern int __wt_curindex_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curjoin_joined(WT_CURSOR *cursor);
extern int __wt_curjoin_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *idx, WT_CURSOR *ref_cursor, uint8_t flags, uint8_t range, uint64_t count, uint32_t bloom_bit_count, uint32_t bloom_hash_count);
extern int __wt_json_alloc_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, WT_CURSOR_JSON *json, bool iskey, va_list ap);
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index c9102c929a8..87f7ed276e2 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -1247,18 +1247,21 @@ struct __wt_session {
* @param join_cursor a cursor that was opened using a
* \c "join:" URI. It may not have been used for any operations
* other than other join calls.
- * @param ref_cursor either an index cursor having the same base table
- * as the join_cursor, or a table cursor open on the same base table.
- * The ref_cursor must be positioned.
+ * @param ref_cursor an index cursor having the same base table
+ * as the join_cursor, or a table cursor open on the same base table,
+ * or another join cursor. Unless the ref_cursor is another join
+ * cursor, it must be positioned.
*
* The ref_cursor limits the results seen by iterating the
* join_cursor to table items referred to by the key in this
* index. The set of keys referred to is modified by the compare
* config option.
*
- * Multiple join calls builds up a set of ref_cursors, and the
- * results seen by iteration are the intersection of the cursor
- * ranges participating in the join.
+ * Multiple join calls builds up a set of ref_cursors, and
+ * by default, the results seen by iteration are the intersection
+ * of the cursor ranges participating in the join. When configured
+ * with \c "operation=or", the results seen are the union of
+ * the participating cursor ranges.
*
* After the join call completes, the ref_cursor cursor may not be
* used for any purpose other than get_key and get_value. Any other
@@ -1281,6 +1284,13 @@ struct __wt_session {
* also influences evaluation order for cursors in the join. When the
* count is equal for multiple bloom filters in a composition of joins\,
* the bloom filter may be shared., an integer; default \c .}
+ * @config{operation, the operation applied between this and other
+ * joined cursors. When "operation=and" is specified\, all the
+ * conditions implied by joins must be satisfied for an entry to be
+ * returned by the join cursor; when "operation=or" is specified\, only
+ * one must be satisfied. All cursors joined to a join cursor must have
+ * matching operations., a string\, chosen from the following options:
+ * \c "and"\, \c "or"; default \c "and".}
* @config{strategy, when set to bloom\, a bloom filter is created and
* populated for this index. This has an up front cost but may reduce
* the number of accesses to the main table when iterating the joined
diff --git a/src/session/session_api.c b/src/session/session_api.c
index bb496494234..aa9efac0b92 100644
--- a/src/session/session_api.c
+++ b/src/session/session_api.c
@@ -794,12 +794,15 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor,
WT_INDEX *idx;
WT_SESSION_IMPL *session;
WT_TABLE *table;
+ bool nested;
uint64_t count;
uint32_t bloom_bit_count, bloom_hash_count;
uint8_t flags, range;
count = 0;
firstcg = NULL;
+ flags = 0;
+ nested = false;
session = (WT_SESSION_IMPL *)wt_session;
SESSION_API_CALL(session, join, config, cfg);
table = NULL;
@@ -817,19 +820,25 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor,
ctable = (WT_CURSOR_TABLE *)ref_cursor;
table = ctable->table;
firstcg = ctable->cg_cursors[0];
+ } else if (WT_PREFIX_MATCH(ref_cursor->uri, "join:")) {
+ idx = NULL;
+ table = ((WT_CURSOR_JOIN *)ref_cursor)->table;
+ nested = true;
} else
- WT_ERR_MSG(session, EINVAL, "not an index or table cursor");
+ WT_ERR_MSG(session, EINVAL,
+ "ref_cursor must be an index, table or join cursor");
- if (!F_ISSET(firstcg, WT_CURSTD_KEY_SET))
+ if (firstcg != NULL && !F_ISSET(firstcg, WT_CURSTD_KEY_SET))
WT_ERR_MSG(session, EINVAL,
"requires reference cursor be positioned");
cjoin = (WT_CURSOR_JOIN *)join_cursor;
if (cjoin->table != table)
WT_ERR_MSG(session, EINVAL,
- "table for join cursor does not match table for index");
+ "table for join cursor does not match table for "
+ "ref_cursor");
if (F_ISSET(ref_cursor, WT_CURSTD_JOINED))
WT_ERR_MSG(session, EINVAL,
- "index cursor already used in a join");
+ "cursor already used in a join");
/* "ge" is the default */
range = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ;
@@ -868,15 +877,20 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor,
WT_ERR_MSG(session, EINVAL,
"bloom_hash_count: value too large");
bloom_hash_count = (uint32_t)cval.val;
- if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM)) {
- if (count == 0)
- WT_ERR_MSG(session, EINVAL,
- "count must be nonzero when strategy=bloom");
- if (cjoin->entries_next == 0)
- WT_ERR_MSG(session, EINVAL,
- "the first joined cursor cannot specify "
- "strategy=bloom");
- }
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && count == 0)
+ WT_ERR_MSG(session, EINVAL,
+ "count must be nonzero when strategy=bloom");
+
+ WT_ERR(__wt_config_gets(session, cfg, "operation", &cval));
+ if (cval.len != 0 && WT_STRING_MATCH("or", cval.str, cval.len))
+ LF_SET(WT_CURJOIN_ENTRY_DISJUNCTION);
+
+ if (nested && (count != 0 || range != WT_CURJOIN_END_EQ ||
+ LF_ISSET(WT_CURJOIN_ENTRY_BLOOM)))
+ WT_ERR_MSG(session, EINVAL,
+ "joining a nested join cursor is incompatible with "
+ "setting \"strategy\", \"compare\" or \"count\"");
+
WT_ERR(__wt_curjoin_join(session, cjoin, idx, ref_cursor, flags,
range, count, bloom_bit_count, bloom_hash_count));
/*
diff --git a/test/suite/test_join01.py b/test/suite/test_join01.py
index 4aa2bc6e269..6dc1a1a33ae 100644
--- a/test/suite/test_join01.py
+++ b/test/suite/test_join01.py
@@ -52,9 +52,29 @@ class test_join01(wttest.WiredTigerTestCase):
return [s, rs, sort3]
# Common function for testing iteration of join cursors
- def iter_common(self, jc, do_proj):
+ def iter_common(self, jc, do_proj, do_nested, join_order):
# See comments in join_common()
- expect = [73, 82, 62, 83, 92]
+ # The order that the results are seen depends on
+ # the ordering of the joins. Specifically, the first
+ # join drives the order that results are seen.
+ if do_nested:
+ if join_order == 0:
+ expect = [73, 82, 83, 92]
+ elif join_order == 1:
+ expect = [73, 82, 83, 92]
+ elif join_order == 2:
+ expect = [82, 92, 73, 83]
+ elif join_order == 3:
+ expect = [92, 73, 82, 83]
+ else:
+ if join_order == 0:
+ expect = [73, 82, 62, 83, 92]
+ elif join_order == 1:
+ expect = [62, 73, 82, 83, 92]
+ elif join_order == 2:
+ expect = [62, 82, 92, 73, 83]
+ elif join_order == 3:
+ expect = [73, 82, 62, 83, 92]
while jc.next() == 0:
[k] = jc.get_keys()
i = k - 1
@@ -64,7 +84,9 @@ class test_join01(wttest.WiredTigerTestCase):
[v0,v1,v2] = jc.get_values()
self.assertEquals(self.gen_values(i), [v0,v1,v2])
if len(expect) == 0 or i != expect[0]:
- self.tty(' result ' + str(i) + ' is not in: ' + str(expect))
+ self.tty('ERROR: ' + str(i) + ' is not next in: ' +
+ str(expect))
+ self.tty('JOIN ORDER=' + str(join_order) + ', NESTED=' + str(do_nested))
self.assertTrue(i == expect[0])
expect.remove(i)
self.assertEquals(0, len(expect))
@@ -118,11 +140,40 @@ class test_join01(wttest.WiredTigerTestCase):
self.assertTrue(len(expectstats) == 0,
'missing expected values in stats: ' + str(expectstats))
+ def session_record_join(self, jc, refc, config, order, joins):
+ joins.append([order, [jc, refc, config]])
+
+ def session_play_one_join(self, firsturi, jc, refc, config):
+ if refc.uri == firsturi and config != None:
+ config = config.replace('strategy=bloom','')
+ #self.tty('->join(jc, uri="' + refc.uri +
+ # '", config="' + str(config) + '"')
+ self.session.join(jc, refc, config)
+
+ def session_play_joins(self, joins, join_order):
+ #self.tty('->')
+ firsturi = None
+ for [i, joinargs] in joins:
+ if i >= join_order:
+ if firsturi == None:
+ firsturi = joinargs[1].uri
+ self.session_play_one_join(firsturi, *joinargs)
+ for [i, joinargs] in joins:
+ if i < join_order:
+ if firsturi == None:
+ firsturi = joinargs[1].uri
+ self.session_play_one_join(firsturi, *joinargs)
+
# Common function for testing the most basic functionality
# of joins
- def join_common(self, joincfg0, joincfg1, do_proj, do_stats):
+ def join_common(self, joincfg0, joincfg1, do_proj, do_nested, do_stats,
+ join_order):
#self.tty('join_common(' + joincfg0 + ',' + joincfg1 + ',' +
- # str(do_proj) + ')')
+ # str(do_proj) + ',' + str(do_nested) + ',' +
+ # str(do_stats) + ',' + str(join_order) + ')')
+ closeme = []
+ joins = [] # cursors to be joined
+
self.session.create('table:join01', 'key_format=r' +
',value_format=SSi,columns=(k,v0,v1,v2)')
self.session.create('index:join01:index0','columns=(v0)')
@@ -143,7 +194,7 @@ class test_join01(wttest.WiredTigerTestCase):
# We join on index2 first, not using bloom indices.
# This defines the order that items are returned.
- # index2 is sorts multiples of 3 first (see gen_values())
+ # index2 sorts multiples of 3 first (see gen_values())
# and by using 'gt' and key 99, we'll skip multiples of 3,
# and examine primary keys 2,5,8,...,95,98,1,4,7,...,94,97.
jc = self.session.open_cursor('join:table:join01' + proj_suffix,
@@ -152,7 +203,7 @@ class test_join01(wttest.WiredTigerTestCase):
c2 = self.session.open_cursor('index:join01:index2(v1)', None, None)
c2.set_key(99) # skips all entries w/ primary key divisible by three
self.assertEquals(0, c2.search())
- self.session.join(jc, c2, 'compare=gt')
+ self.session_record_join(jc, c2, 'compare=gt', 0, joins)
# Then select all the numbers 0-99 whose string representation
# sort >= '60'.
@@ -163,41 +214,86 @@ class test_join01(wttest.WiredTigerTestCase):
c0 = self.session.open_cursor('table:join01', None, None)
c0.set_key(60)
self.assertEquals(0, c0.search())
- self.session.join(jc, c0, 'compare=ge' + joincfg0)
+ self.session_record_join(jc, c0, 'compare=ge' + joincfg0, 1, joins)
# Then select all numbers whose reverse string representation
# is in '20' < x < '40'.
c1a = self.session.open_cursor('index:join01:index1(v1)', None, None)
c1a.set_key('21')
self.assertEquals(0, c1a.search())
- self.session.join(jc, c1a, 'compare=gt' + joincfg1)
+ self.session_record_join(jc, c1a, 'compare=gt' + joincfg1, 2, joins)
c1b = self.session.open_cursor('index:join01:index1(v1)', None, None)
c1b.set_key('41')
self.assertEquals(0, c1b.search())
- self.session.join(jc, c1b, 'compare=lt' + joincfg1)
+ self.session_record_join(jc, c1b, 'compare=lt' + joincfg1, 2, joins)
# Numbers that satisfy these 3 conditions (with ordering implied by c2):
# [73, 82, 62, 83, 92].
#
# After iterating, we should be able to reset and iterate again.
+ if do_nested:
+ # To test nesting, we create two new levels of conditions:
+ #
+ # x == 72 or x == 73 or x == 82 or x == 83 or
+ # (x >= 90 and x <= 99)
+ #
+ # that will get AND-ed into our existing join. The expected
+ # result is [73, 82, 83, 92].
+ #
+ # We don't specify the projection here, it should be picked up
+ # from the 'enclosing' join.
+ nest1 = self.session.open_cursor('join:table:join01', None, None)
+ nest2 = self.session.open_cursor('join:table:join01', None, None)
+
+ nc = self.session.open_cursor('index:join01:index0', None, None)
+ nc.set_key('90')
+ self.assertEquals(0, nc.search())
+ self.session.join(nest2, nc, 'compare=ge') # joincfg left out
+ closeme.append(nc)
+
+ nc = self.session.open_cursor('index:join01:index0', None, None)
+ nc.set_key('99')
+ self.assertEquals(0, nc.search())
+ self.session.join(nest2, nc, 'compare=le')
+ closeme.append(nc)
+
+ self.session.join(nest1, nest2, "operation=or")
+
+ for val in [ '72', '73', '82', '83' ]:
+ nc = self.session.open_cursor('index:join01:index0', None, None)
+ nc.set_key(val)
+ self.assertEquals(0, nc.search())
+ self.session.join(nest1, nc, 'compare=eq,operation=or' +
+ joincfg0)
+ closeme.append(nc)
+ self.session_record_join(jc, nest1, None, 3, joins)
+
+ self.session_play_joins(joins, join_order)
+ self.iter_common(jc, do_proj, do_nested, join_order)
if do_stats:
self.stats(jc, 0)
- self.iter_common(jc, do_proj)
+ jc.reset()
+ self.iter_common(jc, do_proj, do_nested, join_order)
if do_stats:
self.stats(jc, 1)
jc.reset()
- self.iter_common(jc, do_proj)
+ self.iter_common(jc, do_proj, do_nested, join_order)
if do_stats:
self.stats(jc, 2)
jc.reset()
- self.iter_common(jc, do_proj)
+ self.iter_common(jc, do_proj, do_nested, join_order)
jc.close()
c2.close()
c1a.close()
c1b.close()
c0.close()
+ if do_nested:
+ nest1.close()
+ nest2.close()
+ for c in closeme:
+ c.close()
self.session.drop('table:join01')
# Test joins with basic functionality
@@ -207,10 +303,15 @@ class test_join01(wttest.WiredTigerTestCase):
for cfga in [ '', bloomcfg1000, bloomcfg10000 ]:
for cfgb in [ '', bloomcfg1000, bloomcfg10000 ]:
for do_proj in [ False, True ]:
- #self.tty('cfga=' + cfga +
- # ', cfgb=' + cfgb +
- # ', doproj=' + str(do_proj))
- self.join_common(cfga, cfgb, do_proj, False)
+ for do_nested in [ False, True ]:
+ for order in range(0, 4):
+ #self.tty('cfga=' + cfga +
+ # ', cfgb=' + cfgb +
+ # ', doproj=' + str(do_proj) +
+ # ', donested=' + str(do_nested) +
+ # ', order=' + str(order))
+ self.join_common(cfga, cfgb, do_proj, do_nested,
+ False, order)
def test_join_errors(self):
self.session.create('table:join01', 'key_format=r,value_format=SS'
@@ -244,7 +345,7 @@ class test_join01(wttest.WiredTigerTestCase):
# Joining a table cursor, not index
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
lambda: self.session.join(jc, fc, 'compare=ge'),
- '/not an index or table cursor/')
+ '/must be an index, table or join cursor/')
# Joining a non positioned cursor
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
lambda: self.session.join(jc, ic0, 'compare=ge'),
@@ -278,12 +379,6 @@ class test_join01(wttest.WiredTigerTestCase):
'/requires reference cursor be positioned/')
ic1.next()
- # The first cursor joined cannot be bloom
- self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
- lambda: self.session.join(jc, ic1,
- 'compare=ge,strategy=bloom,count=1000'),
- '/first joined cursor cannot specify strategy=bloom/')
-
# This succeeds.
self.session.join(jc, ic1, 'compare=ge'),
@@ -300,7 +395,7 @@ class test_join01(wttest.WiredTigerTestCase):
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
lambda: self.session.join(jc, ic0,
'compare=le' + bloom_config),
- '/index cursor already used in a join/')
+ '/cursor already used in a join/')
# When joining with the same index, need compatible compares
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
@@ -337,16 +432,16 @@ class test_join01(wttest.WiredTigerTestCase):
# Operations on the joined cursor are frozen until the join is closed.
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
lambda: ic0.next(),
- '/index cursor is being used in a join/')
+ '/cursor is being used in a join/')
# Operations on the joined cursor are frozen until the join is closed.
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
lambda: ic0.prev(),
- '/index cursor is being used in a join/')
+ '/cursor is being used in a join/')
self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
lambda: ic0.reset(),
- '/index cursor is being used in a join/')
+ '/cursor is being used in a join/')
# Only a small number of operations allowed on a join cursor
msg = "/Unsupported cursor/"
@@ -402,11 +497,15 @@ class test_join01(wttest.WiredTigerTestCase):
def test_stats(self):
bloomcfg1000 = ',strategy=bloom,count=1000'
bloomcfg10 = ',strategy=bloom,count=10'
- self.join_common(bloomcfg1000, bloomcfg1000, False, True)
+ self.join_common(bloomcfg1000, bloomcfg1000, False, False, True, 0)
# Intentially run with an underconfigured Bloom filter,
# statistics should pick up some false positives.
- self.join_common(bloomcfg10, bloomcfg10, False, True)
+ self.join_common(bloomcfg10, bloomcfg10, False, False, True, 0)
+
+ # Run stats with a nested join
+ self.join_common(bloomcfg1000, bloomcfg1000, False, True, True, 0)
+ self.join_common(bloomcfg1000, bloomcfg1000, False, True, True, 3)
# test statistics with a simple one index join cursor
def test_simple_stats(self):
diff --git a/test/suite/test_join07.py b/test/suite/test_join07.py
new file mode 100644
index 00000000000..36e91361329
--- /dev/null
+++ b/test/suite/test_join07.py
@@ -0,0 +1,548 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2016 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import os, re, run
+import wiredtiger, wttest, suite_random
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+class ParseException(Exception):
+ def __init__(self, msg):
+ super(ParseException, self).__init__(msg)
+
+class Token:
+ UNKNOWN = '<unknown>'
+ NUMBER = 'Number'
+ STRING = 'String'
+ COLUMN = 'Column'
+ LPAREN = '('
+ RPAREN = ')'
+ LBRACKET = '{'
+ RBRACKET = '}'
+ COMMA = ','
+ OR = '||'
+ AND = '&&'
+ LT = '<'
+ GT = '>'
+ LE = '<='
+ GE = '>='
+ EQ = '=='
+ ATTRIBUTE = 'Attribute' # bracketed key value pair
+
+ COMPARE_OPS = [LT, GT, LE, GE, EQ]
+ COMPARATORS = [NUMBER, STRING]
+
+ def __init__(self, kind, tokenizer):
+ self.kind = kind
+ self.pos = tokenizer.off + tokenizer.pos
+ self.n = 0
+ self.s = ''
+ self.index = ''
+ self.attr_key = ''
+ self.attr_value = ''
+ self.groups = None
+
+ def __str__(self):
+ return '<Token ' + self.kind + ' at char ' + str(self.pos) + '>'
+
+class Tokenizer:
+ def __init__(self, s):
+ self.off = 0
+ self.s = s + '?' # add a char that won't match anything
+ self.pos = 0
+ self.end = len(s)
+ self.re_num = re.compile(r"(\d+)")
+ self.re_quote1 = re.compile(r"'([^']*)'")
+ self.re_quote2 = re.compile(r"\"([^\"]*)\"")
+ self.re_attr = re.compile(r"\[(\w+)=(\w+)\]")
+ self.pushed = None
+
+ def newToken(self, kind, sz):
+ t = Token(kind, self)
+ self.pos += sz
+ return t
+
+ def error(self, s):
+ raise ParseException(str(self.pos) + ': ' + s)
+
+ def matched(self, kind, repat):
+ pos = self.pos
+ match = re.match(repat, self.s[pos:])
+ if not match:
+ end = pos + 10
+ if end > self.end:
+ end = self.end
+ self.error('matching ' + kind + ' at "' +
+ self.s[pos:end] + '..."')
+ t = self.newToken(kind, match.end())
+ t.groups = match.groups()
+ t.s = self.s[pos:pos + match.end()]
+ return t
+
+ def available(self):
+ if self.pushed == None:
+ self.pushback(self.token())
+ return (self.pushed != None)
+
+ def pushback(self, token):
+ if self.pushed != None:
+ raise AssertionError('pushback more than once')
+ self.pushed = token
+
+ def peek(self):
+ token = self.token()
+ self.pushback(token)
+ return token
+
+ def scan(self):
+ while self.pos < self.end and self.s[self.pos].isspace():
+ self.pos += 1
+ return '' if self.pos >= self.end else self.s[self.pos]
+
+ def token(self):
+ if self.pushed != None:
+ ret = self.pushed
+ self.pushed = None
+ return ret
+ c = self.scan()
+ if self.pos >= self.end:
+ return None
+ lookahead = '' if self.pos + 1 >= self.end else self.s[self.pos+1]
+ #self.tty("Tokenizer.token char=" + c + ", lookahead=" + lookahead)
+ if c == "'":
+ t = self.matched(Token.STRING, self.re_quote1)
+ t.s = t.groups[0]
+ return t
+ if c == '"':
+ t = self.matched(Token.STRING, self.re_quote2)
+ t.s = t.groups[0]
+ return t
+ if c in "{}(),":
+ return self.newToken(c, 1)
+ if c == "|":
+ if lookahead != "|":
+ self.error('matching OR')
+ return self.newToken(Token.OR, 2)
+ if c == "&":
+ if lookahead != "&":
+ self.error('matching AND')
+ return self.newToken(Token.AND, 2)
+ if c in "0123456789":
+ t = self.matched(Token.NUMBER, self.re_num)
+ t.s = t.groups[0]
+ t.n = int(t.s)
+ return t
+ if c in "ABCDEFGHIJ":
+ t = self.newToken(Token.COLUMN, 1)
+ t.s = c
+ return t
+ if c == '<':
+ if lookahead == '=':
+ return self.newToken(Token.LE, 2)
+ else:
+ return self.newToken(Token.LT, 1)
+ if c == '>':
+ if lookahead == '=':
+ return self.newToken(Token.GE, 2)
+ else:
+ return self.newToken(Token.GT, 1)
+ if c in "=":
+ if lookahead != "=":
+ self.error('matching EQ')
+ return self.newToken(Token.EQ, 2)
+ if c in "[":
+ t = self.matched(Token.ATTRIBUTE, self.re_attr)
+ t.attr_key = t.groups[0]
+ t.attr_value = t.groups[1]
+ return t
+ return None
+
+ def tty(self, s):
+ wttest.WiredTigerTestCase.tty(s)
+
+# test_join07.py
+# Join interpreter
+class test_join07(wttest.WiredTigerTestCase):
+ reverseop = { '==' : '==', '<=' : '>=', '<' : '>', '>=' : '<=', '>' : '<' }
+ compareop = { '==' : 'eq', '<=' : 'le', '<' : 'lt', '>=' : 'ge',
+ '>' : 'gt' }
+ columnmult = { 'A' : 1, 'B' : 2, 'C' : 3, 'D' : 4, 'E' : 5,
+ 'F' : 6, 'G' : 7, 'H' : 8, 'I' : 9, 'J' : 10 }
+
+ extractscen = [
+ ('extractor', dict(extractor=True)),
+ ('noextractor', dict(extractor=False))
+ ]
+
+ scenarios = number_scenarios(extractscen)
+
+ # Return the wiredtiger_open extension argument for a shared library.
+ def extensionArg(self, exts):
+ extfiles = []
+ for ext in exts:
+ (dirname, name, libname) = ext
+ if name != None and name != 'none':
+ testdir = os.path.dirname(__file__)
+ extdir = os.path.join(run.wt_builddir, 'ext', dirname)
+ extfile = os.path.join(
+ extdir, name, '.libs', 'libwiredtiger_' + libname + '.so')
+ if not os.path.exists(extfile):
+ self.skipTest('extension "' + extfile + '" not built')
+ if not extfile in extfiles:
+ extfiles.append(extfile)
+ if len(extfiles) == 0:
+ return ''
+ else:
+ return ',extensions=["' + '","'.join(extfiles) + '"]'
+
+ # Override WiredTigerTestCase, we have extensions.
+ def setUpConnectionOpen(self, dir):
+ extarg = self.extensionArg([('extractors', 'csv', 'csv_extractor')])
+ connarg = 'create,error_prefix="{0}: ",{1}'.format(
+ self.shortid(), extarg)
+ conn = self.wiredtiger_open(dir, connarg)
+ self.pr(`conn`)
+ return conn
+
+ def expect(self, token, expected):
+ if token == None or token.kind not in expected:
+ self.err(token, 'expected one of: ' + str(expected))
+ return token
+
+ def err(self, token, msg):
+ self.assertTrue(False, 'ERROR at token ' + str(token) + ': ' + msg)
+
+ def gen_key(self, i):
+ if self.keyformat == 'S':
+ return [ 'key%06d' % i ] # zero pad so it sorts expectedly
+ else:
+ return [ i ]
+
+ def gen_values(self, i):
+ s = ""
+ ret = []
+ for x in range(1, 11):
+ v = (i * x) % self.N
+ if x <= 5:
+ ret.append(v)
+ else:
+ ret.append(str(v))
+ if s != "":
+ s += ","
+ s += str(v)
+ ret.insert(0, s)
+ return ret
+
+ def iterate(self, jc, mbr):
+ mbr = set(mbr) # we need a mutable set
+ gotkeys = []
+ #self.tty('iteration expects ' + str(len(mbr)) +
+ # ' entries: ' + str(mbr))
+ while jc.next() == 0:
+ [k] = jc.get_keys()
+ values = jc.get_values()
+ if self.keyformat == 'S':
+ i = int(str(k[3:]))
+ else:
+ i = k
+ #self.tty('GOT key=' + str(k) + ', values=' + str(values))
+
+ # Duplicates may be returned when the disjunctions are used,
+ # so we ignore them.
+ if not i in gotkeys:
+ self.assertEquals(self.gen_values(i), values)
+ if not i in mbr:
+ self.tty('ERROR: result ' + str(i) + ' is not in: ' +
+ str(mbr))
+ self.assertTrue(i in mbr)
+ mbr.remove(i)
+ gotkeys.append(i)
+ self.assertEquals(0, len(mbr))
+
+ def token_literal(self, token):
+ if token.kind == Token.STRING:
+ return token.s
+ elif token.kind == Token.NUMBER:
+ return token.n
+
+ def idx_sim(self, x, mult, isstr):
+ if isstr:
+ return str(int(x) * mult % self.N)
+ else:
+ return (x * mult % self.N)
+
+ def mkmbr(self, expr):
+ return frozenset([x for x in self.allN if expr(x)])
+
+ def join_one_side(self, jc, coltok, littok, optok, conjunction,
+ isright, mbr):
+ idxname = 'index:join07:' + coltok.s
+ cursor = self.session.open_cursor(idxname, None, None)
+ jc.cursors.append(cursor)
+ literal = self.token_literal(littok)
+ cursor.set_key(literal)
+ searchret = cursor.search()
+ if searchret != 0:
+ self.tty('ERROR: cannot find value ' + str(literal) +
+ ' in ' + idxname)
+ self.assertEquals(0, searchret)
+ op = optok.kind
+ if not isright:
+ op = self.reverseop[op]
+ mult = self.columnmult[coltok.s]
+ config = 'compare=' + self.compareop[op] + ',operation=' + \
+ ('and' if conjunction else 'or')
+ if hasattr(coltok, 'bloom'):
+ config += ',strategy=bloom,count=' + str(coltok.bloom)
+ #self.tty('join(jc, cursor=' + str(literal) + ', ' + config)
+ self.session.join(jc, cursor, config)
+ isstr = type(literal) is str
+ if op == '==':
+ tmbr = self.mkmbr(lambda x: self.idx_sim(x, mult, isstr) == literal)
+ elif op == '<=':
+ tmbr = self.mkmbr(lambda x: self.idx_sim(x, mult, isstr) <= literal)
+ elif op == '<':
+ tmbr = self.mkmbr(lambda x: self.idx_sim(x, mult, isstr) < literal)
+ elif op == '>=':
+ tmbr = self.mkmbr(lambda x: self.idx_sim(x, mult, isstr) >= literal)
+ elif op == '>':
+ tmbr = self.mkmbr(lambda x: self.idx_sim(x, mult, isstr) > literal)
+ if conjunction:
+ mbr = mbr.intersection(tmbr)
+ else:
+ mbr = mbr.union(tmbr)
+ return mbr
+
+ def parse_join(self, jc, tokenizer, conjunction, mbr):
+ left = None
+ right = None
+ leftop = None
+ rightop = None
+ col = None
+ token = tokenizer.token()
+ if token.kind == Token.LPAREN:
+ subjc = self.session.open_cursor('join:table:join07', None, None)
+ jc.cursors.append(subjc)
+ submbr = self.parse_junction(subjc, tokenizer)
+ config = 'operation=' + ('and' if conjunction else 'or')
+ self.session.join(jc, subjc, config)
+ if conjunction:
+ mbr = mbr.intersection(submbr)
+ else:
+ mbr = mbr.union(submbr)
+ return mbr
+ if token.kind in Token.COMPARATORS:
+ left = token
+ leftop = self.expect(tokenizer.token(), Token.COMPARE_OPS)
+ token = tokenizer.token()
+ col = self.expect(token, [Token.COLUMN])
+ token = tokenizer.token()
+ if token.kind in Token.ATTRIBUTE:
+ tokenizer.pushback(token)
+ self.parse_column_attributes(tokenizer, col)
+ token = tokenizer.token()
+ if token.kind in Token.COMPARE_OPS:
+ rightop = token
+ right = self.expect(tokenizer.token(), Token.COMPARATORS)
+ token = tokenizer.token()
+ tokenizer.pushback(token)
+
+ # Now we have everything we need to do a join.
+ if left != None:
+ mbr = self.join_one_side(jc, col, left, leftop, conjunction,
+ False, mbr)
+ if right != None:
+ mbr = self.join_one_side(jc, col, right, rightop, conjunction,
+ True, mbr)
+ return mbr
+
+ # Parse a set of joins, grouped by && or ||
+ def parse_junction(self, jc, tokenizer):
+ jc.cursors = []
+
+ # Take a peek at the tokenizer's stream to see if we
+ # have a conjunction or disjunction
+ token = tokenizer.peek()
+ s = tokenizer.s[token.pos:]
+ (andpos, orpos) = self.find_nonparen(s, ['&', '|'])
+ if orpos >= 0 and (andpos < 0 or orpos < andpos):
+ conjunction = False
+ mbr = frozenset()
+ else:
+ conjunction = True
+ mbr = frozenset(self.allN)
+
+ while tokenizer.available():
+ mbr = self.parse_join(jc, tokenizer, conjunction, mbr)
+ token = tokenizer.token()
+ if token != None:
+ if token.kind == Token.OR:
+ self.assertTrue(not conjunction)
+ elif token.kind == Token.AND:
+ self.assertTrue(conjunction)
+ elif token.kind == Token.RPAREN:
+ break
+ else:
+ self.err(token, 'unexpected token')
+ return mbr
+
+ def parse_attributes(self, tokenizer):
+ attributes = []
+ token = tokenizer.token()
+ while token != None and token.kind == Token.ATTRIBUTE:
+ attributes.append(token)
+ token = tokenizer.token()
+ tokenizer.pushback(token)
+ return attributes
+
+ # Find a set of chars that aren't within parentheses.
+ # For this simple language, we don't allow parentheses in quoted literals.
+ def find_nonparen(self, s, matchlist):
+ pos = 0
+ end = len(s)
+ nmatch = len(matchlist)
+ nfound = 0
+ result = [-1 for i in range(0, nmatch)]
+ parennest = 0
+ while pos < end and nfound < nmatch:
+ c = s[pos]
+ if c == '(':
+ parennest += 1
+ elif c == ')':
+ parennest -= 1
+ if parennest < 0:
+ break
+ elif parennest == 0 and c in matchlist:
+ m = matchlist.index(c)
+ if result[m] < 0:
+ result[m] = pos
+ nfound += 1
+ pos += 1
+ return result
+
+ def parse_toplevel(self, jc, tokenizer):
+ return self.parse_junction(jc, tokenizer)
+
+ def parse_toplevel_attributes(self, tokenizer):
+ for attrtoken in self.parse_attributes(tokenizer):
+ key = attrtoken.attr_key
+ value = attrtoken.attr_value
+ #self.tty('ATTR:' + str([key,value]))
+ if key == 'N':
+ self.N = int(value)
+ elif key == 'key':
+ self.keyformat = value
+ else:
+ tokenizer.error('bad attribute key: ' + str(key))
+
+ def parse_column_attributes(self, tokenizer, c):
+ for attrtoken in self.parse_attributes(tokenizer):
+ key = attrtoken.attr_key
+ value = attrtoken.attr_value
+ #self.tty('ATTR:' + str([key,value]))
+ if key == 'bloom':
+ c.bloom = int(value)
+ else:
+ tokenizer.error('bad column attribute key: ' + str(key))
+
+ def close_cursors(self, jc):
+ jc.close()
+ for c in jc.cursors:
+ if c.uri[0:5] == 'join:':
+ self.close_cursors(c)
+ else:
+ c.close()
+
+ def interpret(self, s):
+ #self.tty('INTERPRET: ' + s)
+ self.N = 1000
+ self.keyformat = "r"
+ self.keycols = 'k'
+
+ # Grab attributes before creating anything, as some attributes
+ # may override needed parameters.
+ tokenizer = Tokenizer(s)
+ self.parse_toplevel_attributes(tokenizer)
+ self.allN = range(1, self.N + 1)
+
+ self.session.create('table:join07', 'key_format=' + self.keyformat +
+ ',value_format=SiiiiiSSSSS,' +
+ 'columns=(' + self.keycols +
+ ',S,A,B,C,D,E,F,G,H,I,J)')
+ mdfieldnum = 0
+ mdformat = 'i'
+ mdconfig = ''
+ for colname in [ 'A','B','C','D','E','F','G','H','I','J' ]:
+ if self.extractor:
+ if colname == 'F':
+ mdformat = 'S'
+ mdconfig = 'app_metadata={"format" : "%s","field" : "%d"}' % \
+ (mdformat, mdfieldnum)
+ config = 'extractor=csv,key_format=%s' % mdformat
+ mdfieldnum += 1
+ else:
+ config = 'columns=(%s)' % colname
+ self.session.create('index:join07:%s' % colname,
+ '%s,%s' % (config, mdconfig))
+ c = self.session.open_cursor('table:join07', None, None)
+ for i in self.allN:
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ jc = self.session.open_cursor('join:table:join07', None, None)
+ mbr = self.parse_toplevel(jc, tokenizer)
+ self.iterate(jc, mbr)
+
+ self.close_cursors(jc)
+ self.session.drop('table:join07')
+
+ def test_join_string(self):
+ self.interpret("[N=1000][key=r] 7 < A <= 500 && B < 150 && C > 17")
+ self.interpret("[N=1001][key=r] 7 < A <= 500 && B < 150 && F > '234'")
+ self.interpret("[N=10000][key=r] 7 < A <= 500 && B < 150 && " +
+ "(F > '234' || G < '100')")
+ self.interpret("[N=7919][key=r](7 < A <= 9)&&(F > '234')")
+ self.interpret("[N=1000][key=S](A>=0 && A<0)||(A>999)")
+ self.interpret("[N=2000][key=S](A>=0 && A<0)||(A>1999)")
+ self.interpret("(7<A<=10 && B < 150)||(B>998)")
+ self.interpret("(7<A<=10 && B < 150)||(J=='990')")
+ clause1 = "(7 < A <= 500 && B < 150)"
+ clause2 = "(F > '234' || G < '100')"
+ self.interpret("[N=1000][key=r]" + clause1 + "&&" + clause2)
+ self.interpret("(7<A<=10)||(B>994||C<12)")
+ self.interpret("(7<A<=10 && B < 150)||(B>996||C<6)")
+ self.interpret("[N=1000][key=r]" + clause2 + "||" + clause1)
+ self.interpret("[N=1000][key=r]" + clause1 + "||" + clause2)
+ self.interpret("[N=1000][key=S]" + clause2 + "&&" + clause1)
+ clause1 = "(7 < A <= 500 && B[bloom=300] < 150)"
+ clause2 = "(F[bloom=500] > '234' || G[bloom=20] < '100')"
+ self.interpret("[N=1000][key=S]" + clause1 + "&&" + clause2)
+
+if __name__ == '__main__':
+ wttest.run()