summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@mongodb.com>2015-11-19 11:47:53 +1100
committerMichael Cahill <michael.cahill@mongodb.com>2015-11-19 11:47:53 +1100
commit4368d3975e0baa53508269f3fb2d712ecab7a584 (patch)
tree16c0f97519fce4eed13b48d979ec40c24cb29e67
parenta72ddb75ae212726c3891cb7ec0e72a5f3b225f0 (diff)
parent2bf052b61603eb0d6c7592c1bd3e0723495451be (diff)
downloadmongo-4368d3975e0baa53508269f3fb2d712ecab7a584.tar.gz
Merge pull request #2289 from wiredtiger/wt-1315-join-cursor
WT-1315 Cursor join implementation
-rw-r--r--build_win/filelist.win1
-rw-r--r--dist/api_data.py27
-rw-r--r--dist/filelist1
-rw-r--r--dist/s_define.list1
-rw-r--r--dist/s_funcs.list2
-rw-r--r--dist/s_string.ok12
-rw-r--r--dist/stat.py44
-rw-r--r--dist/stat_data.py15
-rw-r--r--examples/c/ex_schema.c39
-rw-r--r--examples/java/com/wiredtiger/examples/ex_schema.java43
-rw-r--r--ext/extractors/csv/csv_extractor.c45
-rw-r--r--lang/java/java_doc.i1
-rw-r--r--src/bloom/bloom.c41
-rw-r--r--src/config/config_def.c18
-rw-r--r--src/conn/conn_stat.c17
-rw-r--r--src/cursor/cur_dump.c2
-rw-r--r--src/cursor/cur_index.c54
-rw-r--r--src/cursor/cur_join.c1049
-rw-r--r--src/cursor/cur_stat.c146
-rw-r--r--src/cursor/cur_table.c54
-rw-r--r--src/docs/cursor-join.dox19
-rw-r--r--src/docs/cursors.dox1
-rw-r--r--src/docs/data-sources.dox4
-rw-r--r--src/docs/programming.dox1
-rw-r--r--src/docs/spell.ok3
-rw-r--r--src/include/api.h16
-rw-r--r--src/include/config.h45
-rw-r--r--src/include/cursor.h74
-rw-r--r--src/include/cursor.i64
-rw-r--r--src/include/extern.h22
-rw-r--r--src/include/stat.h10
-rw-r--r--src/include/wiredtiger.in79
-rw-r--r--src/include/wt_internal.h12
-rw-r--r--src/lsm/lsm_stat.c6
-rw-r--r--src/packing/pack_impl.c105
-rw-r--r--src/schema/schema_stat.c8
-rw-r--r--src/session/session_api.c164
-rw-r--r--src/support/stat.c62
-rw-r--r--test/suite/test_join01.py396
-rw-r--r--test/suite/test_join02.py290
-rw-r--r--test/suite/test_join03.py158
-rw-r--r--test/suite/test_schema05.py7
42 files changed, 2975 insertions, 183 deletions
diff --git a/build_win/filelist.win b/build_win/filelist.win
index 9d0ee10d305..af6ddf98da9 100644
--- a/build_win/filelist.win
+++ b/build_win/filelist.win
@@ -72,6 +72,7 @@ src/cursor/cur_ds.c
src/cursor/cur_dump.c
src/cursor/cur_file.c
src/cursor/cur_index.c
+src/cursor/cur_join.c
src/cursor/cur_json.c
src/cursor/cur_log.c
src/cursor/cur_metadata.c
diff --git a/dist/api_data.py b/dist/api_data.py
index a86647a27f2..f58a48b4a0b 100644
--- a/dist/api_data.py
+++ b/dist/api_data.py
@@ -772,6 +772,33 @@ methods = {
type='boolean'),
]),
+'WT_SESSION.join' : Method([
+ Config('compare', '"eq"', r'''
+ modifies the set of items to be returned so that the index key
+ satisfies the given comparison relative to the key set in this
+ cursor''',
+ choices=['eq', 'ge', 'gt', 'le', 'lt']),
+ Config('count', '', r'''
+ set an approximate count of the elements that would be included in
+ the join. This is used in sizing the bloom filter, and also influences
+ evaluation order for cursors in the join. When the count is equal
+ for multiple bloom filters in a composition of joins, the bloom
+ filter may be shared''',
+ type='int'),
+ Config('bloom_bit_count', '16', r'''
+ the number of bits used per item for the bloom filter''',
+ min='2', max='1000'),
+ Config('bloom_hash_count', '8', r'''
+ the number of hash values per item for the bloom filter''',
+ min='2', max='100'),
+ Config('strategy', '', r'''
+ when set to bloom, a bloom filter is created and populated for
+ this index. This has an up front cost but may reduce the number
+ of accesses to the main table when iterating the joined cursor.
+ The bloom setting requires that count be set''',
+ choices=['bloom', 'default']),
+]),
+
'WT_SESSION.log_flush' : Method([
Config('sync', 'on', r'''
forcibly flush the log and wait for it to achieve the synchronization
diff --git a/dist/filelist b/dist/filelist
index f33f0e9a962..52af87c2a68 100644
--- a/dist/filelist
+++ b/dist/filelist
@@ -72,6 +72,7 @@ src/cursor/cur_ds.c
src/cursor/cur_dump.c
src/cursor/cur_file.c
src/cursor/cur_index.c
+src/cursor/cur_join.c
src/cursor/cur_json.c
src/cursor/cur_log.c
src/cursor/cur_metadata.c
diff --git a/dist/s_define.list b/dist/s_define.list
index a2b86610755..8b0d9a0bdcd 100644
--- a/dist/s_define.list
+++ b/dist/s_define.list
@@ -4,6 +4,7 @@ API_CALL
API_CALL_NOCONF
API_SESSION_INIT
FLD_MASK
+JOINABLE_CURSOR_CALL_CHECK
LF_MASK
LLONG_MAX
LLONG_MIN
diff --git a/dist/s_funcs.list b/dist/s_funcs.list
index 3b5690a4bc2..ed6cf43bb2f 100644
--- a/dist/s_funcs.list
+++ b/dist/s_funcs.list
@@ -27,6 +27,8 @@ __wt_log_scan
__wt_nlpo2
__wt_nlpo2_round
__wt_print_huffman_code
+__wt_stat_join_aggregate
+__wt_stat_join_clear_all
__wt_try_readlock
wiredtiger_config_parser_open
wiredtiger_config_validate
diff --git a/dist/s_string.ok b/dist/s_string.ok
index 21cd360c144..7de139f6a40 100644
--- a/dist/s_string.ok
+++ b/dist/s_string.ok
@@ -221,6 +221,7 @@ OUTBUFF
OVFL
ObWgfvgw
Obama
+Outfmt
PARAM
POSIX
PREDEFINE
@@ -351,6 +352,7 @@ allocfile
allocsize
amd
ao
+ap
api
arg
argc
@@ -421,6 +423,7 @@ checksums
chk
chongo
cip
+cjoin
ckpt
ckptfrag
ckptlist
@@ -464,6 +467,7 @@ curdump
curextract
curfile
curindex
+curjoin
curlog
curmetadata
cursoring
@@ -510,6 +514,7 @@ dhandles
difftime
dir
dirlist
+disjunction
dlclose
dlh
dll
@@ -541,6 +546,7 @@ enqueue
enqueued
env
eof
+eq
equalp
errhandler
errno
@@ -593,6 +599,7 @@ ftruncate
func
gcc
gdb
+ge
getenv
getline
getone
@@ -608,6 +615,7 @@ goesc
gostring
gostruct
goutf
+gt
hashval
havesize
hdr
@@ -633,6 +641,7 @@ indirects
indx
infeasible
inflateInit
+infmt
init
initn
initsize
@@ -651,6 +660,7 @@ io
ip
islocked
ispo
+iter
iteratively
jnr
jrx
@@ -669,6 +679,7 @@ latencies
lbrace
lbracket
ld
+le
len
lenp
level's
@@ -715,6 +726,7 @@ mem
memalign
membar
memcpy
+memget
memmove
memset
memsize
diff --git a/dist/stat.py b/dist/stat.py
index c9684665a53..d62fda3fcb9 100644
--- a/dist/stat.py
+++ b/dist/stat.py
@@ -5,7 +5,7 @@ import re, string, sys, textwrap
from dist import compare_srcfile
# Read the source files.
-from stat_data import groups, dsrc_stats, connection_stats
+from stat_data import groups, dsrc_stats, connection_stats, join_stats
def print_struct(title, name, base, stats):
'''Print the structures for the stat.h file.'''
@@ -35,9 +35,17 @@ for line in open('../src/include/stat.h', 'r'):
print_struct(
'connections', 'connection', 1000, connection_stats)
print_struct('data sources', 'dsrc', 2000, dsrc_stats)
+ print_struct('join cursors', 'join', 3000, join_stats)
f.close()
compare_srcfile(tmp_file, '../src/include/stat.h')
+def print_defines_one(capname, base, stats):
+ for v, l in enumerate(stats, base):
+ f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70)))
+ f.write('#define\tWT_STAT_' + capname + '_' + l.name.upper() + "\t" *
+ max(1, 6 - int((len('WT_STAT_' + capname + '_' + l.name)) / 8)) +
+ str(v) + '\n')
+
def print_defines():
'''Print the #defines for the wiredtiger.in file.'''
f.write('''
@@ -51,11 +59,7 @@ def print_defines():
* @{
*/
''')
- for v, l in enumerate(connection_stats, 1000):
- f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70)))
- f.write('#define\tWT_STAT_CONN_' + l.name.upper() + "\t" *
- max(1, 6 - int((len('WT_STAT_CONN_' + l.name)) / 8)) +
- str(v) + '\n')
+ print_defines_one('CONN', 1000, connection_stats)
f.write('''
/*!
* @}
@@ -64,11 +68,16 @@ def print_defines():
* @{
*/
''')
- for v, l in enumerate(dsrc_stats, 2000):
- f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70)))
- f.write('#define\tWT_STAT_DSRC_' + l.name.upper() + "\t" *
- max(1, 6 - int((len('WT_STAT_DSRC_' + l.name)) / 8)) +
- str(v) + '\n')
+ print_defines_one('DSRC', 2000, dsrc_stats)
+ f.write('''
+/*!
+ * @}
+ * @name Statistics for join cursors
+ * @anchor statistics_join
+ * @{
+ */
+''')
+ print_defines_one('JOIN', 3000, join_stats)
f.write('/*! @} */\n')
# Update the #defines in the wiredtiger.in file.
@@ -98,10 +107,12 @@ def print_func(name, handle, list):
f.write('};\n')
f.write('''
-const char *
-__wt_stat_''' + name + '''_desc(int slot)
+int
+__wt_stat_''' + name + '''_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
{
-\treturn (__stats_''' + name + '''_desc[slot]);
+\tWT_UNUSED(cst);
+\t*p = __stats_''' + name + '''_desc[slot];
+\treturn (0);
}
''')
@@ -113,7 +124,8 @@ __wt_stat_''' + name + '_init_single(WT_' + name.upper() + '''_STATS *stats)
}
''')
- f.write('''
+ if handle != None:
+ f.write('''
void
__wt_stat_''' + name + '_init(' + handle + ''' *handle)
{
@@ -205,6 +217,7 @@ f.write('#include "wt_internal.h"\n')
print_func('dsrc', 'WT_DATA_HANDLE', dsrc_stats)
print_func('connection', 'WT_CONNECTION_IMPL', connection_stats)
+print_func('join', None, join_stats)
f.close()
compare_srcfile(tmp_file, '../src/support/stat.c')
@@ -224,6 +237,7 @@ for l in sorted(dsrc_stats):
scale_info += ' \'' + l.desc + '\',\n'
if 'no_clear' in l.flags:
clear_info += ' \'' + l.desc + '\',\n'
+# No join statistics can be captured in wtstats
scale_info += ']\n'
clear_info += ']\n'
prefix_info = 'prefix_list = [\n'
diff --git a/dist/stat_data.py b/dist/stat_data.py
index 77e3fee9646..3511ac1efc9 100644
--- a/dist/stat_data.py
+++ b/dist/stat_data.py
@@ -67,6 +67,10 @@ class DhandleStat(Stat):
prefix = 'data-handle'
def __init__(self, name, desc, flags=''):
Stat.__init__(self, name, DhandleStat.prefix, desc, flags)
+class JoinStat(Stat):
+ prefix = '' # prefix is inserted dynamically
+ def __init__(self, name, desc, flags=''):
+ Stat.__init__(self, name, JoinStat.prefix, desc, flags)
class LogStat(Stat):
prefix = 'log'
def __init__(self, name, desc, flags=''):
@@ -542,3 +546,14 @@ dsrc_stats = [
]
dsrc_stats = sorted(dsrc_stats, key=attrgetter('name'))
+
+##########################################
+# Cursor Join statistics
+##########################################
+join_stats = [
+ JoinStat('accesses', 'accesses'),
+ JoinStat('actual_count', 'actual count of items'),
+ JoinStat('bloom_false_positive', 'bloom filter false positives'),
+]
+
+join_stats = sorted(join_stats, key=attrgetter('name'))
diff --git a/examples/c/ex_schema.c b/examples/c/ex_schema.c
index 8b74500acd3..dabb568129e 100644
--- a/examples/c/ex_schema.c
+++ b/examples/c/ex_schema.c
@@ -49,12 +49,16 @@ typedef struct {
static POP_RECORD pop_data[] = {
{ "AU", 1900, 4000000 },
+ { "AU", 1950, 8267337 },
{ "AU", 2000, 19053186 },
{ "CAN", 1900, 5500000 },
+ { "CAN", 1950, 14011422 },
{ "CAN", 2000, 31099561 },
{ "UK", 1900, 369000000 },
+ { "UK", 1950, 50127000 },
{ "UK", 2000, 59522468 },
{ "USA", 1900, 76212168 },
+ { "USA", 1950, 150697361 },
{ "USA", 2000, 301279593 },
{ "", 0, 0 }
};
@@ -65,7 +69,7 @@ main(void)
{
POP_RECORD *p;
WT_CONNECTION *conn;
- WT_CURSOR *cursor;
+ WT_CURSOR *cursor, *cursor2, *join_cursor;
WT_SESSION *session;
const char *country;
uint64_t recno, population;
@@ -321,6 +325,39 @@ main(void)
/*! [Access only the index] */
ret = cursor->close(cursor);
+ /*! [Join cursors] */
+ /* Open cursors needed by the join. */
+ ret = session->open_cursor(session,
+ "join:table:poptable", NULL, NULL, &join_cursor);
+ ret = session->open_cursor(session,
+ "index:poptable:country", NULL, NULL, &cursor);
+ ret = session->open_cursor(session,
+ "index:poptable:immutable_year", NULL, NULL, &cursor2);
+
+ /* select values WHERE country == "AU" AND year > 1900 */
+ cursor->set_key(cursor, "AU\0\0\0");
+ ret = cursor->search(cursor);
+ ret = session->join(session, join_cursor, cursor,
+ "compare=eq,count=10");
+ cursor2->set_key(cursor2, (uint16_t)1900);
+ ret = cursor2->search(cursor2);
+ ret = session->join(session, join_cursor, cursor2,
+ "compare=gt,count=10,strategy=bloom");
+
+ /* List the values that are joined */
+ while ((ret = join_cursor->next(join_cursor)) == 0) {
+ ret = join_cursor->get_key(join_cursor, &recno);
+ ret = join_cursor->get_value(join_cursor, &country, &year,
+ &population);
+ printf("ID %" PRIu64, recno);
+ printf(": country %s, year %u, population %" PRIu64 "\n",
+ country, year, population);
+ }
+ /*! [Join cursors] */
+ ret = join_cursor->close(join_cursor);
+ ret = cursor2->close(cursor2);
+ ret = cursor->close(cursor);
+
ret = conn->close(conn, NULL);
return (ret);
diff --git a/examples/java/com/wiredtiger/examples/ex_schema.java b/examples/java/com/wiredtiger/examples/ex_schema.java
index 5b849ecf430..ba15db62a14 100644
--- a/examples/java/com/wiredtiger/examples/ex_schema.java
+++ b/examples/java/com/wiredtiger/examples/ex_schema.java
@@ -57,12 +57,16 @@ public class ex_schema {
popData = new ArrayList<PopRecord>();
popData.add(new PopRecord("AU", (short)1900, 4000000 ));
+ popData.add(new PopRecord("AU", (short)1950, 8267337 ));
popData.add(new PopRecord("AU", (short)2000, 19053186 ));
popData.add(new PopRecord("CAN", (short)1900, 5500000 ));
+ popData.add(new PopRecord("CAN", (short)1950, 14011422 ));
popData.add(new PopRecord("CAN", (short)2000, 31099561 ));
popData.add(new PopRecord("UK", (short)1900, 369000000 ));
+ popData.add(new PopRecord("UK", (short)1950, 50127000 ));
popData.add(new PopRecord("UK", (short)2000, 59522468 ));
popData.add(new PopRecord("USA", (short)1900, 76212168 ));
+ popData.add(new PopRecord("USA", (short)1950, 150697361 ));
popData.add(new PopRecord("USA", (short)2000, 301279593 ));
};
/*! [schema declaration] */
@@ -72,7 +76,7 @@ public class ex_schema {
throws WiredTigerException
{
Connection conn;
- Cursor cursor;
+ Cursor cursor, cursor2, join_cursor;
Session session;
String country;
long recno, population;
@@ -206,7 +210,7 @@ public class ex_schema {
* for a particular country.
*/
cursor = session.open_cursor("colgroup:poptable:main", null, null);
- cursor.putKeyLong(2);
+ cursor.putKeyRecord(2);
if ((ret = cursor.search()) == 0) {
country = cursor.getValueString();
year = cursor.getValueShort();
@@ -223,7 +227,7 @@ public class ex_schema {
* population of a particular country.
*/
cursor = session.open_cursor("colgroup:poptable:population", null, null);
- cursor.putKeyLong(2);
+ cursor.putKeyRecord(2);
if ((ret = cursor.search()) == 0) {
population = cursor.getValueLong();
System.out.println("ID 2: population " + population);
@@ -335,6 +339,39 @@ public class ex_schema {
/*! [Access only the index] */
ret = cursor.close();
+ /*! [Join cursors] */
+ /* Open cursors needed by the join. */
+ join_cursor = session.open_cursor(
+ "join:table:poptable", null, null);
+ cursor = session.open_cursor(
+ "index:poptable:country", null, null);
+ cursor2 = session.open_cursor(
+ "index:poptable:immutable_year", null, null);
+
+ /* select values WHERE country == "AU" AND year > 1900 */
+ cursor.putKeyString("AU");
+ ret = cursor.search();
+ session.join(join_cursor, cursor, "compare=eq,count=10");
+ cursor2.putKeyShort((short)1900);
+ ret = cursor2.search();
+ session.join(join_cursor, cursor2,
+ "compare=gt,count=10,strategy=bloom");
+
+ /* List the values that are joined */
+ while ((ret = join_cursor.next()) == 0) {
+ recno = join_cursor.getKeyRecord();
+ country = join_cursor.getValueString();
+ year = join_cursor.getValueShort();
+ population = join_cursor.getValueLong();
+ System.out.print("ID " + recno);
+ System.out.println( ": country " + country + ", year " + year +
+ ", population " + population);
+ }
+ /*! [Join cursors] */
+ ret = join_cursor.close();
+ ret = cursor2.close();
+ ret = cursor.close();
+
ret = conn.close(null);
return (ret);
diff --git a/ext/extractors/csv/csv_extractor.c b/ext/extractors/csv/csv_extractor.c
index 34b8d7c7c64..8d50cc7ec5d 100644
--- a/ext/extractors/csv/csv_extractor.c
+++ b/ext/extractors/csv/csv_extractor.c
@@ -49,7 +49,8 @@
typedef struct {
WT_EXTRACTOR extractor; /* Must come first */
WT_EXTENSION_API *wt_api; /* Extension API */
- int field_num; /* Field to extract */
+ int field; /* Field to extract */
+ int format_isnum; /* Field contents are numeric */
} CSV_EXTRACTOR;
/*
@@ -61,15 +62,15 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
const WT_ITEM *key, const WT_ITEM *value, WT_CURSOR *result_cursor)
{
char *copy, *p, *pend, *valstr;
- const CSV_EXTRACTOR *cvs_extractor;
- int i, ret;
+ const CSV_EXTRACTOR *csv_extractor;
+ int i, ret, val;
size_t len;
WT_EXTENSION_API *wtapi;
(void)key; /* Unused parameters */
- cvs_extractor = (const CSV_EXTRACTOR *)extractor;
- wtapi = cvs_extractor->wt_api;
+ csv_extractor = (const CSV_EXTRACTOR *)extractor;
+ wtapi = csv_extractor->wt_api;
/* Unpack the value. */
if ((ret = wtapi->struct_unpack(wtapi,
@@ -78,11 +79,11 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
p = valstr;
pend = strchr(p, ',');
- for (i = 0; i < cvs_extractor->field_num && pend != NULL; i++) {
+ for (i = 0; i < csv_extractor->field && pend != NULL; i++) {
p = pend + 1;
pend = strchr(p, ',');
}
- if (i == cvs_extractor->field_num) {
+ if (i == csv_extractor->field) {
if (pend == NULL)
pend = p + strlen(p);
/*
@@ -95,7 +96,12 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
return (errno);
strncpy(copy, p, len);
copy[len] = '\0';
- result_cursor->set_key(result_cursor, copy);
+ if (csv_extractor->format_isnum) {
+ if ((val = atoi(copy)) < 0)
+ return (EINVAL);
+ result_cursor->set_key(result_cursor, val);
+ } else
+ result_cursor->set_key(result_cursor, copy);
ret = result_cursor->insert(result_cursor);
free(copy);
if (ret != 0)
@@ -107,7 +113,7 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
/*
* csv_customize --
* The customize function creates a customized extractor,
- * needed to save the field number.
+ * needed to save the field number and format.
*/
static int
csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session,
@@ -115,20 +121,37 @@ csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session,
{
const CSV_EXTRACTOR *orig;
CSV_EXTRACTOR *csv_extractor;
+ WT_CONFIG_ITEM field, format;
+ WT_CONFIG_PARSER *parser;
+ WT_EXTENSION_API *wtapi;
+ int ret;
long field_num;
(void)session; /* Unused parameters */
(void)uri; /* Unused parameters */
orig = (const CSV_EXTRACTOR *)extractor;
- field_num = strtol(appcfg->str, NULL, 10);
+ wtapi = orig->wt_api;
+ if ((ret = wtapi->config_parser_open(wtapi, session, appcfg->str,
+ appcfg->len, &parser)) != 0)
+ return (ret);
+ if ((ret = parser->get(parser, "field", &field)) != 0 ||
+ (ret = parser->get(parser, "format", &format)) != 0) {
+ if (ret == WT_NOTFOUND)
+ return (EINVAL);
+ return (ret);
+ }
+ field_num = strtol(field.str, NULL, 10);
if (field_num < 0 || field_num > INT_MAX)
return (EINVAL);
+ if (format.len != 1 || (format.str[0] != 'S' && format.str[0] != 'i'))
+ return (EINVAL);
if ((csv_extractor = calloc(1, sizeof(CSV_EXTRACTOR))) == NULL)
return (errno);
*csv_extractor = *orig;
- csv_extractor->field_num = (int)field_num;
+ csv_extractor->field = field_num;
+ csv_extractor->format_isnum = (format.str[0] == 'i');
*customp = (WT_EXTRACTOR *)csv_extractor;
return (0);
}
diff --git a/lang/java/java_doc.i b/lang/java/java_doc.i
index 75c14dbfe8f..17317ab875b 100644
--- a/lang/java/java_doc.i
+++ b/lang/java/java_doc.i
@@ -33,6 +33,7 @@ COPYDOC(__wt_session, WT_SESSION, open_cursor)
COPYDOC(__wt_session, WT_SESSION, create)
COPYDOC(__wt_session, WT_SESSION, compact)
COPYDOC(__wt_session, WT_SESSION, drop)
+COPYDOC(__wt_session, WT_SESSION, join)
COPYDOC(__wt_session, WT_SESSION, log_flush)
COPYDOC(__wt_session, WT_SESSION, log_printf)
COPYDOC(__wt_session, WT_SESSION, rename)
diff --git a/src/bloom/bloom.c b/src/bloom/bloom.c
index 9225b9fe3b5..e3a21f25dc1 100644
--- a/src/bloom/bloom.c
+++ b/src/bloom/bloom.c
@@ -314,6 +314,47 @@ __wt_bloom_get(WT_BLOOM *bloom, WT_ITEM *key)
}
/*
+ * __wt_bloom_inmem_get --
+ * Tests whether the given key is in the Bloom filter.
+ * This can be used in place of __wt_bloom_get
+ * for Bloom filters that are memory only.
+ */
+int
+__wt_bloom_inmem_get(WT_BLOOM *bloom, WT_ITEM *key)
+{
+ uint64_t h1, h2;
+ uint32_t i;
+
+ h1 = __wt_hash_fnv64(key->data, key->size);
+ h2 = __wt_hash_city64(key->data, key->size);
+ for (i = 0; i < bloom->k; i++, h1 += h2) {
+ if (!__bit_test(bloom->bitstring, h1 % bloom->m))
+ return (WT_NOTFOUND);
+ }
+ return (0);
+}
+
+/*
+ * __wt_bloom_intersection --
+ * Modify the Bloom filter to contain the intersection of this
+ * filter with another.
+ */
+int
+__wt_bloom_intersection(WT_BLOOM *bloom, WT_BLOOM *other)
+{
+ uint64_t i, nbytes;
+
+ if (bloom->k != other->k || bloom->factor != other->factor ||
+ bloom->m != other->m || bloom->n != other->n)
+ return (EINVAL);
+
+ nbytes = __bitstr_size(bloom->m);
+ for (i = 0; i < nbytes; i++)
+ bloom->bitstring[i] &= other->bitstring[i];
+ return (0);
+}
+
+/*
* __wt_bloom_close --
* Close the Bloom filter, release any resources.
*/
diff --git a/src/config/config_def.c b/src/config/config_def.c
index e1284d18c09..d79ce6853e6 100644
--- a/src/config/config_def.c
+++ b/src/config/config_def.c
@@ -295,6 +295,19 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_drop[] = {
{ NULL, NULL, NULL, NULL, NULL, 0 }
};
+static const WT_CONFIG_CHECK confchk_WT_SESSION_join[] = {
+ { "bloom_bit_count", "int", NULL, "min=2,max=1000", NULL, 0 },
+ { "bloom_hash_count", "int", NULL, "min=2,max=100", NULL, 0 },
+ { "compare", "string",
+ NULL, "choices=[\"eq\",\"ge\",\"gt\",\"le\",\"lt\"]",
+ NULL, 0 },
+ { "count", "int", NULL, NULL, NULL, 0 },
+ { "strategy", "string",
+ NULL, "choices=[\"bloom\",\"default\"]",
+ NULL, 0 },
+ { NULL, NULL, NULL, NULL, NULL, 0 }
+};
+
static const WT_CONFIG_CHECK confchk_WT_SESSION_log_flush[] = {
{ "sync", "string",
NULL, "choices=[\"background\",\"off\",\"on\"]",
@@ -893,6 +906,11 @@ static const WT_CONFIG_ENTRY config_entries[] = {
"force=0,remove_files=",
confchk_WT_SESSION_drop, 2
},
+ { "WT_SESSION.join",
+ "bloom_bit_count=16,bloom_hash_count=8,compare=\"eq\",count=,"
+ "strategy=",
+ confchk_WT_SESSION_join, 5
+ },
{ "WT_SESSION.log_flush",
"sync=on",
confchk_WT_SESSION_log_flush, 1
diff --git a/src/conn/conn_stat.c b/src/conn/conn_stat.c
index 455ec9514f0..31438e10606 100644
--- a/src/conn/conn_stat.c
+++ b/src/conn/conn_stat.c
@@ -154,7 +154,7 @@ __statlog_dump(WT_SESSION_IMPL *session, const char *name, bool conn_stats)
WT_DECL_RET;
int64_t *stats;
int i;
- const char *uri;
+ const char *desc, *uri;
const char *cfg[] = {
WT_CONFIG_BASE(session, WT_SESSION_open_cursor), NULL };
@@ -175,16 +175,19 @@ __statlog_dump(WT_SESSION_IMPL *session, const char *name, bool conn_stats)
* If we don't find an underlying object, silently ignore it, the object
* may exist only intermittently.
*/
- switch (ret = __wt_curstat_open(session, uri, cfg, &cursor)) {
+ switch (ret = __wt_curstat_open(session, uri, NULL, cfg, &cursor)) {
case 0:
cst = (WT_CURSOR_STAT *)cursor;
- for (stats = cst->stats, i = 0; i < cst->stats_count; ++i)
+ for (stats = cst->stats, i = 0; i < cst->stats_count; ++i) {
+ if (conn_stats)
+ WT_ERR(__wt_stat_connection_desc(cst, i,
+ &desc));
+ else
+ WT_ERR(__wt_stat_dsrc_desc(cst, i, &desc));
WT_ERR(__wt_fprintf(conn->stat_fp,
"%s %" PRId64 " %s %s\n",
- conn->stat_stamp, stats[i],
- name, conn_stats ?
- __wt_stat_connection_desc(i) :
- __wt_stat_dsrc_desc(i)));
+ conn->stat_stamp, stats[i], name, desc));
+ }
WT_ERR(cursor->close(cursor));
break;
case EBUSY:
diff --git a/src/cursor/cur_dump.c b/src/cursor/cur_dump.c
index 6c11c4b407e..e5799fbad05 100644
--- a/src/cursor/cur_dump.c
+++ b/src/cursor/cur_dump.c
@@ -329,7 +329,7 @@ __curdump_close(WT_CURSOR *cursor)
cdump = (WT_CURSOR_DUMP *)cursor;
child = cdump->child;
- CURSOR_API_CALL(cursor, session, get_key, NULL);
+ CURSOR_API_CALL(cursor, session, close, NULL);
if (child != NULL)
WT_TRET(child->close(child));
/* We shared the child's URI. */
diff --git a/src/cursor/cur_index.c b/src/cursor/cur_index.c
index fd2a6cd7480..a909eaece99 100644
--- a/src/cursor/cur_index.c
+++ b/src/cursor/cur_index.c
@@ -8,6 +8,20 @@
#include "wt_internal.h"
+ /*
+ * __wt_curindex_joined --
+ * Produce an error that this cursor is being used in a join call.
+ */
+int
+__wt_curindex_joined(WT_CURSOR *cursor)
+{
+ WT_SESSION_IMPL *session;
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+ __wt_errx(session, "index cursor is being used in a join");
+ return (ENOTSUP);
+}
+
/*
* __curindex_get_value --
* WT_CURSOR->get_value implementation for index cursors.
@@ -15,32 +29,16 @@
static int
__curindex_get_value(WT_CURSOR *cursor, ...)
{
- WT_CURSOR_INDEX *cindex;
WT_DECL_RET;
- WT_ITEM *item;
WT_SESSION_IMPL *session;
va_list ap;
- cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, get_value, NULL);
- WT_CURSOR_NEEDVALUE(cursor);
-
va_start(ap, cursor);
- if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
- ret = __wt_schema_project_merge(session,
- cindex->cg_cursors, cindex->value_plan,
- cursor->value_format, &cursor->value);
- if (ret == 0) {
- item = va_arg(ap, WT_ITEM *);
- item->data = cursor->value.data;
- item->size = cursor->value.size;
- }
- } else
- ret = __wt_schema_project_out(session,
- cindex->cg_cursors, cindex->value_plan, ap);
- va_end(ap);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL);
+ WT_ERR(__wt_curindex_get_valuev(cursor, ap));
-err: API_END_RET(session, ret);
+err: va_end(ap);
+ API_END_RET(session, ret);
}
/*
@@ -53,7 +51,7 @@ __curindex_set_value(WT_CURSOR *cursor, ...)
WT_DECL_RET;
WT_SESSION_IMPL *session;
- CURSOR_API_CALL(cursor, session, set_value, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL);
ret = ENOTSUP;
err: cursor->saved_err = ret;
F_CLR(cursor, WT_CURSTD_VALUE_SET);
@@ -72,7 +70,7 @@ __curindex_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)a;
- CURSOR_API_CALL(a, session, compare, NULL);
+ JOINABLE_CURSOR_API_CALL(a, session, compare, NULL);
/* Check both cursors are "index:" type. */
if (!WT_PREFIX_MATCH(a->uri, "index:") ||
@@ -150,7 +148,7 @@ __curindex_next(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
if ((ret = cindex->child->next(cindex->child)) == 0)
@@ -171,7 +169,7 @@ __curindex_prev(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, prev, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL);
F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
if ((ret = cindex->child->prev(cindex->child)) == 0)
@@ -194,7 +192,7 @@ __curindex_reset(WT_CURSOR *cursor)
u_int i;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, reset, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL);
F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
WT_TRET(cindex->child->reset(cindex->child));
@@ -225,7 +223,7 @@ __curindex_search(WT_CURSOR *cursor)
cindex = (WT_CURSOR_INDEX *)cursor;
child = cindex->child;
- CURSOR_API_CALL(cursor, session, search, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL);
/*
* We are searching using the application-specified key, which
@@ -284,7 +282,7 @@ __curindex_search_near(WT_CURSOR *cursor, int *exact)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, search_near, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL);
__wt_cursor_set_raw_key(cindex->child, &cursor->key);
if ((ret = cindex->child->search_near(cindex->child, exact)) == 0)
ret = __curindex_move(cindex);
@@ -311,7 +309,7 @@ __curindex_close(WT_CURSOR *cursor)
cindex = (WT_CURSOR_INDEX *)cursor;
idx = cindex->index;
- CURSOR_API_CALL(cursor, session, close, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL);
if ((cp = cindex->cg_cursors) != NULL)
for (i = 0, cp = cindex->cg_cursors;
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c
new file mode 100644
index 00000000000..be121daa61f
--- /dev/null
+++ b/src/cursor/cur_join.c
@@ -0,0 +1,1049 @@
+/*-
+ * Copyright (c) 2014-2015 MongoDB, Inc.
+ * Copyright (c) 2008-2014 WiredTiger, Inc.
+ * All rights reserved.
+ *
+ * See the file LICENSE for redistribution information.
+ */
+
+#include "wt_internal.h"
+
+/*
+ * __curjoin_entry_iter_init --
+ * Initialize an iteration for the index managed by a join entry.
+ *
+ */
+static int
+__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp)
+{
+ WT_CURSOR *newcur;
+ WT_CURSOR *to_dup;
+ WT_DECL_RET;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ const char *def_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), NULL };
+ const char *uri, **config;
+ char *uribuf;
+ WT_CURSOR_JOIN_ITER *iter;
+ size_t size;
+
+ iter = NULL;
+ uribuf = NULL;
+ to_dup = entry->ends[0].cursor;
+
+ uri = to_dup->uri;
+ if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
+ config = &raw_cfg[0];
+ else
+ config = &def_cfg[0];
+
+ if (cjoin->projection != NULL) {
+ size = strlen(uri) + strlen(cjoin->projection) + 1;
+ WT_ERR(__wt_calloc(session, size, 1, &uribuf));
+ snprintf(uribuf, size, "%s%s", uri, cjoin->projection);
+ uri = uribuf;
+ }
+ WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config,
+ &newcur));
+ WT_ERR(__wt_cursor_dup_position(to_dup, newcur));
+ WT_ERR(__wt_calloc_one(session, &iter));
+ iter->cjoin = cjoin;
+ iter->session = session;
+ iter->entry = entry;
+ iter->cursor = newcur;
+ iter->advance = false;
+ *iterp = iter;
+
+ if (0) {
+err: __wt_free(session, iter);
+ }
+ __wt_free(session, uribuf);
+ return (ret);
+}
+
+/*
+ * __curjoin_pack_recno --
+ * Pack the given recno into a buffer; prepare an item referencing it.
+ *
+ */
+static int
+__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf,
+ size_t bufsize, WT_ITEM *item)
+{
+ WT_DECL_RET;
+ WT_SESSION *wtsession;
+ size_t sz;
+
+ wtsession = (WT_SESSION *)session;
+ WT_ERR(wiredtiger_struct_size(wtsession, &sz, "r", r));
+ WT_ASSERT(session, sz < bufsize);
+ WT_ERR(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r));
+ item->size = sz;
+ item->data = buf;
+
+err: return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_next --
+ * Get the next item in an iteration.
+ *
+ */
+static int
+__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey,
+ uint64_t *rp)
+{
+ WT_CURSOR *firstcg_cur;
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ uint64_t r;
+
+ if (iter->advance)
+ WT_ERR(iter->cursor->next(iter->cursor));
+ else
+ iter->advance = true;
+
+ session = iter->session;
+ cjoin = iter->cjoin;
+
+ /*
+ * Set our key to the primary key, we'll also need this
+ * to check membership.
+ */
+ if (iter->entry->index != NULL)
+ firstcg_cur = ((WT_CURSOR_INDEX *)iter->cursor)->cg_cursors[0];
+ else
+ firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0];
+ if (WT_CURSOR_RECNO(&cjoin->iface)) {
+ r = *(uint64_t *)firstcg_cur->key.data;
+ WT_ERR(__curjoin_pack_recno(session, r, cjoin->recno_buf,
+ sizeof(cjoin->recno_buf), primkey));
+ *rp = r;
+ } else {
+ WT_ITEM_SET(*primkey, firstcg_cur->key);
+ *rp = 0;
+ }
+ iter->curkey = primkey;
+ iter->entry->stats.actual_count++;
+ iter->entry->stats.accesses++;
+
+err: return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_reset --
+ * Reset an iteration to the starting point.
+ *
+ */
+static int
+__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_DECL_RET;
+
+ if (iter->advance) {
+ WT_ERR(iter->cursor->reset(iter->cursor));
+ WT_ERR(__wt_cursor_dup_position(
+ iter->cjoin->entries[0].ends[0].cursor, iter->cursor));
+ iter->advance = false;
+ iter->entry->stats.actual_count = 0;
+ }
+
+err: return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_ready --
+ * The iterator is positioned.
+ *
+ */
+static bool
+__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
+{
+ return (iter->advance);
+}
+
+/*
+ * __curjoin_entry_iter_close --
+ * Close the iteration, release resources.
+ *
+ */
+static int
+__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter)
+{
+ if (iter->cursor != NULL)
+ return (iter->cursor->close(iter->cursor));
+ else
+ return (0);
+ __wt_free(iter->session, iter);
+}
+
+/*
+ * __curjoin_get_key --
+ * WT_CURSOR->get_key for join cursors.
+ */
+static int
+__curjoin_get_key(WT_CURSOR *cursor, ...)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ va_list ap;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ va_start(ap, cursor);
+ CURSOR_API_CALL(cursor, session, get_key, NULL);
+
+ if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
+ !__curjoin_entry_iter_ready(cjoin->iter)) {
+ __wt_errx(session, "join cursor must be advanced with next()");
+ WT_ERR(EINVAL);
+ }
+ WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap));
+
+err: va_end(ap);
+ API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_get_value --
+ * WT_CURSOR->get_value for join cursors.
+ */
+static int
+__curjoin_get_value(WT_CURSOR *cursor, ...)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ITER *iter;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ va_list ap;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+ iter = cjoin->iter;
+
+ va_start(ap, cursor);
+ CURSOR_API_CALL(cursor, session, get_value, NULL);
+
+ if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
+ !__curjoin_entry_iter_ready(iter)) {
+ __wt_errx(session, "join cursor must be advanced with next()");
+ WT_ERR(EINVAL);
+ }
+ if (iter->entry->index != NULL)
+ WT_ERR(__wt_curindex_get_valuev(iter->cursor, ap));
+ else
+ WT_ERR(__wt_curtable_get_valuev(iter->cursor, ap));
+
+err: va_end(ap);
+ API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_init_bloom --
+ * Populate Bloom filters
+ */
+static int
+__curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_BLOOM *bloom)
+{
+ WT_COLLATOR *collator;
+ WT_CURSOR *c;
+ WT_CURSOR_INDEX *cindex;
+ WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
+ WT_DECL_RET;
+ WT_ITEM curkey, curvalue, *k;
+ WT_TABLE *maintable;
+ char *uri;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ const char *mainkey_str, *p;
+ void *allocbuf;
+ size_t mainkey_len, size;
+ u_int i;
+ int cmp, skip;
+
+ c = NULL;
+ allocbuf = NULL;
+ skip = 0;
+
+ if (entry->index != NULL) {
+ /*
+ * Open a cursor having a projection of the keys of the
+ * index we're comparing against. Open it raw, we're
+ * going to compare it to the raw keys of the
+ * reference cursors.
+ */
+ maintable = ((WT_CURSOR_TABLE *)entry->main)->table;
+ mainkey_str = maintable->colconf.str + 1;
+ for (p = mainkey_str, i = 0;
+ p != NULL && i < maintable->nkey_columns; i++)
+ p = strchr(p + 1, ',');
+ WT_ASSERT(session, p != 0);
+ mainkey_len = WT_PTRDIFF(p, mainkey_str);
+ size = strlen(entry->index->name) + mainkey_len + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s(%.*s)", entry->index->name,
+ (int)mainkey_len, mainkey_str);
+ } else {
+ /*
+ * For joins on the main table, we just need the primary
+ * key for comparison, we don't need any values.
+ */
+ size = strlen(cjoin->table->name) + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s()", cjoin->table->name);
+ }
+ WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, raw_cfg, &c));
+
+ /* Initially position the cursor if necessary. */
+ endmax = &entry->ends[entry->ends_next];
+ if ((end = &entry->ends[0]) < endmax &&
+ F_ISSET(end, WT_CURJOIN_END_GE)) {
+ WT_ERR(__wt_cursor_dup_position(end->cursor, c));
+ if (end->flags == WT_CURJOIN_END_GE)
+ skip = 1;
+ }
+ collator = (entry->index == NULL) ? NULL : entry->index->collator;
+ while (ret == 0) {
+ c->get_key(c, &curkey);
+ if (entry->index != NULL) {
+ cindex = (WT_CURSOR_INDEX *)c;
+ if (cindex->index->extractor == NULL) {
+ /*
+ * Repack so it's comparable to the
+ * reference endpoints.
+ */
+ k = &cindex->child->key;
+ WT_ERR(__wt_struct_repack(session,
+ cindex->child->key_format,
+ entry->main->value_format, k, &curkey,
+ &allocbuf));
+ } else
+ curkey = cindex->child->key;
+ }
+ for (end = &entry->ends[skip]; end < endmax; end++) {
+ WT_ERR(__wt_compare(session, collator, &curkey,
+ &end->key, &cmp));
+ if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if (cmp < 0 || (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)))
+ goto advance;
+ if (cmp > 0) {
+ if (F_ISSET(end, WT_CURJOIN_END_GT))
+ skip = 1;
+ else
+ goto done;
+ }
+ } else {
+ if (cmp > 0 || (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)))
+ goto done;
+ }
+ }
+ if (entry->index != NULL)
+ c->get_value(c, &curvalue);
+ else
+ c->get_key(c, &curvalue);
+ WT_ERR(__wt_bloom_insert(bloom, &curvalue));
+ entry->stats.actual_count++;
+advance:
+ if ((ret = c->next(c)) == WT_NOTFOUND)
+ break;
+ }
+done:
+ WT_ERR_NOTFOUND_OK(ret);
+
+err: if (c != NULL)
+ WT_TRET(c->close(c));
+ __wt_free(session, allocbuf);
+ return (ret);
+}
+
+/*
+ * __curjoin_endpoint_init_key --
+ * Set the key in the reference endpoint.
+ */
+static int
+__curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint)
+{
+ WT_CURSOR *cursor;
+ WT_CURSOR_INDEX *cindex;
+ WT_DECL_RET;
+ WT_ITEM *k;
+ uint64_t r;
+ void *allocbuf;
+
+ allocbuf = NULL;
+ if ((cursor = endpoint->cursor) != NULL) {
+ if (entry->index != NULL) {
+ cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
+ if (cindex->index->extractor == NULL) {
+ WT_ERR(__wt_struct_repack(session,
+ cindex->child->key_format,
+ entry->main->value_format,
+ &cindex->child->key, &endpoint->key,
+ &allocbuf));
+ if (allocbuf != NULL)
+ F_SET(endpoint, WT_CURJOIN_END_OWN_KEY);
+ } else
+ endpoint->key = cindex->child->key;
+ } else {
+ k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
+ if (WT_CURSOR_RECNO(cursor)) {
+ r = *(uint64_t *)k->data;
+ WT_ERR(__curjoin_pack_recno(session, r,
+ endpoint->recno_buf,
+ sizeof(endpoint->recno_buf),
+ &endpoint->key));
+ }
+ else
+ endpoint->key = *k;
+ }
+ }
+ if (0) {
+err: __wt_free(session, allocbuf);
+ }
+ return (ret);
+}
+
+/*
+ * __curjoin_init_iter --
+ * Initialize before any iteration.
+ */
+static int
+__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
+{
+ WT_BLOOM *bloom;
+ WT_DECL_RET;
+ WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2;
+ WT_CURSOR_JOIN_ENDPOINT *end;
+ uint64_t k, m;
+
+ if (cjoin->entries_next == 0) {
+ __wt_errx(session, "join cursor has not yet been joined "
+ "with any other cursors");
+ WT_ERR(EINVAL);
+ }
+
+ je = &cjoin->entries[0];
+ WT_ERR(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter));
+
+ jeend = &cjoin->entries[cjoin->entries_next];
+ for (je = cjoin->entries; je < jeend; je++) {
+ __wt_stat_join_init_single(&je->stats);
+ for (end = &je->ends[0]; end < &je->ends[je->ends_next];
+ end++)
+ WT_ERR(__curjoin_endpoint_init_key(session, je, end));
+
+ /*
+ * The first entry is iterated as the 'outermost' cursor.
+ * For the common GE case, we don't have to test against
+ * the left reference key, we know it will be true since
+ * the btree is ordered.
+ */
+ if (je == cjoin->entries && je->ends[0].flags ==
+ (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ))
+ F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
+
+ if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
+ if (je->bloom == NULL) {
+ /*
+ * Look for compatible filters to be shared,
+ * pick compatible numbers for bit counts
+ * and number of hashes.
+ */
+ m = je->bloom_bit_count;
+ k = je->bloom_hash_count;
+ for (je2 = je + 1; je2 < jeend; je2++)
+ if (F_ISSET(je2,
+ WT_CURJOIN_ENTRY_BLOOM) &&
+ je2->count == je->count) {
+ m = WT_MAX(
+ je2->bloom_bit_count, m);
+ k = WT_MAX(
+ je2->bloom_hash_count, k);
+ }
+ je->bloom_bit_count = m;
+ je->bloom_hash_count = k;
+ WT_ERR(__wt_bloom_create(session, NULL,
+ NULL, je->count, m, k, &je->bloom));
+ F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM);
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
+ je, je->bloom));
+ /*
+ * Share the Bloom filter, making all
+ * config info consistent.
+ */
+ for (je2 = je + 1; je2 < jeend; je2++)
+ if (F_ISSET(je2,
+ WT_CURJOIN_ENTRY_BLOOM) &&
+ je2->count == je->count) {
+ WT_ASSERT(session,
+ je2->bloom == NULL);
+ je2->bloom = je->bloom;
+ je2->bloom_bit_count = m;
+ je2->bloom_hash_count = k;
+ }
+ } else {
+ /*
+ * Create a temporary filter that we'll
+ * merge into the shared one. The Bloom
+ * parameters of the two filters must match.
+ */
+ WT_ERR(__wt_bloom_create(session, NULL,
+ NULL, je->count, je->bloom_bit_count,
+ je->bloom_hash_count, &bloom));
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
+ je, bloom));
+ WT_ERR(__wt_bloom_intersection(je->bloom,
+ bloom));
+ WT_ERR(__wt_bloom_close(bloom));
+ }
+ }
+ }
+ F_SET(cjoin, WT_CURJOIN_INITIALIZED);
+
+err:
+ return (ret);
+}
+
+/*
+ * __curjoin_entry_in_range --
+ * Check if a key is in the range specified by the entry, returning
+ * WT_NOTFOUND if not.
+ */
+static int
+__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ WT_ITEM *curkey, bool skip_left)
+{
+ WT_COLLATOR *collator;
+ WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
+ WT_DECL_RET;
+ int cmp;
+
+ collator = (entry->index != NULL) ? entry->index->collator : NULL;
+ endmax = &entry->ends[entry->ends_next];
+ for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) {
+ WT_ERR(__wt_compare(session, collator, curkey, &end->key,
+ &cmp));
+ if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if (cmp < 0 ||
+ (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
+ (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT)))
+ WT_ERR(WT_NOTFOUND);
+ } else {
+ if (cmp > 0 ||
+ (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
+ (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT)))
+ WT_ERR(WT_NOTFOUND);
+ }
+ }
+err: return (ret);
+}
+
+typedef struct {
+ WT_CURSOR iface;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ int ismember;
+} WT_CURJOIN_EXTRACTOR;
+
+/*
+ * __curjoin_extract_insert --
+ * Handle a key produced by a custom extractor.
+ */
+static int
+__curjoin_extract_insert(WT_CURSOR *cursor) {
+ WT_CURJOIN_EXTRACTOR *cextract;
+ WT_DECL_RET;
+ WT_ITEM ikey;
+ WT_SESSION_IMPL *session;
+
+ cextract = (WT_CURJOIN_EXTRACTOR *)cursor;
+ /*
+ * This insert method may be called multiple times during a single
+ * extraction. If we already have a definitive answer to the
+ * membership question, exit early.
+ */
+ if (cextract->ismember)
+ return (0);
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+
+ WT_ITEM_SET(ikey, cursor->key);
+ /*
+ * We appended a padding byte to the key to avoid rewriting the last
+ * column. Strip that away here.
+ */
+ WT_ASSERT(session, ikey.size > 0);
+ --ikey.size;
+
+ ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false);
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ else
+ cextract->ismember = 1;
+
+ return (ret);
+}
+
+/*
+ * __curjoin_entry_member --
+ * Do a membership check for a particular index that was joined,
+ * if not a member, returns WT_NOTFOUND.
+ */
+static int
+__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry, bool skip_left)
+{
+ WT_CURJOIN_EXTRACTOR extract_cursor;
+ WT_CURSOR *c;
+ WT_CURSOR_STATIC_INIT(iface,
+ __wt_cursor_get_key, /* get-key */
+ __wt_cursor_get_value, /* get-value */
+ __wt_cursor_set_key, /* set-key */
+ __wt_cursor_set_value, /* set-value */
+ __wt_cursor_notsup, /* compare */
+ __wt_cursor_notsup, /* equals */
+ __wt_cursor_notsup, /* next */
+ __wt_cursor_notsup, /* prev */
+ __wt_cursor_notsup, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_notsup, /* search-near */
+ __curjoin_extract_insert, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* reconfigure */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_notsup); /* close */
+ WT_DECL_RET;
+ WT_INDEX *idx;
+ WT_ITEM *key, v;
+ bool bloom_found;
+
+ key = cjoin->iter->curkey;
+ entry->stats.accesses++;
+ bloom_found = false;
+
+ if (entry->bloom != NULL) {
+ /*
+ * If we don't own the Bloom filter, we must be sharing one
+ * in a previous entry. So the shared filter has already
+ * been checked and passed.
+ */
+ if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
+ return (0);
+
+ /*
+ * If the item is not in the Bloom filter, we return
+ * immediately, otherwise, we still need to check the
+ * long way.
+ */
+ WT_ERR(__wt_bloom_inmem_get(entry->bloom, key));
+ bloom_found = true;
+ }
+ if (entry->index != NULL) {
+ c = entry->main;
+ c->set_key(c, key);
+ if ((ret = c->search(c)) == 0)
+ ret = c->get_value(c, &v);
+ else if (ret == WT_NOTFOUND)
+ WT_ERR_MSG(session, WT_ERROR,
+ "main table for join is missing entry.");
+ c->reset(c);
+ WT_ERR(ret);
+ } else
+ v = *key;
+
+ if ((idx = entry->index) != NULL && idx->extractor != NULL) {
+ extract_cursor.iface = iface;
+ extract_cursor.iface.session = &session->iface;
+ extract_cursor.iface.key_format = idx->exkey_format;
+ extract_cursor.ismember = 0;
+ extract_cursor.entry = entry;
+ WT_ERR(idx->extractor->extract(idx->extractor,
+ &session->iface, key, &v, &extract_cursor.iface));
+ if (!extract_cursor.ismember)
+ WT_ERR(WT_NOTFOUND);
+ } else
+ WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left));
+
+ if (0) {
+err: if (ret == WT_NOTFOUND && bloom_found)
+ entry->stats.bloom_false_positive++;
+ }
+ return (ret);
+}
+
+/*
+ * __curjoin_next --
+ * WT_CURSOR::next for join cursors.
+ */
+static int
+__curjoin_next(WT_CURSOR *cursor)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ bool skip_left;
+ u_int i;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ CURSOR_API_CALL(cursor, session, next, NULL);
+
+ if (F_ISSET(cjoin, WT_CURJOIN_ERROR)) {
+ __wt_errx(session, "join cursor encountered previous error");
+ WT_ERR(WT_ERROR);
+ }
+ if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
+ WT_ERR(__curjoin_init_iter(session, cjoin));
+
+nextkey:
+ if ((ret = __curjoin_entry_iter_next(cjoin->iter, &cursor->key,
+ &cursor->recno)) == 0) {
+ F_SET(cursor, WT_CURSTD_KEY_EXT);
+
+ /*
+ * We may have already established membership for the
+ * 'left' case for the first entry, since we're
+ * using that in our iteration.
+ */
+ skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
+ for (i = 0; i < cjoin->entries_next; i++) {
+ ret = __curjoin_entry_member(session, cjoin,
+ &cjoin->entries[i], skip_left);
+ if (ret == WT_NOTFOUND)
+ goto nextkey;
+ skip_left = false;
+ WT_ERR(ret);
+ }
+ }
+
+ if (0) {
+err: F_SET(cjoin, WT_CURJOIN_ERROR);
+ }
+ API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_reset --
+ * WT_CURSOR::reset for join cursors.
+ */
+static int
+__curjoin_reset(WT_CURSOR *cursor)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ CURSOR_API_CALL(cursor, session, reset, NULL);
+
+ if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
+ WT_ERR(__curjoin_entry_iter_reset(cjoin->iter));
+
+err: API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_close --
+ * WT_CURSOR::close for join cursors.
+ */
+static int
+__curjoin_close(WT_CURSOR *cursor)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ENDPOINT *end;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ u_int i;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ CURSOR_API_CALL(cursor, session, close, NULL);
+
+ __wt_schema_release_table(session, cjoin->table);
+ /* These are owned by the table */
+ cursor->internal_uri = NULL;
+ cursor->key_format = NULL;
+ if (cjoin->projection != NULL) {
+ __wt_free(session, cjoin->projection);
+ __wt_free(session, cursor->value_format);
+ }
+
+ for (entry = cjoin->entries, i = 0; i < cjoin->entries_next;
+ entry++, i++) {
+ if (entry->main != NULL)
+ WT_TRET(entry->main->close(entry->main));
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
+ WT_TRET(__wt_bloom_close(entry->bloom));
+ for (end = &entry->ends[0];
+ end < &entry->ends[entry->ends_next]; end++) {
+ F_CLR(end->cursor, WT_CURSTD_JOINED);
+ if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY))
+ __wt_free(session, end->key.data);
+ }
+ }
+
+ if (cjoin->iter != NULL)
+ WT_TRET(__curjoin_entry_iter_close(cjoin->iter));
+ __wt_free(session, cjoin->entries);
+ WT_TRET(__wt_cursor_close(cursor));
+
+err: API_END_RET(session, ret);
+}
+
+/*
+ * __wt_curjoin_open --
+ * Initialize a join cursor.
+ *
+ * Join cursors are read-only.
+ */
+int
+__wt_curjoin_open(WT_SESSION_IMPL *session,
+ const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
+{
+ WT_CURSOR_STATIC_INIT(iface,
+ __curjoin_get_key, /* get-key */
+ __curjoin_get_value, /* get-value */
+ __wt_cursor_notsup, /* set-key */
+ __wt_cursor_notsup, /* set-value */
+ __wt_cursor_notsup, /* compare */
+ __wt_cursor_notsup, /* equals */
+ __curjoin_next, /* next */
+ __wt_cursor_notsup, /* prev */
+ __curjoin_reset, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_notsup, /* search-near */
+ __wt_cursor_notsup, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_notsup, /* reconfigure */
+ __curjoin_close); /* close */
+ WT_CURSOR *cursor;
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_ITEM(tmp);
+ WT_DECL_RET;
+ WT_TABLE *table;
+ size_t size;
+ const char *tablename, *columns;
+
+ WT_STATIC_ASSERT(offsetof(WT_CURSOR_JOIN, iface) == 0);
+
+ if (!WT_PREFIX_SKIP(uri, "join:"))
+ return (EINVAL);
+ tablename = uri;
+ if (!WT_PREFIX_SKIP(tablename, "table:"))
+ return (EINVAL);
+
+ columns = strchr(tablename, '(');
+ if (columns == NULL)
+ size = strlen(tablename);
+ else
+ size = WT_PTRDIFF(columns, tablename);
+ WT_RET(__wt_schema_get_table(session, tablename, size, 0, &table));
+
+ WT_RET(__wt_calloc_one(session, &cjoin));
+ cursor = &cjoin->iface;
+ *cursor = iface;
+ cursor->session = &session->iface;
+ cursor->internal_uri = table->name;
+ cursor->key_format = table->key_format;
+ cursor->value_format = table->value_format;
+ cjoin->table = table;
+
+ /* Handle projections. */
+ WT_ERR(__wt_scr_alloc(session, 0, &tmp));
+ if (columns != NULL) {
+ WT_ERR(__wt_struct_reformat(session, table,
+ columns, strlen(columns), NULL, 1, tmp));
+ WT_ERR(__wt_strndup(
+ session, tmp->data, tmp->size, &cursor->value_format));
+ WT_ERR(__wt_strdup(session, columns, &cjoin->projection));
+ }
+
+ if (owner != NULL)
+ WT_ERR(EINVAL);
+
+ WT_ERR(__wt_cursor_init(cursor, uri, owner, cfg, cursorp));
+
+ if (0) {
+err: WT_TRET(__curjoin_close(cursor));
+ *cursorp = NULL;
+ }
+
+ __wt_scr_free(session, &tmp);
+ return (ret);
+}
+
+/*
+ * __wt_curjoin_join --
+ * Add a new join to a join cursor.
+ */
+int
+__wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range,
+ uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count)
+{
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_CURSOR_JOIN_ENDPOINT *end, *newend;
+ bool hasins, needbloom, range_eq;
+ u_int i, ins, nonbloom;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ char *main_uri;
+ size_t namesize, newsize;
+
+ entry = NULL;
+ hasins = needbloom = false;
+ ins = 0; /* -Wuninitialized */
+ main_uri = NULL;
+ nonbloom = 0; /* -Wuninitialized */
+ namesize = strlen(cjoin->table->name);
+
+ for (i = 0; i < cjoin->entries_next; i++) {
+ if (cjoin->entries[i].index == idx) {
+ entry = &cjoin->entries[i];
+ break;
+ }
+ if (!needbloom && i > 0 &&
+ !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) {
+ needbloom = true;
+ nonbloom = i;
+ }
+ }
+ if (entry == NULL) {
+ WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated,
+ cjoin->entries_next + 1, &cjoin->entries));
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && needbloom) {
+ /*
+ * Reorder the list so that after the first entry,
+ * the Bloom filtered entries come next, followed by
+ * the non-Bloom entries. Once the Bloom filters
+ * are built, determining membership via Bloom is
+ * faster than without Bloom, so we can answer
+ * membership questions more quickly, and with less
+ * I/O, with the Bloom entries first.
+ */
+ entry = &cjoin->entries[nonbloom];
+ memmove(entry + 1, entry,
+ (cjoin->entries_next - nonbloom) *
+ sizeof(WT_CURSOR_JOIN_ENTRY));
+ memset(entry, 0, sizeof(WT_CURSOR_JOIN_ENTRY));
+ }
+ else
+ entry = &cjoin->entries[cjoin->entries_next];
+ entry->index = idx;
+ entry->flags = flags;
+ entry->count = count;
+ entry->bloom_bit_count = bloom_bit_count;
+ entry->bloom_hash_count = bloom_hash_count;
+ ++cjoin->entries_next;
+ } else {
+ /* Merge the join into an existing entry for this index */
+ if (count != 0 && entry->count != 0 && entry->count != count) {
+ __wt_errx(session, "count=%" PRIu64 " does not match "
+ "previous count=%" PRIu64 " for this index",
+ count, entry->count);
+ WT_ERR(EINVAL);
+ }
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) !=
+ F_ISSET(entry, WT_CURJOIN_ENTRY_BLOOM)) {
+ __wt_errx(session, "join has incompatible strategy "
+ "values for the same index");
+ WT_ERR(EINVAL);
+ }
+ /*
+ * Check against other comparisons (we call them endpoints)
+ * already set up for this index.
+ * We allow either:
+ * - one or more "eq" (with disjunction)
+ * - exactly one "eq" (with conjunction)
+ * - exactly one of "gt" or "ge" (conjunction or disjunction)
+ * - exactly one of "lt" or "le" (conjunction or disjunction)
+ * - one of "gt"/"ge" along with one of "lt"/"le"
+ * (currently restricted to conjunction).
+ *
+ * Some other combinations, although expressible either do
+ * not make sense (X == 3 AND X == 5) or are reducible (X <
+ * 7 AND X < 9). Other specific cases of (X < 7 OR X > 15)
+ * or (X == 4 OR X > 15) make sense but we don't handle yet.
+ */
+ for (i = 0; i < entry->ends_next; i++) {
+ end = &entry->ends[i];
+ range_eq = (range == WT_CURJOIN_END_EQ);
+ if ((F_ISSET(end, WT_CURJOIN_END_GT) &&
+ ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) ||
+ (F_ISSET(end, WT_CURJOIN_END_LT) &&
+ ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) ||
+ (end->flags == WT_CURJOIN_END_EQ &&
+ (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT))
+ != 0)) {
+ __wt_errx(session,
+ "join has overlapping ranges");
+ WT_ERR(EINVAL);
+ }
+ if (range == WT_CURJOIN_END_EQ &&
+ end->flags == WT_CURJOIN_END_EQ &&
+ !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) {
+ __wt_errx(session,
+ "compare=eq can only be combined "
+ "using operation=or");
+ WT_ERR(EINVAL);
+ }
+
+ /*
+ * Sort "gt"/"ge" to the front, followed by any number
+ * of "eq", and finally "lt"/"le".
+ */
+ if (!hasins &&
+ ((range & WT_CURJOIN_END_GT) != 0 ||
+ (range == WT_CURJOIN_END_EQ &&
+ !F_ISSET(end, WT_CURJOIN_END_GT)))) {
+ ins = i;
+ hasins = true;
+ }
+ }
+ /* All checks completed, merge any new configuration now */
+ entry->count = count;
+ entry->bloom_bit_count =
+ WT_MAX(entry->bloom_bit_count, bloom_bit_count);
+ entry->bloom_hash_count =
+ WT_MAX(entry->bloom_hash_count, bloom_hash_count);
+ }
+ WT_ERR(__wt_realloc_def(session, &entry->ends_allocated,
+ entry->ends_next + 1, &entry->ends));
+ if (!hasins)
+ ins = entry->ends_next;
+ newend = &entry->ends[ins];
+ memmove(newend + 1, newend,
+ (entry->ends_next - ins) * sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ entry->ends_next++;
+ newend->cursor = ref_cursor;
+ F_SET(newend, range);
+
+ /* Open the main file with a projection of the indexed columns. */
+ if (entry->main == NULL && entry->index != NULL) {
+ namesize = strlen(cjoin->table->name);
+ newsize = namesize + entry->index->colconf.len + 1;
+ WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
+ snprintf(main_uri, newsize, "%s%.*s",
+ cjoin->table->name, (int)entry->index->colconf.len,
+ entry->index->colconf.str);
+ WT_ERR(__wt_open_cursor(session, main_uri,
+ (WT_CURSOR *)cjoin, raw_cfg, &entry->main));
+ }
+
+err: if (main_uri != NULL)
+ __wt_free(session, main_uri);
+ return (ret);
+}
diff --git a/src/cursor/cur_stat.c b/src/cursor/cur_stat.c
index 81d028c165a..65d2dc81406 100644
--- a/src/cursor/cur_stat.c
+++ b/src/cursor/cur_stat.c
@@ -103,7 +103,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...)
va_list ap;
size_t size;
uint64_t *v;
- const char **p;
+ const char *desc, **p;
cst = (WT_CURSOR_STAT *)cursor;
va_start(ap, cursor);
@@ -111,15 +111,13 @@ __curstat_get_value(WT_CURSOR *cursor, ...)
WT_CURSOR_NEEDVALUE(cursor);
+ WT_ERR(cst->stats_desc(cst, WT_STAT_KEY_OFFSET(cst), &desc));
if (F_ISSET(cursor, WT_CURSTD_RAW)) {
WT_ERR(__wt_struct_size(session, &size, cursor->value_format,
- cst->stats_desc(WT_STAT_KEY_OFFSET(cst)),
- cst->pv.data, cst->v));
+ desc, cst->pv.data, cst->v));
WT_ERR(__wt_buf_initsize(session, &cursor->value, size));
WT_ERR(__wt_struct_pack(session, cursor->value.mem, size,
- cursor->value_format,
- cst->stats_desc(WT_STAT_KEY_OFFSET(cst)),
- cst->pv.data, cst->v));
+ cursor->value_format, desc, cst->pv.data, cst->v));
item = va_arg(ap, WT_ITEM *);
item->data = cursor->value.data;
@@ -130,7 +128,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...)
* pointer support isn't documented, but it's a cheap test.
*/
if ((p = va_arg(ap, const char **)) != NULL)
- *p = cst->stats_desc(WT_STAT_KEY_OFFSET(cst));
+ *p = desc;
if ((p = va_arg(ap, const char **)) != NULL)
*p = cst->pv.data;
if ((v = va_arg(ap, uint64_t *)) != NULL)
@@ -201,7 +199,9 @@ __curstat_next(WT_CURSOR *cursor)
/* Initialize on demand. */
if (cst->notinitialized) {
WT_ERR(__wt_curstat_init(
- session, cursor->internal_uri, cst->cfg, cst));
+ session, cursor->internal_uri, NULL, cst->cfg, cst));
+ if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, true, true));
cst->notinitialized = false;
}
@@ -211,15 +211,19 @@ __curstat_next(WT_CURSOR *cursor)
cst->key = WT_STAT_KEY_MIN(cst);
} else if (cst->key < WT_STAT_KEY_MAX(cst))
++cst->key;
- else {
- F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ else if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, true, false));
+ else
WT_ERR(WT_NOTFOUND);
- }
+
cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)];
WT_ERR(__curstat_print_value(session, cst->v, &cst->pv));
F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
-err: API_END_RET(session, ret);
+ if (0) {
+err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ }
+ API_END_RET(session, ret);
}
/*
@@ -239,7 +243,9 @@ __curstat_prev(WT_CURSOR *cursor)
/* Initialize on demand. */
if (cst->notinitialized) {
WT_ERR(__wt_curstat_init(
- session, cursor->internal_uri, cst->cfg, cst));
+ session, cursor->internal_uri, NULL, cst->cfg, cst));
+ if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, false, true));
cst->notinitialized = false;
}
@@ -249,16 +255,19 @@ __curstat_prev(WT_CURSOR *cursor)
cst->key = WT_STAT_KEY_MAX(cst);
} else if (cst->key > WT_STAT_KEY_MIN(cst))
--cst->key;
- else {
- F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
+ else if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, false, false));
+ else
WT_ERR(WT_NOTFOUND);
- }
cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)];
WT_ERR(__curstat_print_value(session, cst->v, &cst->pv));
F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
-err: API_END_RET(session, ret);
+ if (0) {
+err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ }
+ API_END_RET(session, ret);
}
/*
@@ -301,7 +310,7 @@ __curstat_search(WT_CURSOR *cursor)
/* Initialize on demand. */
if (cst->notinitialized) {
WT_ERR(__wt_curstat_init(
- session, cursor->internal_uri, cst->cfg, cst));
+ session, cursor->internal_uri, NULL, cst->cfg, cst));
cst->notinitialized = false;
}
@@ -332,6 +341,7 @@ __curstat_close(WT_CURSOR *cursor)
__curstat_free_config(session, cst);
__wt_buf_free(session, &cst->pv);
+ __wt_free(session, cst->desc_buf);
WT_ERR(__wt_cursor_close(cursor));
@@ -426,12 +436,102 @@ __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst)
}
/*
+ * __curstat_join_next_set --
+ * Advance to another index used in a join to give another set of
+ * statistics.
+ */
+static int
+__curstat_join_next_set(WT_SESSION_IMPL *session, WT_CURSOR_STAT *cst,
+ bool forw, bool init)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_JOIN_STATS_GROUP *join_group;
+ ssize_t pos;
+
+ WT_ASSERT(session, WT_STREQ(cst->iface.uri, "statistics:join"));
+ join_group = &cst->u.join_stats_group;
+ cjoin = join_group->join_cursor;
+ if (init)
+ pos = forw ? 0 : cjoin->entries_next - 1;
+ else
+ pos = join_group->join_cursor_entry + (forw ? 1 : -1);
+ if (pos < 0 || (size_t)pos >= cjoin->entries_next)
+ return (WT_NOTFOUND);
+
+ join_group->join_cursor_entry = pos;
+ if (cjoin->entries[pos].index == NULL) {
+ WT_ASSERT(session, WT_PREFIX_MATCH(cjoin->iface.uri, "join:"));
+ join_group->desc_prefix = cjoin->iface.uri + 5;
+ } else
+ join_group->desc_prefix = cjoin->entries[pos].index->name;
+ join_group->join_stats = cjoin->entries[pos].stats;
+ if (!init)
+ cst->key = forw ? WT_STAT_KEY_MIN(cst) : WT_STAT_KEY_MAX(cst);
+ return (0);
+}
+
+/*
+ * __curstat_join_desc --
+ * Assemble the description field based on current index and statistic.
+ */
+static int
+__curstat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **resultp)
+{
+ size_t len;
+ const char *static_desc;
+ WT_JOIN_STATS_GROUP *sgrp;
+ WT_SESSION_IMPL *session;
+
+ sgrp = &cst->u.join_stats_group;
+ session = (WT_SESSION_IMPL *)sgrp->join_cursor->iface.session;
+ WT_RET(__wt_stat_join_desc(cst, slot, &static_desc));
+ len = strlen("join: ") + strlen(sgrp->desc_prefix) +
+ strlen(static_desc) + 1;
+ WT_RET(__wt_realloc(session, NULL, len, &cst->desc_buf));
+ snprintf(cst->desc_buf, len, "join: %s%s", sgrp->desc_prefix,
+ static_desc);
+ *resultp = cst->desc_buf;
+ return (0);
+}
+
+/*
+ * __curstat_join_init --
+ * Initialize the statistics for a joined cursor.
+ */
+static int
+__curstat_join_init(WT_SESSION_IMPL *session,
+ WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+
+ WT_UNUSED(cfg);
+
+ if (curjoin == NULL && cst->u.join_stats_group.join_cursor != NULL)
+ curjoin = &cst->u.join_stats_group.join_cursor->iface;
+ if (curjoin == NULL || !WT_PREFIX_MATCH(curjoin->uri, "join:"))
+ WT_ERR_MSG(session, EINVAL,
+ "join cursor must be used with statistics:join");
+ cjoin = (WT_CURSOR_JOIN *)curjoin;
+ memset(&cst->u.join_stats_group, 0, sizeof(WT_JOIN_STATS_GROUP));
+ cst->u.join_stats_group.join_cursor = cjoin;
+
+ cst->stats = (int64_t *)&cst->u.join_stats_group.join_stats;
+ cst->stats_base = WT_JOIN_STATS_BASE;
+ cst->stats_count = sizeof(WT_JOIN_STATS) / sizeof(int64_t);
+ cst->stats_desc = __curstat_join_desc;
+ cst->next_set = __curstat_join_next_set;
+
+err: return (ret);
+}
+
+/*
* __wt_curstat_init --
* Initialize a statistics cursor.
*/
int
__wt_curstat_init(WT_SESSION_IMPL *session,
- const char *uri, const char *cfg[], WT_CURSOR_STAT *cst)
+ const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst)
{
const char *dsrc_uri;
@@ -442,6 +542,10 @@ __wt_curstat_init(WT_SESSION_IMPL *session,
dsrc_uri = uri + strlen("statistics:");
+ if (WT_STREQ(dsrc_uri, "join"))
+ return (
+ __curstat_join_init(session, curjoin, cfg, cst));
+
if (WT_PREFIX_MATCH(dsrc_uri, "colgroup:"))
return (
__wt_curstat_colgroup_init(session, dsrc_uri, cfg, cst));
@@ -467,7 +571,7 @@ __wt_curstat_init(WT_SESSION_IMPL *session,
*/
int
__wt_curstat_open(WT_SESSION_IMPL *session,
- const char *uri, const char *cfg[], WT_CURSOR **cursorp)
+ const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp)
{
WT_CONNECTION_IMPL *conn;
WT_CURSOR_STATIC_INIT(iface,
@@ -581,7 +685,7 @@ __wt_curstat_open(WT_SESSION_IMPL *session,
* objects like tables, we need to a valid set of statistics when before
* the open returns.
*/
- WT_ERR(__wt_curstat_init(session, uri, cst->cfg, cst));
+ WT_ERR(__wt_curstat_init(session, uri, other, cst->cfg, cst));
cst->notinitialized = false;
/* The cursor isn't yet positioned. */
diff --git a/src/cursor/cur_table.c b/src/cursor/cur_table.c
index b78e12c2648..dca72a16ee5 100644
--- a/src/cursor/cur_table.c
+++ b/src/cursor/cur_table.c
@@ -186,34 +186,16 @@ __wt_curtable_get_key(WT_CURSOR *cursor, ...)
int
__wt_curtable_get_value(WT_CURSOR *cursor, ...)
{
- WT_CURSOR *primary;
- WT_CURSOR_TABLE *ctable;
WT_DECL_RET;
- WT_ITEM *item;
WT_SESSION_IMPL *session;
va_list ap;
- ctable = (WT_CURSOR_TABLE *)cursor;
- primary = *ctable->cg_cursors;
- CURSOR_API_CALL(cursor, session, get_value, NULL);
- WT_CURSOR_NEEDVALUE(primary);
-
va_start(ap, cursor);
- if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
- ret = __wt_schema_project_merge(session,
- ctable->cg_cursors, ctable->plan,
- cursor->value_format, &cursor->value);
- if (ret == 0) {
- item = va_arg(ap, WT_ITEM *);
- item->data = cursor->value.data;
- item->size = cursor->value.size;
- }
- } else
- ret = __wt_schema_project_out(session,
- ctable->cg_cursors, ctable->plan, ap);
- va_end(ap);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL);
+ WT_ERR(__wt_curtable_get_valuev(cursor, ap));
-err: API_END_RET(session, ret);
+err: va_end(ap);
+ API_END_RET(session, ret);
}
/*
@@ -264,7 +246,7 @@ __wt_curtable_set_value(WT_CURSOR *cursor, ...)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, set_value, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL);
va_start(ap, cursor);
if (F_ISSET(cursor, WT_CURSOR_RAW_OK | WT_CURSTD_DUMP_JSON)) {
@@ -332,7 +314,7 @@ __curtable_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp)
WT_DECL_RET;
WT_SESSION_IMPL *session;
- CURSOR_API_CALL(a, session, compare, NULL);
+ JOINABLE_CURSOR_API_CALL(a, session, compare, NULL);
/*
* Confirm both cursors refer to the same source and have keys, then
@@ -362,7 +344,7 @@ __curtable_next(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
APPLY_CG(ctable, next);
err: API_END_RET(session, ret);
@@ -383,7 +365,7 @@ __curtable_next_random(WT_CURSOR *cursor)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
cp = ctable->cg_cursors;
/* Split out the first next, it retrieves the random record. */
@@ -414,7 +396,7 @@ __curtable_prev(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, prev, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL);
APPLY_CG(ctable, prev);
err: API_END_RET(session, ret);
@@ -432,7 +414,7 @@ __curtable_reset(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, reset, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL);
APPLY_CG(ctable, reset);
err: API_END_RET(session, ret);
@@ -450,7 +432,7 @@ __curtable_search(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, search, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL);
APPLY_CG(ctable, search);
err: API_END_RET(session, ret);
@@ -470,7 +452,7 @@ __curtable_search_near(WT_CURSOR *cursor, int *exact)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, search_near, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL);
cp = ctable->cg_cursors;
primary = *cp;
WT_ERR(primary->search_near(primary, exact));
@@ -501,7 +483,7 @@ __curtable_insert(WT_CURSOR *cursor)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL);
+ JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL);
WT_ERR(__curtable_open_indices(ctable));
/*
@@ -568,7 +550,7 @@ __curtable_update(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_UPDATE_API_CALL(cursor, session, update, NULL);
+ JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, update, NULL);
WT_ERR(__curtable_open_indices(ctable));
/*
@@ -619,7 +601,7 @@ __curtable_remove(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_REMOVE_API_CALL(cursor, session, NULL);
+ JOINABLE_CURSOR_REMOVE_API_CALL(cursor, session, NULL);
WT_ERR(__curtable_open_indices(ctable));
/* Find the old record so it can be removed from indices */
@@ -731,7 +713,7 @@ __curtable_close(WT_CURSOR *cursor)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, close, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL);
if (ctable->cg_cursors != NULL)
for (i = 0, cp = ctable->cg_cursors;
@@ -854,7 +836,7 @@ __curtable_open_indices(WT_CURSOR_TABLE *ctable)
*/
int
__wt_curtable_open(WT_SESSION_IMPL *session,
- const char *uri, const char *cfg[], WT_CURSOR **cursorp)
+ const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
{
WT_CURSOR_STATIC_INIT(iface,
__wt_curtable_get_key, /* get-key */
@@ -945,7 +927,7 @@ __wt_curtable_open(WT_SESSION_IMPL *session,
}
WT_ERR(__wt_cursor_init(
- cursor, cursor->internal_uri, NULL, cfg, cursorp));
+ cursor, cursor->internal_uri, owner, cfg, cursorp));
if (F_ISSET(cursor, WT_CURSTD_DUMP_JSON))
WT_ERR(__wt_json_column_init(cursor, table->key_format,
diff --git a/src/docs/cursor-join.dox b/src/docs/cursor-join.dox
new file mode 100644
index 00000000000..51da6b174bf
--- /dev/null
+++ b/src/docs/cursor-join.dox
@@ -0,0 +1,19 @@
+/*! @m_page{{c,java},cursor_join,Join cursors}
+
+Join cursors provide a way to iterate over a subset of a table, where the subset is specified by relationships with reference cursors.
+
+A join cursor is created with WT_SESSION::open_cursor using a \c
+"join:table:<name>" URI prefix. Then reference cursors are positioned to
+keys on indices and joined to the join cursor using WT_SESSION::join calls.
+The result is a join cursor that can be iterated to satisfy the join
+equation.
+
+Here is an example using join cursors:
+
+@snippet ex_schema.c Join cursors
+
+Joins support various comparison operators: \c "eq", \c "gt", \c "ge", \c "lt", \c "le". Ranges with lower and upper bounds can also be specified, by joining two cursors on the same index, for example, one with \c "compare=ge" and another \c "compare=lt". In addition to joining indices, the main table can be joined so that a range of primary keys can be specified.
+
+All the joins should be done on the join cursor before WT_CURSOR::next is called. Calling WT_CURSOR::next on a join cursor for the first time populates any bloom filters and performs other initialization. The join cursor's key is the primary key (the key for the main table), and its value is the entire set of values of the main table. A join cursor can be created with a projection by appending \c "(col1,col2,...)" to the URI if a different set of values is needed.
+
+*/
diff --git a/src/docs/cursors.dox b/src/docs/cursors.dox
index c9455e2976c..b6271951f91 100644
--- a/src/docs/cursors.dox
+++ b/src/docs/cursors.dox
@@ -19,6 +19,7 @@ See the following for more details:
- @subpage data_sources
- @ref metadata
- @ref cursor_log
+- @ref cursor_join
@section cursor_projections Projections
diff --git a/src/docs/data-sources.dox b/src/docs/data-sources.dox
index 0c446b2bf78..d09d1cbc1b8 100644
--- a/src/docs/data-sources.dox
+++ b/src/docs/data-sources.dox
@@ -13,6 +13,10 @@ The following are the builtin basic cursor types:
index cursor,
key=index key\, value=table value(s) with optional projection
of columns}
+@row{<tt>join:table:\<table name\>[\<projection\>]</tt>,
+ join cursor,
+ key=table key\, value=table value(s) with optional projection
+ of columns}
</table>
Some administrative tasks can be accomplished using the following special
cursor types that give access to data managed by WiredTiger:
diff --git a/src/docs/programming.dox b/src/docs/programming.dox
index d99c34d1da2..f005f6d3e2d 100644
--- a/src/docs/programming.dox
+++ b/src/docs/programming.dox
@@ -43,6 +43,7 @@ each of which is ordered by one or more columns.
- @ref transaction_named_snapshots
- @subpage shared_cache
- @subpage statistics
+- @subpage cursor_join
- @subpage cursor_log
- @subpage_single upgrade
diff --git a/src/docs/spell.ok b/src/docs/spell.ok
index b887f0ceee2..86af82d8fd2 100644
--- a/src/docs/spell.ok
+++ b/src/docs/spell.ok
@@ -197,6 +197,7 @@ endinternal
english
env
eof
+eq
erlang
errno
exe
@@ -220,6 +221,7 @@ freelist
fsync
gcc
gdbm
+ge
getKey
getValue
getopt
@@ -260,6 +262,7 @@ keyvalue
kvs
lang
lastname
+le
len
leveldb
li
diff --git a/src/include/api.h b/src/include/api.h
index 74c58845c43..4821b450f9e 100644
--- a/src/include/api.h
+++ b/src/include/api.h
@@ -116,11 +116,23 @@
API_CALL_NOCONF(s, WT_CURSOR, n, cur, \
((bt) == NULL) ? NULL : ((WT_BTREE *)(bt))->dhandle)
+#define JOINABLE_CURSOR_CALL_CHECK(cur) \
+ if (F_ISSET(cur, WT_CURSTD_JOINED)) \
+ WT_ERR(__wt_curindex_joined(cur))
+
+#define JOINABLE_CURSOR_API_CALL(cur, s, n, bt) \
+ CURSOR_API_CALL(cur, s, n, bt); \
+ JOINABLE_CURSOR_CALL_CHECK(cur)
+
#define CURSOR_REMOVE_API_CALL(cur, s, bt) \
(s) = (WT_SESSION_IMPL *)(cur)->session; \
TXN_API_CALL_NOCONF(s, WT_CURSOR, remove, cur, \
((bt) == NULL) ? NULL : ((WT_BTREE *)(bt))->dhandle);
+#define JOINABLE_CURSOR_REMOVE_API_CALL(cur, s, bt) \
+ CURSOR_REMOVE_API_CALL(cur, s, bt); \
+ JOINABLE_CURSOR_CALL_CHECK(cur)
+
#define CURSOR_UPDATE_API_CALL(cur, s, n, bt) \
(s) = (WT_SESSION_IMPL *)(cur)->session; \
TXN_API_CALL_NOCONF(s, WT_CURSOR, n, cur, \
@@ -128,6 +140,10 @@
if (F_ISSET(S2C(s), WT_CONN_IN_MEMORY) && __wt_cache_full(s)) \
WT_ERR(WT_CACHE_FULL);
+#define JOINABLE_CURSOR_UPDATE_API_CALL(cur, s, n, bt) \
+ CURSOR_UPDATE_API_CALL(cur, s, n, bt); \
+ JOINABLE_CURSOR_CALL_CHECK(cur)
+
#define CURSOR_UPDATE_API_END(s, ret) \
TXN_API_END(s, ret)
diff --git a/src/include/config.h b/src/include/config.h
index 408639ab2a9..e836abaccba 100644
--- a/src/include/config.h
+++ b/src/include/config.h
@@ -68,28 +68,29 @@ struct __wt_config_parser_impl {
#define WT_CONFIG_ENTRY_WT_SESSION_compact 16
#define WT_CONFIG_ENTRY_WT_SESSION_create 17
#define WT_CONFIG_ENTRY_WT_SESSION_drop 18
-#define WT_CONFIG_ENTRY_WT_SESSION_log_flush 19
-#define WT_CONFIG_ENTRY_WT_SESSION_log_printf 20
-#define WT_CONFIG_ENTRY_WT_SESSION_open_cursor 21
-#define WT_CONFIG_ENTRY_WT_SESSION_reconfigure 22
-#define WT_CONFIG_ENTRY_WT_SESSION_rename 23
-#define WT_CONFIG_ENTRY_WT_SESSION_reset 24
-#define WT_CONFIG_ENTRY_WT_SESSION_rollback_transaction 25
-#define WT_CONFIG_ENTRY_WT_SESSION_salvage 26
-#define WT_CONFIG_ENTRY_WT_SESSION_snapshot 27
-#define WT_CONFIG_ENTRY_WT_SESSION_strerror 28
-#define WT_CONFIG_ENTRY_WT_SESSION_transaction_sync 29
-#define WT_CONFIG_ENTRY_WT_SESSION_truncate 30
-#define WT_CONFIG_ENTRY_WT_SESSION_upgrade 31
-#define WT_CONFIG_ENTRY_WT_SESSION_verify 32
-#define WT_CONFIG_ENTRY_colgroup_meta 33
-#define WT_CONFIG_ENTRY_file_meta 34
-#define WT_CONFIG_ENTRY_index_meta 35
-#define WT_CONFIG_ENTRY_table_meta 36
-#define WT_CONFIG_ENTRY_wiredtiger_open 37
-#define WT_CONFIG_ENTRY_wiredtiger_open_all 38
-#define WT_CONFIG_ENTRY_wiredtiger_open_basecfg 39
-#define WT_CONFIG_ENTRY_wiredtiger_open_usercfg 40
+#define WT_CONFIG_ENTRY_WT_SESSION_join 19
+#define WT_CONFIG_ENTRY_WT_SESSION_log_flush 20
+#define WT_CONFIG_ENTRY_WT_SESSION_log_printf 21
+#define WT_CONFIG_ENTRY_WT_SESSION_open_cursor 22
+#define WT_CONFIG_ENTRY_WT_SESSION_reconfigure 23
+#define WT_CONFIG_ENTRY_WT_SESSION_rename 24
+#define WT_CONFIG_ENTRY_WT_SESSION_reset 25
+#define WT_CONFIG_ENTRY_WT_SESSION_rollback_transaction 26
+#define WT_CONFIG_ENTRY_WT_SESSION_salvage 27
+#define WT_CONFIG_ENTRY_WT_SESSION_snapshot 28
+#define WT_CONFIG_ENTRY_WT_SESSION_strerror 29
+#define WT_CONFIG_ENTRY_WT_SESSION_transaction_sync 30
+#define WT_CONFIG_ENTRY_WT_SESSION_truncate 31
+#define WT_CONFIG_ENTRY_WT_SESSION_upgrade 32
+#define WT_CONFIG_ENTRY_WT_SESSION_verify 33
+#define WT_CONFIG_ENTRY_colgroup_meta 34
+#define WT_CONFIG_ENTRY_file_meta 35
+#define WT_CONFIG_ENTRY_index_meta 36
+#define WT_CONFIG_ENTRY_table_meta 37
+#define WT_CONFIG_ENTRY_wiredtiger_open 38
+#define WT_CONFIG_ENTRY_wiredtiger_open_all 39
+#define WT_CONFIG_ENTRY_wiredtiger_open_basecfg 40
+#define WT_CONFIG_ENTRY_wiredtiger_open_usercfg 41
/*
* configuration section: END
* DO NOT EDIT: automatically built by dist/flags.py.
diff --git a/src/include/cursor.h b/src/include/cursor.h
index 1cbe76216b1..23d3f3745db 100644
--- a/src/include/cursor.h
+++ b/src/include/cursor.h
@@ -264,6 +264,66 @@ struct __wt_cursor_index {
uint8_t *cg_needvalue;
};
+struct __wt_cursor_join_iter {
+ WT_SESSION_IMPL *session;
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_CURSOR *cursor;
+ WT_ITEM *curkey;
+ bool advance;
+};
+
+struct __wt_cursor_join_endpoint {
+ WT_ITEM key;
+ uint8_t recno_buf[10]; /* holds packed recno */
+ WT_CURSOR *cursor;
+
+#define WT_CURJOIN_END_LT 0x01 /* include values < cursor */
+#define WT_CURJOIN_END_EQ 0x02 /* include values == cursor */
+#define WT_CURJOIN_END_GT 0x04 /* include values > cursor */
+#define WT_CURJOIN_END_GE (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ)
+#define WT_CURJOIN_END_LE (WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ)
+#define WT_CURJOIN_END_OWN_KEY 0x08 /* must free key's data */
+ uint8_t flags; /* range for this endpoint */
+};
+
+struct __wt_cursor_join_entry {
+ WT_INDEX *index;
+ WT_CURSOR *main; /* raw main table cursor */
+ WT_BLOOM *bloom; /* Bloom filter handle */
+ uint64_t bloom_bit_count; /* bits per item in bloom */
+ uint64_t bloom_hash_count; /* hash functions in bloom */
+ uint64_t count; /* approx number of matches */
+
+#define WT_CURJOIN_ENTRY_BLOOM 0x01 /* use a bloom filter */
+#define WT_CURJOIN_ENTRY_DISJUNCTION 0x02 /* endpoints are or-ed */
+#define WT_CURJOIN_ENTRY_OWN_BLOOM 0x04 /* this entry owns the bloom */
+ uint8_t flags;
+
+ WT_CURSOR_JOIN_ENDPOINT *ends; /* reference endpoints */
+ size_t ends_allocated;
+ size_t ends_next;
+
+ WT_JOIN_STATS stats; /* Join statistics */
+};
+
+struct __wt_cursor_join {
+ WT_CURSOR iface;
+
+ WT_TABLE *table;
+ const char *projection;
+ WT_CURSOR_JOIN_ITER *iter;
+ WT_CURSOR_JOIN_ENTRY *entries;
+ size_t entries_allocated;
+ u_int entries_next;
+ uint8_t recno_buf[10]; /* holds packed recno */
+
+#define WT_CURJOIN_ERROR 0x01 /* Error in initialization */
+#define WT_CURJOIN_INITIALIZED 0x02 /* Successful initialization */
+#define WT_CURJOIN_SKIP_FIRST_LEFT 0x04 /* First check not needed */
+ uint8_t flags;
+};
+
struct __wt_cursor_json {
char *key_buf; /* JSON formatted string */
char *value_buf; /* JSON formatted string */
@@ -298,6 +358,13 @@ struct __wt_cursor_metadata {
uint32_t flags;
};
+struct __wt_join_stats_group {
+ const char *desc_prefix; /* Prefix appears before description */
+ WT_CURSOR_JOIN *join_cursor;
+ ssize_t join_cursor_entry; /* Position in entries */
+ WT_JOIN_STATS join_stats;
+};
+
struct __wt_cursor_stat {
WT_CURSOR iface;
@@ -307,14 +374,19 @@ struct __wt_cursor_stat {
int64_t *stats; /* Statistics */
int stats_base; /* Base statistics value */
int stats_count; /* Count of statistics values */
- const char *(*stats_desc)(int); /* Statistics descriptions */
+ int (*stats_desc)(WT_CURSOR_STAT *, int, const char **);
+ /* Statistics descriptions */
+ int (*next_set)(WT_SESSION_IMPL *, WT_CURSOR_STAT *, bool,
+ bool); /* Advance to next set */
union { /* Copies of the statistics */
WT_DSRC_STATS dsrc_stats;
WT_CONNECTION_STATS conn_stats;
+ WT_JOIN_STATS_GROUP join_stats_group;
} u;
const char **cfg; /* Original cursor configuration */
+ char *desc_buf; /* Saved description string */
int key; /* Current stats key */
uint64_t v; /* Current stats value */
diff --git a/src/include/cursor.i b/src/include/cursor.i
index c6ce04cab6f..9dd280534b4 100644
--- a/src/include/cursor.i
+++ b/src/include/cursor.i
@@ -139,6 +139,70 @@ __curfile_leave(WT_CURSOR_BTREE *cbt)
}
/*
+ * __wt_curindex_get_valuev --
+ * Internal implementation of WT_CURSOR->get_value for index cursors
+ */
+static inline int
+__wt_curindex_get_valuev(WT_CURSOR *cursor, va_list ap)
+{
+ WT_CURSOR_INDEX *cindex;
+ WT_DECL_RET;
+ WT_ITEM *item;
+ WT_SESSION_IMPL *session;
+
+ cindex = (WT_CURSOR_INDEX *)cursor;
+ session = (WT_SESSION_IMPL *)cursor->session;
+ WT_CURSOR_NEEDVALUE(cursor);
+
+ if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
+ ret = __wt_schema_project_merge(session,
+ cindex->cg_cursors, cindex->value_plan,
+ cursor->value_format, &cursor->value);
+ if (ret == 0) {
+ item = va_arg(ap, WT_ITEM *);
+ item->data = cursor->value.data;
+ item->size = cursor->value.size;
+ }
+ } else
+ ret = __wt_schema_project_out(session,
+ cindex->cg_cursors, cindex->value_plan, ap);
+err: return (ret);
+}
+
+/*
+ * __wt_curtable_get_valuev --
+ * Internal implementation of WT_CURSOR->get_value for table cursors.
+ */
+static inline int
+__wt_curtable_get_valuev(WT_CURSOR *cursor, va_list ap)
+{
+ WT_CURSOR *primary;
+ WT_CURSOR_TABLE *ctable;
+ WT_DECL_RET;
+ WT_ITEM *item;
+ WT_SESSION_IMPL *session;
+
+ ctable = (WT_CURSOR_TABLE *)cursor;
+ session = (WT_SESSION_IMPL *)cursor->session;
+ primary = *ctable->cg_cursors;
+ WT_CURSOR_NEEDVALUE(primary);
+
+ if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
+ ret = __wt_schema_project_merge(session,
+ ctable->cg_cursors, ctable->plan,
+ cursor->value_format, &cursor->value);
+ if (ret == 0) {
+ item = va_arg(ap, WT_ITEM *);
+ item->data = cursor->value.data;
+ item->size = cursor->value.size;
+ }
+ } else
+ ret = __wt_schema_project_out(session,
+ ctable->cg_cursors, ctable->plan, ap);
+err: return (ret);
+}
+
+/*
* __wt_cursor_dhandle_incr_use --
* Increment the in-use counter in cursor's data source.
*/
diff --git a/src/include/extern.h b/src/include/extern.h
index a6ccc526f8c..743a3c3ac31 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -83,6 +83,8 @@ extern int __wt_bloom_finalize(WT_BLOOM *bloom);
extern int __wt_bloom_hash(WT_BLOOM *bloom, WT_ITEM *key, WT_BLOOM_HASH *bhash);
extern int __wt_bloom_hash_get(WT_BLOOM *bloom, WT_BLOOM_HASH *bhash);
extern int __wt_bloom_get(WT_BLOOM *bloom, WT_ITEM *key);
+extern int __wt_bloom_inmem_get(WT_BLOOM *bloom, WT_ITEM *key);
+extern int __wt_bloom_intersection(WT_BLOOM *bloom, WT_BLOOM *other);
extern int __wt_bloom_close(WT_BLOOM *bloom);
extern int __wt_bloom_drop(WT_BLOOM *bloom, const char *config);
extern int __wt_compact(WT_SESSION_IMPL *session, const char *cfg[]);
@@ -274,7 +276,10 @@ extern int __wt_curdump_create(WT_CURSOR *child, WT_CURSOR *owner, WT_CURSOR **c
extern int __wt_curfile_update_check(WT_CURSOR *cursor);
extern int __wt_curfile_create(WT_SESSION_IMPL *session, WT_CURSOR *owner, const char *cfg[], bool bulk, bool bitmap, WT_CURSOR **cursorp);
extern int __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curindex_joined(WT_CURSOR *cursor);
extern int __wt_curindex_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curjoin_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count);
extern int __wt_json_alloc_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, WT_CURSOR_JSON *json, bool iskey, va_list ap);
extern void __wt_json_close(WT_SESSION_IMPL *session, WT_CURSOR *cursor);
extern size_t __wt_json_unpack_char(char ch, u_char *buf, size_t bufsz, bool force_unicode);
@@ -287,8 +292,8 @@ extern int __wt_json_strncpy(char **pdst, size_t dstlen, const char *src, size_t
extern int __wt_curlog_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_curmetadata_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
extern void __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst);
-extern int __wt_curstat_init(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR_STAT *cst);
-extern int __wt_curstat_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curstat_init(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst);
+extern int __wt_curstat_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_cursor_notsup(WT_CURSOR *cursor);
extern int __wt_cursor_noop(WT_CURSOR *cursor);
extern void __wt_cursor_set_notsup(WT_CURSOR *cursor);
@@ -316,7 +321,7 @@ extern int __wt_curtable_get_value(WT_CURSOR *cursor, ...);
extern void __wt_curtable_set_key(WT_CURSOR *cursor, ...);
extern void __wt_curtable_set_value(WT_CURSOR *cursor, ...);
extern int __wt_table_range_truncate(WT_CURSOR_TABLE *start, WT_CURSOR_TABLE *stop);
-extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_evict_file(WT_SESSION_IMPL *session, WT_CACHE_OP syncop);
extern void __wt_evict_list_clear_page(WT_SESSION_IMPL *session, WT_REF *ref);
extern int __wt_evict_server_wake(WT_SESSION_IMPL *session);
@@ -533,6 +538,8 @@ extern int __wt_struct_confchk(WT_SESSION_IMPL *session, WT_CONFIG_ITEM *v);
extern int __wt_struct_size(WT_SESSION_IMPL *session, size_t *sizep, const char *fmt, ...);
extern int __wt_struct_pack(WT_SESSION_IMPL *session, void *buffer, size_t size, const char *fmt, ...);
extern int __wt_struct_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, ...);
+extern int __wt_struct_unpack_size(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, size_t *resultp);
+extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, void **reallocp);
extern int __wt_ovfl_discard_add(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL *cell);
extern void __wt_ovfl_discard_free(WT_SESSION_IMPL *session, WT_PAGE *page);
extern int __wt_ovfl_reuse_search(WT_SESSION_IMPL *session, WT_PAGE *page, uint8_t **addrp, size_t *addr_sizep, const void *value, size_t value_size);
@@ -674,19 +681,24 @@ __wt_scr_alloc_func(WT_SESSION_IMPL *session, size_t size, WT_ITEM **scratchp
extern void __wt_scr_discard(WT_SESSION_IMPL *session);
extern void *__wt_ext_scr_alloc( WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, size_t size);
extern void __wt_ext_scr_free(WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, void *p);
-extern const char *__wt_stat_dsrc_desc(int slot);
+extern int __wt_stat_dsrc_desc(WT_CURSOR_STAT *cst, int slot, const char **p);
extern void __wt_stat_dsrc_init_single(WT_DSRC_STATS *stats);
extern void __wt_stat_dsrc_init(WT_DATA_HANDLE *handle);
extern void __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats);
extern void __wt_stat_dsrc_clear_all(WT_DSRC_STATS **stats);
extern void __wt_stat_dsrc_aggregate_single( WT_DSRC_STATS *from, WT_DSRC_STATS *to);
extern void __wt_stat_dsrc_aggregate( WT_DSRC_STATS **from, WT_DSRC_STATS *to);
-extern const char *__wt_stat_connection_desc(int slot);
+extern int __wt_stat_connection_desc(WT_CURSOR_STAT *cst, int slot, const char **p);
extern void __wt_stat_connection_init_single(WT_CONNECTION_STATS *stats);
extern void __wt_stat_connection_init(WT_CONNECTION_IMPL *handle);
extern void __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats);
extern void __wt_stat_connection_clear_all(WT_CONNECTION_STATS **stats);
extern void __wt_stat_connection_aggregate( WT_CONNECTION_STATS **from, WT_CONNECTION_STATS *to);
+extern int __wt_stat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **p);
+extern void __wt_stat_join_init_single(WT_JOIN_STATS *stats);
+extern void __wt_stat_join_clear_single(WT_JOIN_STATS *stats);
+extern void __wt_stat_join_clear_all(WT_JOIN_STATS **stats);
+extern void __wt_stat_join_aggregate( WT_JOIN_STATS **from, WT_JOIN_STATS *to);
extern void __wt_txn_release_snapshot(WT_SESSION_IMPL *session);
extern void __wt_txn_get_snapshot(WT_SESSION_IMPL *session);
extern void __wt_txn_update_oldest(WT_SESSION_IMPL *session, bool force);
diff --git a/src/include/stat.h b/src/include/stat.h
index 44e2d7edd8a..fa434a3eb7c 100644
--- a/src/include/stat.h
+++ b/src/include/stat.h
@@ -493,4 +493,14 @@ struct __wt_dsrc_stats {
int64_t txn_update_conflict;
};
+/*
+ * Statistics entries for join cursors.
+ */
+#define WT_JOIN_STATS_BASE 3000
+struct __wt_join_stats {
+ int64_t accesses;
+ int64_t actual_count;
+ int64_t bloom_false_positive;
+};
+
/* Statistics section: END */
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index a246cec2bab..2c3f8b27e1e 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -574,11 +574,12 @@ struct __wt_cursor {
#define WT_CURSTD_KEY_EXT 0x0020 /* Key points out of the tree. */
#define WT_CURSTD_KEY_INT 0x0040 /* Key points into the tree. */
#define WT_CURSTD_KEY_SET (WT_CURSTD_KEY_EXT | WT_CURSTD_KEY_INT)
-#define WT_CURSTD_OPEN 0x0080
-#define WT_CURSTD_OVERWRITE 0x0100
-#define WT_CURSTD_RAW 0x0200
-#define WT_CURSTD_VALUE_EXT 0x0400 /* Value points out of the tree. */
-#define WT_CURSTD_VALUE_INT 0x0800 /* Value points into the tree. */
+#define WT_CURSTD_JOINED 0x0080
+#define WT_CURSTD_OPEN 0x0100
+#define WT_CURSTD_OVERWRITE 0x0200
+#define WT_CURSTD_RAW 0x0400
+#define WT_CURSTD_VALUE_EXT 0x0800 /* Value points out of the tree. */
+#define WT_CURSTD_VALUE_INT 0x1000 /* Value points into the tree. */
#define WT_CURSTD_VALUE_SET (WT_CURSTD_VALUE_EXT | WT_CURSTD_VALUE_INT)
uint32_t flags;
#endif
@@ -1236,6 +1237,61 @@ struct __wt_session {
const char *name, const char *config);
/*!
+ * Join a join cursor with a reference cursor.
+ *
+ * @snippet ex_schema.c Join cursors
+ *
+ * @param session the session handle
+ * @param join_cursor a cursor that was opened using a
+ * \c "join:" URI. It may not have been used for any operations
+ * other than other join calls.
+ * @param ref_cursor either an index cursor having the same base table
+ * as the join_cursor, or a table cursor open on the same base table.
+ * The ref_cursor must be positioned.
+ *
+ * The ref_cursor limits the results seen by iterating the
+ * join_cursor to table items referred to by the key in this
+ * index. The set of keys referred to is modified by the compare
+ * config option.
+ *
+ * Multiple join calls builds up a set of ref_cursors, and the
+ * results seen by iteration are the intersection of the cursor
+ * ranges participating in the join.
+ *
+ * After the join call completes, the ref_cursor cursor may not be
+ * used for any purpose other than get_key and get_value. Any other
+ * cursor method (e.g. next, prev,close) will fail. When the
+ * join_cursor is closed, the ref_cursor is made available for
+ * general use again. The application should close ref_cursor when
+ * finished with it, although not before the join_cursor is closed.
+ *
+ * @configstart{WT_SESSION.join, see dist/api_data.py}
+ * @config{bloom_bit_count, the number of bits used per item for the
+ * bloom filter., an integer between 2 and 1000; default \c 16.}
+ * @config{bloom_hash_count, the number of hash values per item for the
+ * bloom filter., an integer between 2 and 100; default \c 8.}
+ * @config{compare, modifies the set of items to be returned so that the
+ * index key satisfies the given comparison relative to the key set in
+ * this cursor., a string\, chosen from the following options: \c "eq"\,
+ * \c "ge"\, \c "gt"\, \c "le"\, \c "lt"; default \c "eq".}
+ * @config{count, set an approximate count of the elements that would be
+ * included in the join. This is used in sizing the bloom filter\, and
+ * also influences evaluation order for cursors in the join. When the
+ * count is equal for multiple bloom filters in a composition of joins\,
+ * the bloom filter may be shared., an integer; default \c .}
+ * @config{strategy, when set to bloom\, a bloom filter is created and
+ * populated for this index. This has an up front cost but may reduce
+ * the number of accesses to the main table when iterating the joined
+ * cursor. The bloom setting requires that count be set., a string\,
+ * chosen from the following options: \c "bloom"\, \c "default"; default
+ * empty.}
+ * @configend
+ * @errors
+ */
+ int __F(join)(WT_SESSION *session, WT_CURSOR *join_cursor,
+ WT_CURSOR *ref_cursor, const char *config);
+
+ /*!
* Flush the log.
*
* @param session the session handle
@@ -4146,6 +4202,19 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2097
/*! transaction: update conflicts */
#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2098
+
+/*!
+ * @}
+ * @name Statistics for join cursors
+ * @anchor statistics_join
+ * @{
+ */
+/*! : accesses */
+#define WT_STAT_JOIN_ACCESSES 3000
+/*! : actual count of items */
+#define WT_STAT_JOIN_ACTUAL_COUNT 3001
+/*! : bloom filter false positives */
+#define WT_STAT_JOIN_BLOOM_FALSE_POSITIVE 3002
/*! @} */
/*
* Statistics section: END
diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h
index 3f4e0ada7f1..0a1e143ce70 100644
--- a/src/include/wt_internal.h
+++ b/src/include/wt_internal.h
@@ -136,6 +136,14 @@ struct __wt_cursor_dump;
typedef struct __wt_cursor_dump WT_CURSOR_DUMP;
struct __wt_cursor_index;
typedef struct __wt_cursor_index WT_CURSOR_INDEX;
+struct __wt_cursor_join;
+ typedef struct __wt_cursor_join WT_CURSOR_JOIN;
+struct __wt_cursor_join_endpoint;
+ typedef struct __wt_cursor_join_endpoint WT_CURSOR_JOIN_ENDPOINT;
+struct __wt_cursor_join_entry;
+ typedef struct __wt_cursor_join_entry WT_CURSOR_JOIN_ENTRY;
+struct __wt_cursor_join_iter;
+ typedef struct __wt_cursor_join_iter WT_CURSOR_JOIN_ITER;
struct __wt_cursor_json;
typedef struct __wt_cursor_json WT_CURSOR_JSON;
struct __wt_cursor_log;
@@ -178,6 +186,10 @@ struct __wt_insert;
typedef struct __wt_insert WT_INSERT;
struct __wt_insert_head;
typedef struct __wt_insert_head WT_INSERT_HEAD;
+struct __wt_join_stats;
+ typedef struct __wt_join_stats WT_JOIN_STATS;
+struct __wt_join_stats_group;
+ typedef struct __wt_join_stats_group WT_JOIN_STATS_GROUP;
struct __wt_keyed_encryptor;
typedef struct __wt_keyed_encryptor WT_KEYED_ENCRYPTOR;
struct __wt_log;
diff --git a/src/lsm/lsm_stat.c b/src/lsm/lsm_stat.c
index 4381ca0df00..c1eb7a2a389 100644
--- a/src/lsm/lsm_stat.c
+++ b/src/lsm/lsm_stat.c
@@ -77,12 +77,12 @@ __curstat_lsm_init(
*/
WT_ERR(__wt_buf_fmt(
session, uribuf, "statistics:%s", chunk->uri));
- ret = __wt_curstat_open(session, uribuf->data,
+ ret = __wt_curstat_open(session, uribuf->data, NULL,
F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) ? disk_cfg : cfg,
&stat_cursor);
if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
ret = __wt_curstat_open(
- session, uribuf->data, cfg, &stat_cursor);
+ session, uribuf->data, NULL, cfg, &stat_cursor);
WT_ERR(ret);
/*
@@ -107,7 +107,7 @@ __curstat_lsm_init(
WT_ERR(__wt_buf_fmt(
session, uribuf, "statistics:%s", chunk->bloom_uri));
WT_ERR(__wt_curstat_open(
- session, uribuf->data, cfg, &stat_cursor));
+ session, uribuf->data, NULL, cfg, &stat_cursor));
/*
* The underlying statistics have now been initialized; fill in
diff --git a/src/packing/pack_impl.c b/src/packing/pack_impl.c
index 3a4428eae15..447c887dc6f 100644
--- a/src/packing/pack_impl.c
+++ b/src/packing/pack_impl.c
@@ -105,3 +105,108 @@ __wt_struct_unpack(WT_SESSION_IMPL *session,
return (ret);
}
+
+/*
+ * __wt_struct_unpack_size --
+ * Determine the packed size of a buffer matching the format.
+ */
+int
+__wt_struct_unpack_size(WT_SESSION_IMPL *session,
+ const void *buffer, size_t size, const char *fmt, size_t *resultp)
+{
+ WT_DECL_PACK_VALUE(pv);
+ WT_DECL_RET;
+ WT_PACK pack;
+ const uint8_t *p, *end;
+
+ p = buffer;
+ end = p + size;
+
+ WT_RET(__pack_init(session, &pack, fmt));
+ while ((ret = __pack_next(&pack, &pv)) == 0)
+ WT_RET(__unpack_read(session, &pv, &p, (size_t)(end - p)));
+
+ /* Be paranoid - __pack_write should never overflow. */
+ WT_ASSERT(session, p <= end);
+
+ if (ret != WT_NOTFOUND)
+ return (ret);
+
+ *resultp = WT_PTRDIFF(p, buffer);
+ return (0);
+}
+
+/*
+ * __wt_struct_repack --
+ * Return the subset of the packed buffer that represents part of
+ * the format. If the result is not contiguous in the existing
+ * buffer, a buffer is reallocated and filled.
+ */
+int
+__wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt,
+ const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf,
+ void **reallocp)
+{
+ WT_DECL_PACK_VALUE(pvin);
+ WT_DECL_PACK_VALUE(pvout);
+ WT_DECL_RET;
+ WT_PACK packin, packout;
+ const uint8_t *before, *end, *p;
+ uint8_t *newbuf, *pout;
+ size_t len;
+ const void *start;
+
+ start = newbuf = NULL;
+ p = inbuf->data;
+ end = p + inbuf->size;
+
+ /*
+ * Handle this non-contiguous case: 'U' -> 'u' at the end of the buf.
+ * The former case has the size embedded before the item, the latter
+ * does not.
+ */
+ if ((len = strlen(outfmt)) > 1 && outfmt[len - 1] == 'u' &&
+ strlen(infmt) > len && infmt[len - 1] == 'U') {
+ WT_ERR(__wt_realloc(session, NULL, inbuf->size, reallocp));
+ pout = *reallocp;
+ } else
+ pout = NULL;
+
+ WT_ERR(__pack_init(session, &packout, outfmt));
+ WT_ERR(__pack_init(session, &packin, infmt));
+
+ /* Outfmt should complete before infmt */
+ while ((ret = __pack_next(&packout, &pvout)) == 0) {
+ WT_ERR(__pack_next(&packin, &pvin));
+ before = p;
+ WT_ERR(__unpack_read(session, &pvin, &p, (size_t)(end - p)));
+ if (pvout.type != pvin.type) {
+ if (pvout.type == 'u' && pvin.type == 'U') {
+ /* Skip the prefixed size, we don't need it */
+ WT_ERR(__wt_struct_unpack_size(session, before,
+ (size_t)(end - before), "I", &len));
+ before += len;
+ } else
+ WT_ERR(ENOTSUP);
+ }
+ if (pout != NULL) {
+ memcpy(pout, before, WT_PTRDIFF(p, before));
+ pout += p - before;
+ } else if (start == NULL)
+ start = before;
+ }
+ WT_ERR_NOTFOUND_OK(ret);
+
+ /* Be paranoid - __pack_write should never overflow. */
+ WT_ASSERT(session, p <= end);
+
+ if (pout != NULL) {
+ outbuf->data = *reallocp;
+ outbuf->size = WT_PTRDIFF(pout, *reallocp);
+ } else {
+ outbuf->data = start;
+ outbuf->size = WT_PTRDIFF(p, start);
+ }
+
+err: return (ret);
+}
diff --git a/src/schema/schema_stat.c b/src/schema/schema_stat.c
index d73d66cd399..82c2e2a15dc 100644
--- a/src/schema/schema_stat.c
+++ b/src/schema/schema_stat.c
@@ -24,7 +24,7 @@ __wt_curstat_colgroup_init(WT_SESSION_IMPL *session,
WT_RET(__wt_scr_alloc(session, 0, &buf));
WT_ERR(__wt_buf_fmt(session, buf, "statistics:%s", colgroup->source));
- ret = __wt_curstat_init(session, buf->data, cfg, cst);
+ ret = __wt_curstat_init(session, buf->data, NULL, cfg, cst);
err: __wt_scr_free(session, &buf);
return (ret);
@@ -46,7 +46,7 @@ __wt_curstat_index_init(WT_SESSION_IMPL *session,
WT_RET(__wt_scr_alloc(session, 0, &buf));
WT_ERR(__wt_buf_fmt(session, buf, "statistics:%s", idx->source));
- ret = __wt_curstat_init(session, buf->data, cfg, cst);
+ ret = __wt_curstat_init(session, buf->data, NULL, cfg, cst);
err: __wt_scr_free(session, &buf);
return (ret);
@@ -159,7 +159,7 @@ __wt_curstat_table_init(WT_SESSION_IMPL *session,
WT_ERR(__wt_buf_fmt(
session, buf, "statistics:%s", table->cgroups[i]->name));
WT_ERR(__wt_curstat_open(
- session, buf->data, cfg, &stat_cursor));
+ session, buf->data, NULL, cfg, &stat_cursor));
new = (WT_DSRC_STATS *)WT_CURSOR_STATS(stat_cursor);
if (i == 0)
*stats = *new;
@@ -174,7 +174,7 @@ __wt_curstat_table_init(WT_SESSION_IMPL *session,
WT_ERR(__wt_buf_fmt(
session, buf, "statistics:%s", table->indices[i]->name));
WT_ERR(__wt_curstat_open(
- session, buf->data, cfg, &stat_cursor));
+ session, buf->data, NULL, cfg, &stat_cursor));
new = (WT_DSRC_STATS *)WT_CURSOR_STATS(stat_cursor);
__wt_stat_dsrc_aggregate_single(new, stats);
WT_ERR(stat_cursor->close(stat_cursor));
diff --git a/src/session/session_api.c b/src/session/session_api.c
index 7d718c38c4f..db81623c613 100644
--- a/src/session/session_api.c
+++ b/src/session/session_api.c
@@ -240,12 +240,12 @@ err: API_END_RET_NOTFOUND_MAP(session, ret);
}
/*
- * __wt_open_cursor --
- * Internal version of WT_SESSION::open_cursor.
+ * __session_open_cursor_int --
+ * Internal version of WT_SESSION::open_cursor, with second cursor arg.
*/
-int
-__wt_open_cursor(WT_SESSION_IMPL *session,
- const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
+static int
+__session_open_cursor_int(WT_SESSION_IMPL *session, const char *uri,
+ WT_CURSOR *owner, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp)
{
WT_COLGROUP *colgroup;
WT_DATA_SOURCE *dsrc;
@@ -267,7 +267,8 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
*/
case 't':
if (WT_PREFIX_MATCH(uri, "table:"))
- WT_RET(__wt_curtable_open(session, uri, cfg, cursorp));
+ WT_RET(__wt_curtable_open(
+ session, uri, owner, cfg, cursorp));
break;
case 'c':
if (WT_PREFIX_MATCH(uri, "colgroup:")) {
@@ -288,6 +289,11 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
WT_RET(__wt_curindex_open(
session, uri, owner, cfg, cursorp));
break;
+ case 'j':
+ if (WT_PREFIX_MATCH(uri, "join:"))
+ WT_RET(__wt_curjoin_open(
+ session, uri, owner, cfg, cursorp));
+ break;
case 'l':
if (WT_PREFIX_MATCH(uri, "lsm:"))
WT_RET(__wt_clsm_open(
@@ -316,7 +322,8 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
break;
case 's':
if (WT_PREFIX_MATCH(uri, "statistics:"))
- WT_RET(__wt_curstat_open(session, uri, cfg, cursorp));
+ WT_RET(__wt_curstat_open(session, uri, other, cfg,
+ cursorp));
break;
default:
break;
@@ -346,6 +353,18 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
}
/*
+ * __wt_open_cursor --
+ * Internal version of WT_SESSION::open_cursor.
+ */
+int
+__wt_open_cursor(WT_SESSION_IMPL *session,
+ const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
+{
+ return (__session_open_cursor_int(session, uri, owner, NULL, cfg,
+ cursorp));
+}
+
+/*
* __session_open_cursor --
* WT_SESSION->open_cursor method.
*/
@@ -356,18 +375,22 @@ __session_open_cursor(WT_SESSION *wt_session,
WT_CURSOR *cursor;
WT_DECL_RET;
WT_SESSION_IMPL *session;
+ bool statjoin;
cursor = *cursorp = NULL;
session = (WT_SESSION_IMPL *)wt_session;
SESSION_API_CALL(session, open_cursor, config, cfg);
- if ((to_dup == NULL && uri == NULL) || (to_dup != NULL && uri != NULL))
+ statjoin = (to_dup != NULL && uri != NULL &&
+ WT_STREQ(uri, "statistics:join"));
+ if ((to_dup == NULL && uri == NULL) ||
+ (to_dup != NULL && uri != NULL && !statjoin))
WT_ERR_MSG(session, EINVAL,
"should be passed either a URI or a cursor to duplicate, "
"but not both");
- if (to_dup != NULL) {
+ if (to_dup != NULL && !statjoin) {
uri = to_dup->uri;
if (!WT_PREFIX_MATCH(uri, "colgroup:") &&
!WT_PREFIX_MATCH(uri, "index:") &&
@@ -379,8 +402,9 @@ __session_open_cursor(WT_SESSION *wt_session,
WT_ERR(__wt_bad_object_type(session, uri));
}
- WT_ERR(__wt_open_cursor(session, uri, NULL, cfg, &cursor));
- if (to_dup != NULL)
+ WT_ERR(__session_open_cursor_int(session, uri, NULL,
+ statjoin ? to_dup : NULL, cfg, &cursor));
+ if (to_dup != NULL && !statjoin)
WT_ERR(__wt_cursor_dup_position(to_dup, cursor));
*cursorp = cursor;
@@ -614,6 +638,123 @@ err: /* Note: drop operations cannot be unrolled (yet?). */
}
/*
+ * __session_join --
+ * WT_SESSION->join method.
+ */
+static int
+__session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor,
+ WT_CURSOR *ref_cursor, const char *config)
+{
+ WT_CONFIG_ITEM cval;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ WT_CURSOR_INDEX *cindex;
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_TABLE *ctable;
+ WT_INDEX *idx;
+ WT_TABLE *table;
+ uint32_t flags, range;
+ uint64_t count;
+ uint64_t bloom_bit_count, bloom_hash_count;
+
+ count = 0;
+ session = (WT_SESSION_IMPL *)wt_session;
+ SESSION_API_CALL(session, join, config, cfg);
+ table = NULL;
+
+ if (!WT_PREFIX_MATCH(join_cursor->uri, "join:")) {
+ __wt_errx(session, "not a join cursor");
+ WT_ERR(EINVAL);
+ }
+
+ if (WT_PREFIX_MATCH(ref_cursor->uri, "index:")) {
+ cindex = (WT_CURSOR_INDEX *)ref_cursor;
+ idx = cindex->index;
+ table = cindex->table;
+ WT_CURSOR_CHECKKEY(ref_cursor);
+ } else if (WT_PREFIX_MATCH(ref_cursor->uri, "table:")) {
+ idx = NULL;
+ ctable = (WT_CURSOR_TABLE *)ref_cursor;
+ table = ctable->table;
+ WT_CURSOR_CHECKKEY(ctable->cg_cursors[0]);
+ } else {
+ __wt_errx(session, "not an index or table cursor");
+ WT_ERR(EINVAL);
+ }
+
+ cjoin = (WT_CURSOR_JOIN *)join_cursor;
+ if (cjoin->table != table) {
+ __wt_errx(session, "table for join cursor does not match "
+ "table for index");
+ WT_ERR(EINVAL);
+ }
+ if (F_ISSET(ref_cursor, WT_CURSTD_JOINED)) {
+ __wt_errx(session, "index cursor already used in a join");
+ WT_ERR(EINVAL);
+ }
+
+ /* "ge" is the default */
+ range = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ;
+ flags = 0;
+ WT_ERR(__wt_config_gets(session, cfg, "compare", &cval));
+ if (cval.len != 0) {
+ if (WT_STRING_MATCH("gt", cval.str, cval.len))
+ range = WT_CURJOIN_END_GT;
+ else if (WT_STRING_MATCH("lt", cval.str, cval.len))
+ range = WT_CURJOIN_END_LT;
+ else if (WT_STRING_MATCH("le", cval.str, cval.len))
+ range = WT_CURJOIN_END_LE;
+ else if (WT_STRING_MATCH("eq", cval.str, cval.len))
+ range = WT_CURJOIN_END_EQ;
+ else if (!WT_STRING_MATCH("ge", cval.str, cval.len))
+ WT_ERR(EINVAL);
+ }
+ WT_ERR(__wt_config_gets(session, cfg, "count", &cval));
+ if (cval.len != 0)
+ count = (uint64_t)cval.val;
+
+ WT_ERR(__wt_config_gets(session, cfg, "strategy", &cval));
+ if (cval.len != 0) {
+ if (WT_STRING_MATCH("bloom", cval.str, cval.len))
+ LF_SET(WT_CURJOIN_ENTRY_BLOOM);
+ else if (!WT_STRING_MATCH("default", cval.str, cval.len))
+ WT_ERR(EINVAL);
+ }
+ WT_ERR(__wt_config_gets(session, cfg, "bloom_bit_count", &cval));
+ bloom_bit_count = (uint64_t)cval.val;
+ WT_ERR(__wt_config_gets(session, cfg, "bloom_hash_count", &cval));
+ bloom_hash_count = (uint64_t)cval.val;
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM)) {
+ if (count == 0) {
+ __wt_errx(session, "count must be nonzero when "
+ "strategy=bloom");
+ WT_ERR(EINVAL);
+ }
+ if (cjoin->entries_next == 0) {
+ __wt_errx(session, "the first joined cursor cannot "
+ "specify strategy=bloom");
+ WT_ERR(EINVAL);
+ }
+ }
+ WT_ERR(__wt_curjoin_join(session, cjoin, idx, ref_cursor, flags,
+ range, count, bloom_bit_count, bloom_hash_count));
+ /*
+ * There's an implied ownership ordering that isn't
+ * known when the cursors are created: the join cursor
+ * must be closed before any of the indices. Enforce
+ * that here by reordering.
+ */
+ if (TAILQ_FIRST(&session->cursors) != join_cursor) {
+ TAILQ_REMOVE(&session->cursors, join_cursor, q);
+ TAILQ_INSERT_HEAD(&session->cursors, join_cursor, q);
+ }
+ /* Disable the reference cursor for regular operations */
+ F_SET(ref_cursor, WT_CURSTD_JOINED);
+
+err: API_END_RET_NOTFOUND_MAP(session, ret);
+}
+
+/*
* __session_salvage --
* WT_SESSION->salvage method.
*/
@@ -1145,6 +1286,7 @@ __open_session(WT_CONNECTION_IMPL *conn,
__session_create,
__wt_session_compact,
__session_drop,
+ __session_join,
__session_log_flush,
__session_log_printf,
__session_rename,
diff --git a/src/support/stat.c b/src/support/stat.c
index 7ca9186ea8d..77fd3a80234 100644
--- a/src/support/stat.c
+++ b/src/support/stat.c
@@ -104,10 +104,12 @@ static const char * const __stats_dsrc_desc[] = {
"transaction: update conflicts",
};
-const char *
-__wt_stat_dsrc_desc(int slot)
+int
+__wt_stat_dsrc_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
{
- return (__stats_dsrc_desc[slot]);
+ WT_UNUSED(cst);
+ *p = __stats_dsrc_desc[slot];
+ return (0);
}
void
@@ -659,10 +661,12 @@ static const char * const __stats_connection_desc[] = {
"connection: total write I/Os",
};
-const char *
-__wt_stat_connection_desc(int slot)
+int
+__wt_stat_connection_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
{
- return (__stats_connection_desc[slot]);
+ WT_UNUSED(cst);
+ *p = __stats_connection_desc[slot];
+ return (0);
}
void
@@ -1053,3 +1057,49 @@ __wt_stat_connection_aggregate(
to->txn_commit += WT_STAT_READ(from, txn_commit);
to->txn_rollback += WT_STAT_READ(from, txn_rollback);
}
+
+static const char * const __stats_join_desc[] = {
+ ": accesses",
+ ": actual count of items",
+ ": bloom filter false positives",
+};
+
+int
+__wt_stat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
+{
+ WT_UNUSED(cst);
+ *p = __stats_join_desc[slot];
+ return (0);
+}
+
+void
+__wt_stat_join_init_single(WT_JOIN_STATS *stats)
+{
+ memset(stats, 0, sizeof(*stats));
+}
+
+void
+__wt_stat_join_clear_single(WT_JOIN_STATS *stats)
+{
+ stats->accesses = 0;
+ stats->actual_count = 0;
+ stats->bloom_false_positive = 0;
+}
+
+void
+__wt_stat_join_clear_all(WT_JOIN_STATS **stats)
+{
+ u_int i;
+
+ for (i = 0; i < WT_COUNTER_SLOTS; ++i)
+ __wt_stat_join_clear_single(stats[i]);
+}
+
+void
+__wt_stat_join_aggregate(
+ WT_JOIN_STATS **from, WT_JOIN_STATS *to)
+{
+ to->accesses += WT_STAT_READ(from, accesses);
+ to->actual_count += WT_STAT_READ(from, actual_count);
+ to->bloom_false_positive += WT_STAT_READ(from, bloom_false_positive);
+}
diff --git a/test/suite/test_join01.py b/test/suite/test_join01.py
new file mode 100644
index 00000000000..ca6e5fbcabb
--- /dev/null
+++ b/test/suite/test_join01.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import wiredtiger, wttest
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+# test_join01.py
+# Join operations
+# Basic tests for join
+class test_join01(wttest.WiredTigerTestCase):
+ table_name1 = 'test_join01'
+ nentries = 100
+
+ scenarios = [
+ ('table', dict(ref='table')),
+ ('index', dict(ref='index'))
+ ]
+
+ # Override WiredTigerTestCase, we have statistics tests.
+ def setUpConnectionOpen(self, dir):
+ conn = wiredtiger.wiredtiger_open(dir,
+ 'create,statistics=(all),' + 'error_prefix="%s: "' % self.shortid())
+ return conn
+
+ def gen_key(self, i):
+ return [ i + 1 ]
+
+ def gen_values(self, i):
+ s = str(i)
+ rs = s[::-1]
+ sort3 = (self.nentries * (i % 3)) + i # multiples of 3 sort first
+ return [s, rs, sort3]
+
+ # Common function for testing iteration of join cursors
+ def iter_common(self, jc, do_proj):
+ # See comments in join_common()
+ expect = [73, 82, 62, 83, 92]
+ while jc.next() == 0:
+ [k] = jc.get_keys()
+ i = k - 1
+ if do_proj: # our projection test simply reverses the values
+ [v2,v1,v0] = jc.get_values()
+ else:
+ [v0,v1,v2] = jc.get_values()
+ self.assertEquals(self.gen_values(i), [v0,v1,v2])
+ if len(expect) == 0 or i != expect[0]:
+ self.tty(' result ' + str(i) + ' is not in: ' + str(expect))
+ self.assertTrue(i == expect[0])
+ expect.remove(i)
+ self.assertEquals(0, len(expect))
+
+ # Stats are collected twice: after iterating
+ # through the join cursor once, and secondly after resetting
+ # the join cursor and iterating again.
+ def stats(self, jc, which):
+ statcur = self.session.open_cursor('statistics:join', jc, None)
+ self.check_stats(statcur, 0, 'join: index:join01:index1: ' +
+ 'bloom filter false positives')
+ statcur.close()
+
+ def statstr_to_int(self, str):
+ """
+ Convert a statistics value string, which may be in either form:
+ '12345' or '33M (33604836)'
+ """
+ parts = str.rpartition('(')
+ return int(parts[2].rstrip(')'))
+
+ # string should appear with a minimum value of least "min".
+ def check_stats(self, statcursor, min, lookfor):
+ stringclass = ''.__class__
+ intclass = (0).__class__
+
+ # Reset the cursor, we're called multiple times.
+ statcursor.reset()
+
+ found = False
+ foundval = 0
+ self.printVerbose(3, 'statistics:')
+ for id, desc, valstr, val in statcursor:
+ self.assertEqual(type(desc), stringclass)
+ self.assertEqual(type(valstr), stringclass)
+ self.assertEqual(type(val), intclass)
+ self.assertEqual(val, self.statstr_to_int(valstr))
+ self.printVerbose(3, ' stat: \'' + desc + '\', \'' +
+ valstr + '\', ' + str(val))
+ if desc == lookfor:
+ found = True
+ foundval = val
+
+ self.assertTrue(found, 'in stats, did not see: ' + lookfor)
+ self.assertTrue(foundval >= min)
+
+ # Common function for testing the most basic functionality
+ # of joins
+ def join_common(self, joincfg0, joincfg1, do_proj, do_stats):
+ #self.tty('join_common(' + joincfg0 + ',' + joincfg1 + ',' +
+ # str(do_proj) + ')')
+ self.session.create('table:join01', 'key_format=r' +
+ ',value_format=SSi,columns=(k,v0,v1,v2)')
+ self.session.create('index:join01:index0','columns=(v0)')
+ self.session.create('index:join01:index1','columns=(v1)')
+ self.session.create('index:join01:index2','columns=(v2)')
+
+ c = self.session.open_cursor('table:join01', None, None)
+ for i in range(0, self.nentries):
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ if do_proj:
+ proj_suffix = '(v2,v1,v0)' # Reversed values
+ else:
+ proj_suffix = '' # Default projection (v0,v1,v2)
+
+ # We join on index2 first, not using bloom indices.
+ # This defines the order that items are returned.
+ # index2 is sorts multiples of 3 first (see gen_values())
+ # and by using 'gt' and key 99, we'll skip multiples of 3,
+ # and examine primary keys 2,5,8,...,95,98,1,4,7,...,94,97.
+ jc = self.session.open_cursor('join:table:join01' + proj_suffix,
+ None, None)
+ c2 = self.session.open_cursor('index:join01:index2', None, None)
+ c2.set_key(99) # skips all entries w/ primary key divisible by three
+ self.assertEquals(0, c2.search())
+ self.session.join(jc, c2, 'compare=gt')
+
+ # Then select all the numbers 0-99 whose string representation
+ # sort >= '60'.
+ if self.ref == 'index':
+ c0 = self.session.open_cursor('index:join01:index0', None, None)
+ c0.set_key('60')
+ else:
+ c0 = self.session.open_cursor('table:join01', None, None)
+ c0.set_key(60)
+ self.assertEquals(0, c0.search())
+ self.session.join(jc, c0, 'compare=ge' + joincfg0)
+
+ # Then select all numbers whose reverse string representation
+ # is in '20' < x < '40'.
+ c1a = self.session.open_cursor('index:join01:index1', None, None)
+ c1a.set_key('21')
+ self.assertEquals(0, c1a.search())
+ self.session.join(jc, c1a, 'compare=gt' + joincfg1)
+
+ c1b = self.session.open_cursor('index:join01:index1', None, None)
+ c1b.set_key('41')
+ self.assertEquals(0, c1b.search())
+ self.session.join(jc, c1b, 'compare=lt' + joincfg1)
+
+ # Numbers that satisfy these 3 conditions (with ordering implied by c2):
+ # [73, 82, 62, 83, 92].
+ #
+ # After iterating, we should be able to reset and iterate again.
+ if do_stats:
+ self.stats(jc, 0)
+ self.iter_common(jc, do_proj)
+ if do_stats:
+ self.stats(jc, 1)
+ jc.reset()
+ self.iter_common(jc, do_proj)
+ if do_stats:
+ self.stats(jc, 2)
+ jc.reset()
+ self.iter_common(jc, do_proj)
+
+ jc.close()
+ c2.close()
+ c1a.close()
+ c1b.close()
+ c0.close()
+ self.session.drop('table:join01')
+
+ # Test joins with basic functionality
+ def test_join(self):
+ bloomcfg1000 = ',strategy=bloom,count=1000'
+ bloomcfg10000 = ',strategy=bloom,count=10000'
+ for cfga in [ '', bloomcfg1000, bloomcfg10000 ]:
+ for cfgb in [ '', bloomcfg1000, bloomcfg10000 ]:
+ for do_proj in [ False, True ]:
+ #self.tty('cfga=' + cfga +
+ # ', cfgb=' + cfgb +
+ # ', doproj=' + str(do_proj))
+ self.join_common(cfga, cfgb, do_proj, False)
+
+ def test_join_errors(self):
+ self.session.create('table:join01', 'key_format=r,value_format=SS'
+ ',columns=(k,v0,v1)')
+ self.session.create('table:join01B', 'key_format=r,value_format=SS'
+ ',columns=(k,v0,v1)')
+ self.session.create('index:join01:index0','columns=(v0)')
+ self.session.create('index:join01:index1','columns=(v1)')
+ self.session.create('index:join01B:index0','columns=(v0)')
+ jc = self.session.open_cursor('join:table:join01', None, None)
+ tc = self.session.open_cursor('table:join01', None, None)
+ fc = self.session.open_cursor('file:join01.wt', None, None)
+ ic0 = self.session.open_cursor('index:join01:index0', None, None)
+ ic0again = self.session.open_cursor('index:join01:index0', None, None)
+ ic1 = self.session.open_cursor('index:join01:index1', None, None)
+ icB = self.session.open_cursor('index:join01B:index0', None, None)
+ tcB = self.session.open_cursor('table:join01B', None, None)
+
+ tc.set_key(1)
+ tc.set_value('val1', 'val1')
+ tc.insert()
+ tcB.set_key(1)
+ tcB.set_value('val1', 'val1')
+ tcB.insert()
+ fc.next()
+
+ # Joining using a non join-cursor
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(tc, ic0, 'compare=ge'),
+ '/not a join cursor/')
+ # Joining a table cursor, not index
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, fc, 'compare=ge'),
+ '/not an index or table cursor/')
+ # Joining a non positioned cursor
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0, 'compare=ge'),
+ '/requires key be set/')
+
+ # minimally position the cursors now
+ ic0.next()
+ ic0again.next()
+ icB.next()
+
+ # Joining non matching index
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, icB, 'compare=ge'),
+ '/table for join cursor does not match/')
+
+ # The cursor must be positioned
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic1, 'compare=ge'),
+ '/requires key be set/')
+ ic1.next()
+
+ # The first cursor joined cannot be bloom
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic1,
+ 'compare=ge,strategy=bloom,count=1000'),
+ '/first joined cursor cannot specify strategy=bloom/')
+
+ # This succeeds.
+ self.session.join(jc, ic1, 'compare=ge'),
+
+ # With bloom filters, a count is required
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0, 'compare=ge,strategy=bloom'),
+ '/count must be nonzero/')
+
+ # This succeeds.
+ self.session.join(jc, ic0, 'compare=ge,strategy=bloom,count=1000'),
+
+ bloom_config = ',strategy=bloom,count=1000'
+ # Cannot use the same index cursor
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0,
+ 'compare=le' + bloom_config),
+ '/index cursor already used in a join/')
+
+ # When joining with the same index, need compatible compares
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=ge' + bloom_config),
+ '/join has overlapping ranges/')
+
+ # Another incompatible compare
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=gt' + bloom_config),
+ '/join has overlapping ranges/')
+
+ # Compare is compatible, but bloom args need to match
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=le'),
+ '/join has incompatible strategy/')
+
+ # Counts need to match for bloom filters
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=le,strategy=bloom,'
+ 'count=100'), '/count.* does not match previous count/')
+
+ # This succeeds
+ self.session.join(jc, ic0again, 'compare=le,strategy=bloom,count=1000')
+
+ # Need to do initial next() before getting key/values
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: jc.get_keys(),
+ '/join cursor must be advanced with next/')
+
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: jc.get_values(),
+ '/join cursor must be advanced with next/')
+
+ # Operations on the joined cursor are frozen until the join is closed.
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: ic0.next(),
+ '/index cursor is being used in a join/')
+
+ # Operations on the joined cursor are frozen until the join is closed.
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: ic0.prev(),
+ '/index cursor is being used in a join/')
+
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: ic0.reset(),
+ '/index cursor is being used in a join/')
+
+ # Only a small number of operations allowed on a join cursor
+ self.assertRaises(wiredtiger.WiredTigerError,
+ lambda: jc.search())
+
+ self.assertRaises(wiredtiger.WiredTigerError,
+ lambda: jc.prev())
+
+ self.assertEquals(jc.next(), 0)
+ self.assertEquals(jc.next(), wiredtiger.WT_NOTFOUND)
+
+ # Only after the join cursor is closed can we use the index cursor
+ # normally
+ jc.close()
+ self.assertEquals(ic0.next(), wiredtiger.WT_NOTFOUND)
+ self.assertEquals(ic0.prev(), 0)
+
+ # common code for making sure that cursors can be
+ # implicitly closed, no matter the order they are created
+ def cursor_close_common(self, joinfirst):
+ self.session.create('table:join01', 'key_format=r' +
+ ',value_format=SS,columns=(k,v0,v1)')
+ self.session.create('index:join01:index0','columns=(v0)')
+ self.session.create('index:join01:index1','columns=(v1)')
+ c = self.session.open_cursor('table:join01', None, None)
+ for i in range(0, self.nentries):
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ if joinfirst:
+ jc = self.session.open_cursor('join:table:join01', None, None)
+ c0 = self.session.open_cursor('index:join01:index0', None, None)
+ c1 = self.session.open_cursor('index:join01:index1', None, None)
+ c0.next() # index cursors must be positioned
+ c1.next()
+ if not joinfirst:
+ jc = self.session.open_cursor('join:table:join01', None, None)
+ self.session.join(jc, c0, 'compare=ge')
+ self.session.join(jc, c1, 'compare=ge')
+ self.session.close()
+ self.session = None
+
+ def test_cursor_close1(self):
+ self.cursor_close_common(True)
+
+ def test_cursor_close2(self):
+ self.cursor_close_common(False)
+
+ def test_stats(self):
+ bloomcfg1000 = ',strategy=bloom,count=1000'
+ bloomcfg10 = ',strategy=bloom,count=10'
+ self.join_common(bloomcfg1000, bloomcfg1000, False, True)
+
+ # Intentially run with an underconfigured Bloom filter,
+ # statistics should pick up some false positives.
+ self.join_common(bloomcfg10, bloomcfg10, False, True)
+
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_join02.py b/test/suite/test_join02.py
new file mode 100644
index 00000000000..bf376575103
--- /dev/null
+++ b/test/suite/test_join02.py
@@ -0,0 +1,290 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import wiredtiger, wttest, suite_random
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+# test_join02.py
+# Join operations
+# Join several indices together, trying all comparison combinations
+class test_join02(wttest.WiredTigerTestCase):
+ table_name1 = 'test_join02'
+ nentries = 1000
+
+ keyscen = [
+ ('key-r', dict(keyformat='r')),
+ ('key-S', dict(keyformat='S')),
+ ('key-i', dict(keyformat='i')),
+ ('key-iS', dict(keyformat='iS'))
+ ]
+
+ bloomscen = [
+ ('bloom', dict(usebloom=True)),
+ ('nobloom', dict(usebloom=False))
+ ]
+
+ scenarios = number_scenarios(multiply_scenarios('.', keyscen, bloomscen))
+
+ # Start our range from 1, since WT record numbers start at 1,
+ # it makes things work out nicer.
+ def range(self):
+ return range(1, self.nentries + 1)
+
+ def gen_key(self, i):
+ if self.keyformat == 'S':
+ return [ 'key%06d' % i ] # zero pad so it sorts expectedly
+ elif self.keyformat == 'iS':
+ return [ i, 'key%06d' % i ]
+ else:
+ return [ i ]
+
+ def gen_values(self, i):
+ s = str(i)
+ x = 'x' * i
+ rs = s[::-1]
+ f = int(s[0:1])
+ return [i, s, x, rs, f]
+
+ def reinit_joinconfig(self):
+ self.rand = suite_random.suite_random(self.seed)
+ self.seed += 1
+
+ def get_joinconfig(self):
+ # When we're running the bloom scenario, make it so the
+ # bloom filters are often shared. Make the number of
+ # hashes and number of bits per item so they don't always
+ # match up; WT should allow it.
+ if self.usebloom:
+ c = 10000 if (self.rand.rand32() % 3) != 0 else 100000
+ k = 8 if (self.rand.rand32() % 10) != 0 else 10
+ b = 16 if (self.rand.rand32() % 11) != 0 else 12
+ return \
+ ',strategy=bloom,count=' + str(c) + \
+ ',bloom_bit_count=' + str(b) + \
+ ',bloom_hash_count=' + str(k)
+ else:
+ return ''
+
+ def do_join(self, jc, curleft, curright, choice, mbr):
+ c0 = choice[0]
+ if c0 == None:
+ return mbr
+ # The first join cannot use a bloom filter
+ if jc.first_join:
+ joinconfig = ''
+ jc.first_join = False
+ else:
+ joinconfig = self.get_joinconfig()
+ if c0 != None:
+ #self.tty('join(jc, ' + curleft.name + ' ' + c0 +
+ # ' ' + str(curleft.low) + ')')
+ curleft.reset()
+ curleft.set_key(*curleft.low)
+ self.assertEquals(0, curleft.search())
+ self.session.join(jc, curleft, 'compare=' + c0 + joinconfig)
+ if c0 == 'eq':
+ mbr = mbr.intersection(curleft.eqmembers)
+ elif c0 == 'ge':
+ mbr = mbr.intersection(
+ set(curleft.eqmembers.union(curleft.gtmembers)))
+ elif c0 == 'gt':
+ mbr = mbr.intersection(curleft.gtmembers)
+ c1 = choice[1] if len(choice) > 1 else None
+ if c1 != None:
+ #self.tty('join(jc, ' + curright.name + ' ' + c1 +
+ # ' ' + str(curright.high) + ')')
+ curright.reset()
+ curright.set_key(*curright.high)
+ self.assertEquals(0, curright.search())
+ self.session.join(jc, curright, 'compare=' + c1 + joinconfig)
+ if c1 == 'le':
+ mbr = mbr.intersection(
+ set(curright.eqmembers.union(curright.ltmembers)))
+ elif c1 == 'lt':
+ mbr = mbr.intersection(curright.ltmembers)
+ return mbr
+
+ def iterate(self, jc, mbr):
+ #self.tty('iteration expects ' + str(len(mbr)) +
+ # ' entries: ' + str(mbr))
+ while jc.next() == 0:
+ keys = jc.get_keys()
+ [v0,v1,v2,v3,v4] = jc.get_values()
+ k0 = keys[0]
+ k1 = keys[1] if len(keys) > 1 else None
+ if self.keyformat == 'S':
+ i = int(str(k0[3:]))
+ elif self.keyformat == 'iS':
+ i = k0
+ self.assertEquals(i, int(str(k1[3:])))
+ else:
+ i = k0
+ #self.tty(' iteration got key: ' + str(k0) + ',' + str(k1))
+ #self.tty(' iteration got values: ' + str([v0,v1,v2,v3,v4]))
+ #self.tty(' iteration expects values: ' + str(self.gen_values(i)))
+ self.assertEquals(self.gen_values(i), [v0,v1,v2,v3,v4])
+ if not i in mbr:
+ self.tty(' result ' + str(i) + ' is not in: ' + str(mbr))
+ self.assertTrue(i in mbr)
+ mbr.remove(i)
+ self.assertEquals(0, len(mbr))
+
+ def mkmbr(self, expr):
+ return frozenset([x for x in self.range() if expr(x)])
+
+ def test_basic_join(self):
+ self.seed = 1
+ if self.keyformat == 'iS':
+ keycols = 'k0,k1'
+ else:
+ keycols = 'k'
+ self.session.create('table:join02', 'key_format=' + self.keyformat +
+ ',value_format=iSuSi,columns=(' + keycols +
+ ',v0,v1,v2,v3,v4)')
+ self.session.create('index:join02:index0','columns=(v0)')
+ self.session.create('index:join02:index1','columns=(v1)')
+ self.session.create('index:join02:index2','columns=(v2)')
+ self.session.create('index:join02:index3','columns=(v3)')
+ self.session.create('index:join02:index4','columns=(v4)')
+ c = self.session.open_cursor('table:join02', None, None)
+ for i in self.range():
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ # Use the primary table in one of the joins.
+ c0a = self.session.open_cursor('table:join02', None, None)
+ c0b = self.session.open_cursor('table:join02', None, None)
+ c1a = self.session.open_cursor('index:join02:index1', None, None)
+ c1b = self.session.open_cursor('index:join02:index1', None, None)
+ c2a = self.session.open_cursor('index:join02:index2', None, None)
+ c2b = self.session.open_cursor('index:join02:index2', None, None)
+ c3a = self.session.open_cursor('index:join02:index3', None, None)
+ c3b = self.session.open_cursor('index:join02:index3', None, None)
+ c4a = self.session.open_cursor('index:join02:index4', None, None)
+
+ # Attach extra properties to each cursor. For cursors that
+ # may appear on the 'left' side of a range CA < x < CB,
+ # we give a low value of the range, and calculate the members
+ # of the set we expect to see for a 'gt' comparison, as well
+ # as the 'eq' comparison. For cursors that appear on the
+ # 'right side of the range, we give a high value of the range,
+ # and calculate membership sets for 'lt' and 'eq'.
+ #
+ # We've defined the low/high values so that there's a lot of
+ # overlap between the values when we're doing ranges.
+ c0a.name = 'c0a'
+ c0b.name = 'c0b'
+ if self.keyformat == 'i' or self.keyformat == 'r':
+ c0a.low = [ 205 ]
+ c0b.high = [ 990 ]
+ elif self.keyformat == 'S':
+ c0a.low = [ 'key000205' ]
+ c0b.high = [ 'key000990' ]
+ elif self.keyformat == 'iS':
+ c0a.low = [ 205, 'key000205' ]
+ c0b.high = [ 990, 'key000990' ]
+ c0a.gtmembers = self.mkmbr(lambda x: x > 205)
+ c0a.eqmembers = self.mkmbr(lambda x: x == 205)
+ c0b.ltmembers = self.mkmbr(lambda x: x < 990)
+ c0b.eqmembers = self.mkmbr(lambda x: x == 990)
+
+ c1a.low = [ '150' ]
+ c1a.gtmembers = self.mkmbr(lambda x: str(x) > '150')
+ c1a.eqmembers = self.mkmbr(lambda x: str(x) == '150')
+ c1a.name = 'c1a'
+ c1b.high = [ '733' ]
+ c1b.ltmembers = self.mkmbr(lambda x: str(x) < '733')
+ c1b.eqmembers = self.mkmbr(lambda x: str(x) == '733')
+ c1b.name = 'c1b'
+
+ c2a.low = [ 'x' * 321 ]
+ c2a.gtmembers = self.mkmbr(lambda x: x > 321)
+ c2a.eqmembers = self.mkmbr(lambda x: x == 321)
+ c2a.name = 'c2a'
+ c2b.high = [ 'x' * 765 ]
+ c2b.ltmembers = self.mkmbr(lambda x: x < 765)
+ c2b.eqmembers = self.mkmbr(lambda x: x == 765)
+ c2b.name = 'c2b'
+
+ c3a.low = [ '432' ]
+ c3a.gtmembers = self.mkmbr(lambda x: str(x)[::-1] > '432')
+ c3a.eqmembers = self.mkmbr(lambda x: str(x)[::-1] == '432')
+ c3a.name = 'c3a'
+ c3b.high = [ '876' ]
+ c3b.ltmembers = self.mkmbr(lambda x: str(x)[::-1] < '876')
+ c3b.eqmembers = self.mkmbr(lambda x: str(x)[::-1] == '876')
+ c3b.name = 'c3b'
+
+ c4a.low = [ 4 ]
+ c4a.gtmembers = self.mkmbr(lambda x: str(x)[0:1] > '4')
+ c4a.eqmembers = self.mkmbr(lambda x: str(x)[0:1] == '4')
+ c4a.name = 'c4a'
+
+ choices = [[None], ['eq'], ['ge'], ['gt'], [None, 'le'], [None, 'lt'],
+ ['ge', 'le' ], ['ge', 'lt' ], ['gt', 'le' ], ['gt', 'lt' ]]
+ smallchoices = [[None], ['eq'], ['ge'], ['gt', 'le' ]]
+ for i0 in smallchoices:
+ for i1 in choices:
+ for i2 in smallchoices:
+ for i3 in smallchoices:
+ for i4 in [[None], ['eq'], ['ge'], ['gt']]:
+ if i0[0] == None and i1[0] == None and \
+ i2[0] == None and i3[0] == None and \
+ i4[0] == None:
+ continue
+ self.reinit_joinconfig()
+ #self.tty('Begin test: ' +
+ # ','.join([str(i0),str(i1),str(i2),
+ # str(i3),str(i4)]))
+ jc = self.session.open_cursor('join:table:join02',
+ None, None)
+ jc.first_join = True
+ mbr = set(self.range())
+
+ # It shouldn't matter the order of the joins
+ mbr = self.do_join(jc, c3a, c3b, i3, mbr)
+ mbr = self.do_join(jc, c2a, c2b, i2, mbr)
+ mbr = self.do_join(jc, c4a, None, i4, mbr)
+ mbr = self.do_join(jc, c1a, c1b, i1, mbr)
+ mbr = self.do_join(jc, c0a, c0b, i0, mbr)
+ self.iterate(jc, mbr)
+ jc.close()
+ c0a.close()
+ c0b.close()
+ c1a.close()
+ c1b.close()
+ c2a.close()
+ c2b.close()
+ c3a.close()
+ c3b.close()
+ c4a.close()
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_join03.py b/test/suite/test_join03.py
new file mode 100644
index 00000000000..552e3b41748
--- /dev/null
+++ b/test/suite/test_join03.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import os
+import wiredtiger, wttest, run
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+# test_join03.py
+# Join operations
+# Joins with a custom extractor
+class test_join03(wttest.WiredTigerTestCase):
+ table_name1 = 'test_join03'
+ nentries = 100
+
+ # Return the wiredtiger_open extension argument for a shared library.
+ def extensionArg(self, exts):
+ extfiles = []
+ for ext in exts:
+ (dirname, name, libname) = ext
+ if name != None and name != 'none':
+ testdir = os.path.dirname(__file__)
+ extdir = os.path.join(run.wt_builddir, 'ext', dirname)
+ extfile = os.path.join(
+ extdir, name, '.libs', 'libwiredtiger_' + libname + '.so')
+ if not os.path.exists(extfile):
+ self.skipTest('extension "' + extfile + '" not built')
+ if not extfile in extfiles:
+ extfiles.append(extfile)
+ if len(extfiles) == 0:
+ return ''
+ else:
+ return ',extensions=["' + '","'.join(extfiles) + '"]'
+
+ # Override WiredTigerTestCase, we have extensions.
+ def setUpConnectionOpen(self, dir):
+ extarg = self.extensionArg([('extractors', 'csv', 'csv_extractor')])
+ connarg = 'create,error_prefix="{0}: ",{1}'.format(
+ self.shortid(), extarg)
+ conn = wiredtiger.wiredtiger_open(dir, connarg)
+ self.pr(`conn`)
+ return conn
+
+ def gen_key(self, i):
+ return [ i + 1 ]
+
+ def gen_values(self, i):
+ s = str(i)
+ rs = s[::-1].lstrip('0')
+ return [ s + ',' + rs ]
+
+ # Common function for testing iteration of join cursors
+ def iter_common(self, jc):
+ mbr = set([62, 63, 72, 73, 82, 83, 92, 93])
+ while jc.next() == 0:
+ [k] = jc.get_keys()
+ i = k - 1
+ [v] = jc.get_values()
+ self.assertEquals(self.gen_values(i), [v])
+ if not i in mbr:
+ self.tty(' result ' + str(i) + ' is not in: ' + str(mbr))
+ self.assertTrue(i in mbr)
+ mbr.remove(i)
+ self.assertEquals(0, len(mbr))
+
+ # Common function for testing the most basic functionality
+ # of joins
+ def join(self, csvformat, args0, args1):
+ self.session.create('table:join03', 'key_format=r' +
+ ',value_format=S,columns=(k,v)')
+ fmt = csvformat[0]
+ self.session.create('index:join03:index0','key_format=' + fmt + ',' +
+ 'extractor=csv,app_metadata={"format" : "' +
+ fmt + '","field" : "0"}')
+ fmt = csvformat[1]
+ self.session.create('index:join03:index1','key_format=' + fmt + ',' +
+ 'extractor=csv,app_metadata={"format" : "' +
+ fmt + '","field" : "1"}')
+
+ c = self.session.open_cursor('table:join03', None, None)
+ for i in range(0, self.nentries):
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ jc = self.session.open_cursor('join:table:join03', None, None)
+
+ # All the numbers 0-99 whose string representation
+ # sort >= '60' and whose reverse string representation
+ # is in '20' < x < '40'. That is: [62, 63, 72, 73, 82, 83, 92, 93]
+ c0 = self.session.open_cursor('index:join03:index0', None, None)
+ if csvformat[0] == 'S':
+ c0.set_key('60')
+ else:
+ c0.set_key(60)
+ self.assertEquals(0, c0.search())
+ self.session.join(jc, c0, 'compare=ge' + args0)
+
+ c1a = self.session.open_cursor('index:join03:index1', None, None)
+ if csvformat[1] == 'S':
+ c1a.set_key('21')
+ else:
+ c1a.set_key(21)
+ self.assertEquals(0, c1a.search())
+ self.session.join(jc, c1a, 'compare=gt' + args1)
+
+ c1b = self.session.open_cursor('index:join03:index1', None, None)
+ if csvformat[1] == 'S':
+ c1b.set_key('41')
+ else:
+ c1b.set_key(41)
+ self.assertEquals(0, c1b.search())
+ self.session.join(jc, c1b, 'compare=lt' + args1)
+
+ # Iterate, and make sure that reset allows us to iterate again.
+ self.iter_common(jc)
+
+ jc.close()
+ c1a.close()
+ c1b.close()
+ c0.close()
+ self.session.drop('table:join03')
+
+ # Test joins using CSV fields that are interpreted as different types
+ # to make sure all the extractor plumbing used in joins is working.
+ def test_join(self):
+ for extraargs in [ '', ',strategy=bloom,count=1000' ]:
+ for csvformat in [ 'SS', 'ii', 'Si', 'iS' ]:
+ self.join(csvformat, '', extraargs)
+
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_schema05.py b/test/suite/test_schema05.py
index 2a7bc042c80..b0562f7983c 100644
--- a/test/suite/test_schema05.py
+++ b/test/suite/test_schema05.py
@@ -89,9 +89,10 @@ class test_schema05(wttest.WiredTigerTestCase):
# Create self.nindices index files, each with a column from the CSV
for i in range(0, self.nindices):
si = str(i)
- self.session.create("index:schema05:x" + si,
- "key_format=S,columns=(key),"
- "extractor=csv,app_metadata=" + si)
+ self.session.create('index:schema05:x' + si,
+ 'key_format=S,columns=(key),'
+ 'extractor=csv,app_metadata={"format" : "S",' +
+ '"field" : "' + si + '"}')
def drop_indices(self):
for i in range(0, self.nindices):