summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Gorrod <alexg@wiredtiger.com>2015-11-19 00:56:54 +0000
committerAlex Gorrod <alexg@wiredtiger.com>2015-11-19 00:56:54 +0000
commit76d4b2f41e816cd496d577c6909ef0ac7101bfbf (patch)
tree416923934062b85a86c56a0a8ff107b9ddb3f370
parentb78e5b6d3581eed2f28c120992d0f66a14802ebe (diff)
parent4368d3975e0baa53508269f3fb2d712ecab7a584 (diff)
downloadmongo-76d4b2f41e816cd496d577c6909ef0ac7101bfbf.tar.gz
Merge branch 'develop' into wt-2222-snapshot-stats
-rw-r--r--build_win/filelist.win1
-rw-r--r--dist/api_data.py27
-rw-r--r--dist/filelist1
-rw-r--r--dist/s_define.list1
-rw-r--r--dist/s_funcs.list2
-rw-r--r--dist/s_string.ok12
-rw-r--r--dist/stat.py44
-rw-r--r--dist/stat_data.py20
-rw-r--r--examples/c/ex_schema.c39
-rw-r--r--examples/java/com/wiredtiger/examples/ex_schema.java43
-rw-r--r--ext/extractors/csv/csv_extractor.c45
-rw-r--r--lang/java/java_doc.i1
-rw-r--r--src/bloom/bloom.c41
-rw-r--r--src/btree/bt_cursor.c1
-rw-r--r--src/btree/bt_delete.c2
-rw-r--r--src/btree/bt_split.c44
-rw-r--r--src/config/config_def.c18
-rw-r--r--src/conn/conn_stat.c17
-rw-r--r--src/cursor/cur_dump.c2
-rw-r--r--src/cursor/cur_index.c54
-rw-r--r--src/cursor/cur_join.c1049
-rw-r--r--src/cursor/cur_stat.c146
-rw-r--r--src/cursor/cur_table.c55
-rw-r--r--src/docs/cursor-join.dox19
-rw-r--r--src/docs/cursors.dox1
-rw-r--r--src/docs/data-sources.dox4
-rw-r--r--src/docs/programming.dox1
-rw-r--r--src/docs/spell.ok3
-rw-r--r--src/include/api.h16
-rw-r--r--src/include/config.h45
-rw-r--r--src/include/cursor.h74
-rw-r--r--src/include/cursor.i64
-rw-r--r--src/include/extern.h22
-rw-r--r--src/include/stat.h15
-rw-r--r--src/include/wiredtiger.in309
-rw-r--r--src/include/wt_internal.h12
-rw-r--r--src/lsm/lsm_stat.c6
-rw-r--r--src/packing/pack_impl.c105
-rw-r--r--src/reconcile/rec_write.c1
-rw-r--r--src/schema/schema_stat.c8
-rw-r--r--src/schema/schema_truncate.c3
-rw-r--r--src/session/session_api.c165
-rw-r--r--src/support/stat.c79
-rw-r--r--test/suite/test_join01.py396
-rw-r--r--test/suite/test_join02.py290
-rw-r--r--test/suite/test_join03.py158
-rw-r--r--test/suite/test_schema05.py7
47 files changed, 3156 insertions, 312 deletions
diff --git a/build_win/filelist.win b/build_win/filelist.win
index 9d0ee10d305..af6ddf98da9 100644
--- a/build_win/filelist.win
+++ b/build_win/filelist.win
@@ -72,6 +72,7 @@ src/cursor/cur_ds.c
src/cursor/cur_dump.c
src/cursor/cur_file.c
src/cursor/cur_index.c
+src/cursor/cur_join.c
src/cursor/cur_json.c
src/cursor/cur_log.c
src/cursor/cur_metadata.c
diff --git a/dist/api_data.py b/dist/api_data.py
index a86647a27f2..f58a48b4a0b 100644
--- a/dist/api_data.py
+++ b/dist/api_data.py
@@ -772,6 +772,33 @@ methods = {
type='boolean'),
]),
+'WT_SESSION.join' : Method([
+ Config('compare', '"eq"', r'''
+ modifies the set of items to be returned so that the index key
+ satisfies the given comparison relative to the key set in this
+ cursor''',
+ choices=['eq', 'ge', 'gt', 'le', 'lt']),
+ Config('count', '', r'''
+ set an approximate count of the elements that would be included in
+ the join. This is used in sizing the bloom filter, and also influences
+ evaluation order for cursors in the join. When the count is equal
+ for multiple bloom filters in a composition of joins, the bloom
+ filter may be shared''',
+ type='int'),
+ Config('bloom_bit_count', '16', r'''
+ the number of bits used per item for the bloom filter''',
+ min='2', max='1000'),
+ Config('bloom_hash_count', '8', r'''
+ the number of hash values per item for the bloom filter''',
+ min='2', max='100'),
+ Config('strategy', '', r'''
+ when set to bloom, a bloom filter is created and populated for
+ this index. This has an up front cost but may reduce the number
+ of accesses to the main table when iterating the joined cursor.
+ The bloom setting requires that count be set''',
+ choices=['bloom', 'default']),
+]),
+
'WT_SESSION.log_flush' : Method([
Config('sync', 'on', r'''
forcibly flush the log and wait for it to achieve the synchronization
diff --git a/dist/filelist b/dist/filelist
index f33f0e9a962..52af87c2a68 100644
--- a/dist/filelist
+++ b/dist/filelist
@@ -72,6 +72,7 @@ src/cursor/cur_ds.c
src/cursor/cur_dump.c
src/cursor/cur_file.c
src/cursor/cur_index.c
+src/cursor/cur_join.c
src/cursor/cur_json.c
src/cursor/cur_log.c
src/cursor/cur_metadata.c
diff --git a/dist/s_define.list b/dist/s_define.list
index a2b86610755..8b0d9a0bdcd 100644
--- a/dist/s_define.list
+++ b/dist/s_define.list
@@ -4,6 +4,7 @@ API_CALL
API_CALL_NOCONF
API_SESSION_INIT
FLD_MASK
+JOINABLE_CURSOR_CALL_CHECK
LF_MASK
LLONG_MAX
LLONG_MIN
diff --git a/dist/s_funcs.list b/dist/s_funcs.list
index 3b5690a4bc2..ed6cf43bb2f 100644
--- a/dist/s_funcs.list
+++ b/dist/s_funcs.list
@@ -27,6 +27,8 @@ __wt_log_scan
__wt_nlpo2
__wt_nlpo2_round
__wt_print_huffman_code
+__wt_stat_join_aggregate
+__wt_stat_join_clear_all
__wt_try_readlock
wiredtiger_config_parser_open
wiredtiger_config_validate
diff --git a/dist/s_string.ok b/dist/s_string.ok
index 21cd360c144..7de139f6a40 100644
--- a/dist/s_string.ok
+++ b/dist/s_string.ok
@@ -221,6 +221,7 @@ OUTBUFF
OVFL
ObWgfvgw
Obama
+Outfmt
PARAM
POSIX
PREDEFINE
@@ -351,6 +352,7 @@ allocfile
allocsize
amd
ao
+ap
api
arg
argc
@@ -421,6 +423,7 @@ checksums
chk
chongo
cip
+cjoin
ckpt
ckptfrag
ckptlist
@@ -464,6 +467,7 @@ curdump
curextract
curfile
curindex
+curjoin
curlog
curmetadata
cursoring
@@ -510,6 +514,7 @@ dhandles
difftime
dir
dirlist
+disjunction
dlclose
dlh
dll
@@ -541,6 +546,7 @@ enqueue
enqueued
env
eof
+eq
equalp
errhandler
errno
@@ -593,6 +599,7 @@ ftruncate
func
gcc
gdb
+ge
getenv
getline
getone
@@ -608,6 +615,7 @@ goesc
gostring
gostruct
goutf
+gt
hashval
havesize
hdr
@@ -633,6 +641,7 @@ indirects
indx
infeasible
inflateInit
+infmt
init
initn
initsize
@@ -651,6 +660,7 @@ io
ip
islocked
ispo
+iter
iteratively
jnr
jrx
@@ -669,6 +679,7 @@ latencies
lbrace
lbracket
ld
+le
len
lenp
level's
@@ -715,6 +726,7 @@ mem
memalign
membar
memcpy
+memget
memmove
memset
memsize
diff --git a/dist/stat.py b/dist/stat.py
index c9684665a53..d62fda3fcb9 100644
--- a/dist/stat.py
+++ b/dist/stat.py
@@ -5,7 +5,7 @@ import re, string, sys, textwrap
from dist import compare_srcfile
# Read the source files.
-from stat_data import groups, dsrc_stats, connection_stats
+from stat_data import groups, dsrc_stats, connection_stats, join_stats
def print_struct(title, name, base, stats):
'''Print the structures for the stat.h file.'''
@@ -35,9 +35,17 @@ for line in open('../src/include/stat.h', 'r'):
print_struct(
'connections', 'connection', 1000, connection_stats)
print_struct('data sources', 'dsrc', 2000, dsrc_stats)
+ print_struct('join cursors', 'join', 3000, join_stats)
f.close()
compare_srcfile(tmp_file, '../src/include/stat.h')
+def print_defines_one(capname, base, stats):
+ for v, l in enumerate(stats, base):
+ f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70)))
+ f.write('#define\tWT_STAT_' + capname + '_' + l.name.upper() + "\t" *
+ max(1, 6 - int((len('WT_STAT_' + capname + '_' + l.name)) / 8)) +
+ str(v) + '\n')
+
def print_defines():
'''Print the #defines for the wiredtiger.in file.'''
f.write('''
@@ -51,11 +59,7 @@ def print_defines():
* @{
*/
''')
- for v, l in enumerate(connection_stats, 1000):
- f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70)))
- f.write('#define\tWT_STAT_CONN_' + l.name.upper() + "\t" *
- max(1, 6 - int((len('WT_STAT_CONN_' + l.name)) / 8)) +
- str(v) + '\n')
+ print_defines_one('CONN', 1000, connection_stats)
f.write('''
/*!
* @}
@@ -64,11 +68,16 @@ def print_defines():
* @{
*/
''')
- for v, l in enumerate(dsrc_stats, 2000):
- f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70)))
- f.write('#define\tWT_STAT_DSRC_' + l.name.upper() + "\t" *
- max(1, 6 - int((len('WT_STAT_DSRC_' + l.name)) / 8)) +
- str(v) + '\n')
+ print_defines_one('DSRC', 2000, dsrc_stats)
+ f.write('''
+/*!
+ * @}
+ * @name Statistics for join cursors
+ * @anchor statistics_join
+ * @{
+ */
+''')
+ print_defines_one('JOIN', 3000, join_stats)
f.write('/*! @} */\n')
# Update the #defines in the wiredtiger.in file.
@@ -98,10 +107,12 @@ def print_func(name, handle, list):
f.write('};\n')
f.write('''
-const char *
-__wt_stat_''' + name + '''_desc(int slot)
+int
+__wt_stat_''' + name + '''_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
{
-\treturn (__stats_''' + name + '''_desc[slot]);
+\tWT_UNUSED(cst);
+\t*p = __stats_''' + name + '''_desc[slot];
+\treturn (0);
}
''')
@@ -113,7 +124,8 @@ __wt_stat_''' + name + '_init_single(WT_' + name.upper() + '''_STATS *stats)
}
''')
- f.write('''
+ if handle != None:
+ f.write('''
void
__wt_stat_''' + name + '_init(' + handle + ''' *handle)
{
@@ -205,6 +217,7 @@ f.write('#include "wt_internal.h"\n')
print_func('dsrc', 'WT_DATA_HANDLE', dsrc_stats)
print_func('connection', 'WT_CONNECTION_IMPL', connection_stats)
+print_func('join', None, join_stats)
f.close()
compare_srcfile(tmp_file, '../src/support/stat.c')
@@ -224,6 +237,7 @@ for l in sorted(dsrc_stats):
scale_info += ' \'' + l.desc + '\',\n'
if 'no_clear' in l.flags:
clear_info += ' \'' + l.desc + '\',\n'
+# No join statistics can be captured in wtstats
scale_info += ']\n'
clear_info += ']\n'
prefix_info = 'prefix_list = [\n'
diff --git a/dist/stat_data.py b/dist/stat_data.py
index 11327cc2c34..8185736ad2b 100644
--- a/dist/stat_data.py
+++ b/dist/stat_data.py
@@ -67,6 +67,10 @@ class DhandleStat(Stat):
prefix = 'data-handle'
def __init__(self, name, desc, flags=''):
Stat.__init__(self, name, DhandleStat.prefix, desc, flags)
+class JoinStat(Stat):
+ prefix = '' # prefix is inserted dynamically
+ def __init__(self, name, desc, flags=''):
+ Stat.__init__(self, name, JoinStat.prefix, desc, flags)
class LogStat(Stat):
prefix = 'log'
def __init__(self, name, desc, flags=''):
@@ -280,6 +284,8 @@ connection_stats = [
# Reconciliation statistics
##########################################
RecStat('rec_pages', 'page reconciliation calls'),
+ RecStat('rec_page_delete', 'pages deleted'),
+ RecStat('rec_page_delete_fast', 'fast-path pages deleted'),
RecStat('rec_pages_eviction', 'page reconciliation calls for eviction'),
RecStat('rec_split_stashed_bytes',
'split bytes currently awaiting free', 'no_clear,no_scale'),
@@ -358,6 +364,7 @@ connection_stats = [
CursorStat('cursor_restart', 'cursor restarted searches'),
CursorStat('cursor_search', 'cursor search calls'),
CursorStat('cursor_search_near', 'cursor search near calls'),
+ CursorStat('cursor_truncate', 'truncate calls'),
CursorStat('cursor_update', 'cursor update calls'),
##########################################
@@ -399,6 +406,7 @@ dsrc_stats = [
CursorStat('cursor_restart', 'restarted searches'),
CursorStat('cursor_search', 'search calls'),
CursorStat('cursor_search_near', 'search near calls'),
+ CursorStat('cursor_truncate', 'truncate calls'),
CursorStat('cursor_update', 'update calls'),
CursorStat('cursor_update_bytes', 'cursor-update value bytes updated'),
@@ -529,6 +537,7 @@ dsrc_stats = [
RecStat('rec_overflow_key_leaf', 'leaf-page overflow keys'),
RecStat('rec_overflow_value', 'overflow values written'),
RecStat('rec_page_delete', 'pages deleted'),
+ RecStat('rec_page_delete_fast', 'fast-path pages deleted'),
RecStat('rec_page_match', 'page checksum matches'),
RecStat('rec_pages', 'page reconciliation calls'),
RecStat('rec_pages_eviction', 'page reconciliation calls for eviction'),
@@ -544,3 +553,14 @@ dsrc_stats = [
]
dsrc_stats = sorted(dsrc_stats, key=attrgetter('name'))
+
+##########################################
+# Cursor Join statistics
+##########################################
+join_stats = [
+ JoinStat('accesses', 'accesses'),
+ JoinStat('actual_count', 'actual count of items'),
+ JoinStat('bloom_false_positive', 'bloom filter false positives'),
+]
+
+join_stats = sorted(join_stats, key=attrgetter('name'))
diff --git a/examples/c/ex_schema.c b/examples/c/ex_schema.c
index 8b74500acd3..dabb568129e 100644
--- a/examples/c/ex_schema.c
+++ b/examples/c/ex_schema.c
@@ -49,12 +49,16 @@ typedef struct {
static POP_RECORD pop_data[] = {
{ "AU", 1900, 4000000 },
+ { "AU", 1950, 8267337 },
{ "AU", 2000, 19053186 },
{ "CAN", 1900, 5500000 },
+ { "CAN", 1950, 14011422 },
{ "CAN", 2000, 31099561 },
{ "UK", 1900, 369000000 },
+ { "UK", 1950, 50127000 },
{ "UK", 2000, 59522468 },
{ "USA", 1900, 76212168 },
+ { "USA", 1950, 150697361 },
{ "USA", 2000, 301279593 },
{ "", 0, 0 }
};
@@ -65,7 +69,7 @@ main(void)
{
POP_RECORD *p;
WT_CONNECTION *conn;
- WT_CURSOR *cursor;
+ WT_CURSOR *cursor, *cursor2, *join_cursor;
WT_SESSION *session;
const char *country;
uint64_t recno, population;
@@ -321,6 +325,39 @@ main(void)
/*! [Access only the index] */
ret = cursor->close(cursor);
+ /*! [Join cursors] */
+ /* Open cursors needed by the join. */
+ ret = session->open_cursor(session,
+ "join:table:poptable", NULL, NULL, &join_cursor);
+ ret = session->open_cursor(session,
+ "index:poptable:country", NULL, NULL, &cursor);
+ ret = session->open_cursor(session,
+ "index:poptable:immutable_year", NULL, NULL, &cursor2);
+
+ /* select values WHERE country == "AU" AND year > 1900 */
+ cursor->set_key(cursor, "AU\0\0\0");
+ ret = cursor->search(cursor);
+ ret = session->join(session, join_cursor, cursor,
+ "compare=eq,count=10");
+ cursor2->set_key(cursor2, (uint16_t)1900);
+ ret = cursor2->search(cursor2);
+ ret = session->join(session, join_cursor, cursor2,
+ "compare=gt,count=10,strategy=bloom");
+
+ /* List the values that are joined */
+ while ((ret = join_cursor->next(join_cursor)) == 0) {
+ ret = join_cursor->get_key(join_cursor, &recno);
+ ret = join_cursor->get_value(join_cursor, &country, &year,
+ &population);
+ printf("ID %" PRIu64, recno);
+ printf(": country %s, year %u, population %" PRIu64 "\n",
+ country, year, population);
+ }
+ /*! [Join cursors] */
+ ret = join_cursor->close(join_cursor);
+ ret = cursor2->close(cursor2);
+ ret = cursor->close(cursor);
+
ret = conn->close(conn, NULL);
return (ret);
diff --git a/examples/java/com/wiredtiger/examples/ex_schema.java b/examples/java/com/wiredtiger/examples/ex_schema.java
index 5b849ecf430..ba15db62a14 100644
--- a/examples/java/com/wiredtiger/examples/ex_schema.java
+++ b/examples/java/com/wiredtiger/examples/ex_schema.java
@@ -57,12 +57,16 @@ public class ex_schema {
popData = new ArrayList<PopRecord>();
popData.add(new PopRecord("AU", (short)1900, 4000000 ));
+ popData.add(new PopRecord("AU", (short)1950, 8267337 ));
popData.add(new PopRecord("AU", (short)2000, 19053186 ));
popData.add(new PopRecord("CAN", (short)1900, 5500000 ));
+ popData.add(new PopRecord("CAN", (short)1950, 14011422 ));
popData.add(new PopRecord("CAN", (short)2000, 31099561 ));
popData.add(new PopRecord("UK", (short)1900, 369000000 ));
+ popData.add(new PopRecord("UK", (short)1950, 50127000 ));
popData.add(new PopRecord("UK", (short)2000, 59522468 ));
popData.add(new PopRecord("USA", (short)1900, 76212168 ));
+ popData.add(new PopRecord("USA", (short)1950, 150697361 ));
popData.add(new PopRecord("USA", (short)2000, 301279593 ));
};
/*! [schema declaration] */
@@ -72,7 +76,7 @@ public class ex_schema {
throws WiredTigerException
{
Connection conn;
- Cursor cursor;
+ Cursor cursor, cursor2, join_cursor;
Session session;
String country;
long recno, population;
@@ -206,7 +210,7 @@ public class ex_schema {
* for a particular country.
*/
cursor = session.open_cursor("colgroup:poptable:main", null, null);
- cursor.putKeyLong(2);
+ cursor.putKeyRecord(2);
if ((ret = cursor.search()) == 0) {
country = cursor.getValueString();
year = cursor.getValueShort();
@@ -223,7 +227,7 @@ public class ex_schema {
* population of a particular country.
*/
cursor = session.open_cursor("colgroup:poptable:population", null, null);
- cursor.putKeyLong(2);
+ cursor.putKeyRecord(2);
if ((ret = cursor.search()) == 0) {
population = cursor.getValueLong();
System.out.println("ID 2: population " + population);
@@ -335,6 +339,39 @@ public class ex_schema {
/*! [Access only the index] */
ret = cursor.close();
+ /*! [Join cursors] */
+ /* Open cursors needed by the join. */
+ join_cursor = session.open_cursor(
+ "join:table:poptable", null, null);
+ cursor = session.open_cursor(
+ "index:poptable:country", null, null);
+ cursor2 = session.open_cursor(
+ "index:poptable:immutable_year", null, null);
+
+ /* select values WHERE country == "AU" AND year > 1900 */
+ cursor.putKeyString("AU");
+ ret = cursor.search();
+ session.join(join_cursor, cursor, "compare=eq,count=10");
+ cursor2.putKeyShort((short)1900);
+ ret = cursor2.search();
+ session.join(join_cursor, cursor2,
+ "compare=gt,count=10,strategy=bloom");
+
+ /* List the values that are joined */
+ while ((ret = join_cursor.next()) == 0) {
+ recno = join_cursor.getKeyRecord();
+ country = join_cursor.getValueString();
+ year = join_cursor.getValueShort();
+ population = join_cursor.getValueLong();
+ System.out.print("ID " + recno);
+ System.out.println( ": country " + country + ", year " + year +
+ ", population " + population);
+ }
+ /*! [Join cursors] */
+ ret = join_cursor.close();
+ ret = cursor2.close();
+ ret = cursor.close();
+
ret = conn.close(null);
return (ret);
diff --git a/ext/extractors/csv/csv_extractor.c b/ext/extractors/csv/csv_extractor.c
index 34b8d7c7c64..8d50cc7ec5d 100644
--- a/ext/extractors/csv/csv_extractor.c
+++ b/ext/extractors/csv/csv_extractor.c
@@ -49,7 +49,8 @@
typedef struct {
WT_EXTRACTOR extractor; /* Must come first */
WT_EXTENSION_API *wt_api; /* Extension API */
- int field_num; /* Field to extract */
+ int field; /* Field to extract */
+ int format_isnum; /* Field contents are numeric */
} CSV_EXTRACTOR;
/*
@@ -61,15 +62,15 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
const WT_ITEM *key, const WT_ITEM *value, WT_CURSOR *result_cursor)
{
char *copy, *p, *pend, *valstr;
- const CSV_EXTRACTOR *cvs_extractor;
- int i, ret;
+ const CSV_EXTRACTOR *csv_extractor;
+ int i, ret, val;
size_t len;
WT_EXTENSION_API *wtapi;
(void)key; /* Unused parameters */
- cvs_extractor = (const CSV_EXTRACTOR *)extractor;
- wtapi = cvs_extractor->wt_api;
+ csv_extractor = (const CSV_EXTRACTOR *)extractor;
+ wtapi = csv_extractor->wt_api;
/* Unpack the value. */
if ((ret = wtapi->struct_unpack(wtapi,
@@ -78,11 +79,11 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
p = valstr;
pend = strchr(p, ',');
- for (i = 0; i < cvs_extractor->field_num && pend != NULL; i++) {
+ for (i = 0; i < csv_extractor->field && pend != NULL; i++) {
p = pend + 1;
pend = strchr(p, ',');
}
- if (i == cvs_extractor->field_num) {
+ if (i == csv_extractor->field) {
if (pend == NULL)
pend = p + strlen(p);
/*
@@ -95,7 +96,12 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
return (errno);
strncpy(copy, p, len);
copy[len] = '\0';
- result_cursor->set_key(result_cursor, copy);
+ if (csv_extractor->format_isnum) {
+ if ((val = atoi(copy)) < 0)
+ return (EINVAL);
+ result_cursor->set_key(result_cursor, val);
+ } else
+ result_cursor->set_key(result_cursor, copy);
ret = result_cursor->insert(result_cursor);
free(copy);
if (ret != 0)
@@ -107,7 +113,7 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
/*
* csv_customize --
* The customize function creates a customized extractor,
- * needed to save the field number.
+ * needed to save the field number and format.
*/
static int
csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session,
@@ -115,20 +121,37 @@ csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session,
{
const CSV_EXTRACTOR *orig;
CSV_EXTRACTOR *csv_extractor;
+ WT_CONFIG_ITEM field, format;
+ WT_CONFIG_PARSER *parser;
+ WT_EXTENSION_API *wtapi;
+ int ret;
long field_num;
(void)session; /* Unused parameters */
(void)uri; /* Unused parameters */
orig = (const CSV_EXTRACTOR *)extractor;
- field_num = strtol(appcfg->str, NULL, 10);
+ wtapi = orig->wt_api;
+ if ((ret = wtapi->config_parser_open(wtapi, session, appcfg->str,
+ appcfg->len, &parser)) != 0)
+ return (ret);
+ if ((ret = parser->get(parser, "field", &field)) != 0 ||
+ (ret = parser->get(parser, "format", &format)) != 0) {
+ if (ret == WT_NOTFOUND)
+ return (EINVAL);
+ return (ret);
+ }
+ field_num = strtol(field.str, NULL, 10);
if (field_num < 0 || field_num > INT_MAX)
return (EINVAL);
+ if (format.len != 1 || (format.str[0] != 'S' && format.str[0] != 'i'))
+ return (EINVAL);
if ((csv_extractor = calloc(1, sizeof(CSV_EXTRACTOR))) == NULL)
return (errno);
*csv_extractor = *orig;
- csv_extractor->field_num = (int)field_num;
+ csv_extractor->field = field_num;
+ csv_extractor->format_isnum = (format.str[0] == 'i');
*customp = (WT_EXTRACTOR *)csv_extractor;
return (0);
}
diff --git a/lang/java/java_doc.i b/lang/java/java_doc.i
index 75c14dbfe8f..17317ab875b 100644
--- a/lang/java/java_doc.i
+++ b/lang/java/java_doc.i
@@ -33,6 +33,7 @@ COPYDOC(__wt_session, WT_SESSION, open_cursor)
COPYDOC(__wt_session, WT_SESSION, create)
COPYDOC(__wt_session, WT_SESSION, compact)
COPYDOC(__wt_session, WT_SESSION, drop)
+COPYDOC(__wt_session, WT_SESSION, join)
COPYDOC(__wt_session, WT_SESSION, log_flush)
COPYDOC(__wt_session, WT_SESSION, log_printf)
COPYDOC(__wt_session, WT_SESSION, rename)
diff --git a/src/bloom/bloom.c b/src/bloom/bloom.c
index 9225b9fe3b5..e3a21f25dc1 100644
--- a/src/bloom/bloom.c
+++ b/src/bloom/bloom.c
@@ -314,6 +314,47 @@ __wt_bloom_get(WT_BLOOM *bloom, WT_ITEM *key)
}
/*
+ * __wt_bloom_inmem_get --
+ * Tests whether the given key is in the Bloom filter.
+ * This can be used in place of __wt_bloom_get
+ * for Bloom filters that are memory only.
+ */
+int
+__wt_bloom_inmem_get(WT_BLOOM *bloom, WT_ITEM *key)
+{
+ uint64_t h1, h2;
+ uint32_t i;
+
+ h1 = __wt_hash_fnv64(key->data, key->size);
+ h2 = __wt_hash_city64(key->data, key->size);
+ for (i = 0; i < bloom->k; i++, h1 += h2) {
+ if (!__bit_test(bloom->bitstring, h1 % bloom->m))
+ return (WT_NOTFOUND);
+ }
+ return (0);
+}
+
+/*
+ * __wt_bloom_intersection --
+ * Modify the Bloom filter to contain the intersection of this
+ * filter with another.
+ */
+int
+__wt_bloom_intersection(WT_BLOOM *bloom, WT_BLOOM *other)
+{
+ uint64_t i, nbytes;
+
+ if (bloom->k != other->k || bloom->factor != other->factor ||
+ bloom->m != other->m || bloom->n != other->n)
+ return (EINVAL);
+
+ nbytes = __bitstr_size(bloom->m);
+ for (i = 0; i < nbytes; i++)
+ bloom->bitstring[i] &= other->bitstring[i];
+ return (0);
+}
+
+/*
* __wt_bloom_close --
* Close the Bloom filter, release any resources.
*/
diff --git a/src/btree/bt_cursor.c b/src/btree/bt_cursor.c
index 3290fd6374c..69512f45933 100644
--- a/src/btree/bt_cursor.c
+++ b/src/btree/bt_cursor.c
@@ -1093,6 +1093,7 @@ __wt_btcur_range_truncate(WT_CURSOR_BTREE *start, WT_CURSOR_BTREE *stop)
cbt = (start != NULL) ? start : stop;
session = (WT_SESSION_IMPL *)cbt->iface.session;
btree = cbt->btree;
+ WT_STAT_FAST_DATA_INCR(session, cursor_truncate);
/*
* We always delete in a forward direction because it's faster, assert
diff --git a/src/btree/bt_delete.c b/src/btree/bt_delete.c
index 757b7b51cdd..98c6390e0f4 100644
--- a/src/btree/bt_delete.c
+++ b/src/btree/bt_delete.c
@@ -138,6 +138,8 @@ __wt_delete_page(WT_SESSION_IMPL *session, WT_REF *ref, bool *skipp)
WT_ERR(__wt_txn_modify_ref(session, ref));
*skipp = true;
+ WT_STAT_FAST_CONN_INCR(session, rec_page_delete_fast);
+ WT_STAT_FAST_DATA_INCR(session, rec_page_delete_fast);
WT_PUBLISH(ref->state, WT_REF_DELETED);
return (0);
diff --git a/src/btree/bt_split.c b/src/btree/bt_split.c
index eaeac683f9a..caba12b78f1 100644
--- a/src/btree/bt_split.c
+++ b/src/btree/bt_split.c
@@ -693,6 +693,7 @@ static int
__split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
uint32_t new_entries, size_t parent_incr, bool exclusive, bool discard)
{
+ WT_DECL_ITEM(scr);
WT_DECL_RET;
WT_IKEY *ikey;
WT_PAGE *parent;
@@ -702,6 +703,7 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
uint64_t split_gen;
uint32_t i, j;
uint32_t deleted_entries, parent_entries, result_entries;
+ uint32_t *deleted_refs;
bool complete, empty_parent;
parent = ref->home;
@@ -727,14 +729,20 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
* array anyway. Switch them to the special split state, so that any
* reading thread will restart.
*/
+ WT_RET(__wt_scr_alloc(session, 10 * sizeof(uint32_t), &scr));
for (deleted_entries = 0, i = 0; i < parent_entries; ++i) {
next_ref = pindex->index[i];
WT_ASSERT(session, next_ref->state != WT_REF_SPLIT);
- if (next_ref->state == WT_REF_DELETED &&
+ if ((discard && next_ref == ref) ||
+ (next_ref->state == WT_REF_DELETED &&
__wt_delete_page_skip(session, next_ref, true) &&
__wt_atomic_casv32(
- &next_ref->state, WT_REF_DELETED, WT_REF_SPLIT))
- deleted_entries++;
+ &next_ref->state, WT_REF_DELETED, WT_REF_SPLIT))) {
+ WT_ERR(__wt_buf_grow(session, scr,
+ (deleted_entries + 1) * sizeof(uint32_t)));
+ deleted_refs = scr->mem;
+ deleted_refs[deleted_entries++] = i;
+ }
}
/*
@@ -742,7 +750,9 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
* pages, less any WT_REFs we're removing (deleted entries plus the
* entry we're replacing).
*/
- result_entries = (parent_entries + new_entries) - (deleted_entries + 1);
+ result_entries = (parent_entries + new_entries) - deleted_entries;
+ if (!discard)
+ --result_entries;
/*
* If there are no remaining entries on the parent, give up, we can't
@@ -795,17 +805,14 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
#endif
/*
- * If discarding the page's original WT_REF field, reset it to split and
- * increment the number of entries being discarded. Threads cursoring
- * through the tree were blocked because that WT_REF state was set to
- * locked. Changing the locked state to split unblocks those threads and
- * causes them to re-calculate their position based on the just-updated
- * parent page's index.
+ * If discarding the page's original WT_REF field, reset it to split.
+ * Threads cursoring through the tree were blocked because that WT_REF
+ * state was set to locked. Changing the locked state to split unblocks
+ * those threads and causes them to re-calculate their position based
+ * on the just-updated parent page's index.
*/
- if (discard) {
- ++deleted_entries;
+ if (discard)
WT_PUBLISH(ref->state, WT_REF_SPLIT);
- }
/*
* Push out the changes: not required for correctness, but don't let
@@ -842,11 +849,9 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
* Acquire a new split generation.
*/
split_gen = __wt_atomic_addv64(&S2C(session)->split_gen, 1);
- for (i = 0; deleted_entries > 0 && i < parent_entries; ++i) {
- next_ref = pindex->index[i];
- if (next_ref->state != WT_REF_SPLIT)
- continue;
- --deleted_entries;
+ for (i = 0, deleted_refs = scr->mem; i < deleted_entries; ++i) {
+ next_ref = pindex->index[deleted_refs[i]];
+ WT_ASSERT(session, next_ref->state == WT_REF_SPLIT);
/*
* We set the WT_REF to split, discard it, freeing any resources
@@ -906,7 +911,8 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new,
__wt_cache_page_inmem_decr(session, parent, parent_decr);
__wt_page_modify_set(session, parent);
-err: /*
+err: __wt_scr_free(session, &scr);
+ /*
* A note on error handling: if we completed the split, return success,
* nothing really bad can have happened, and our caller has to proceed
* with the split.
diff --git a/src/config/config_def.c b/src/config/config_def.c
index e1284d18c09..d79ce6853e6 100644
--- a/src/config/config_def.c
+++ b/src/config/config_def.c
@@ -295,6 +295,19 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_drop[] = {
{ NULL, NULL, NULL, NULL, NULL, 0 }
};
+static const WT_CONFIG_CHECK confchk_WT_SESSION_join[] = {
+ { "bloom_bit_count", "int", NULL, "min=2,max=1000", NULL, 0 },
+ { "bloom_hash_count", "int", NULL, "min=2,max=100", NULL, 0 },
+ { "compare", "string",
+ NULL, "choices=[\"eq\",\"ge\",\"gt\",\"le\",\"lt\"]",
+ NULL, 0 },
+ { "count", "int", NULL, NULL, NULL, 0 },
+ { "strategy", "string",
+ NULL, "choices=[\"bloom\",\"default\"]",
+ NULL, 0 },
+ { NULL, NULL, NULL, NULL, NULL, 0 }
+};
+
static const WT_CONFIG_CHECK confchk_WT_SESSION_log_flush[] = {
{ "sync", "string",
NULL, "choices=[\"background\",\"off\",\"on\"]",
@@ -893,6 +906,11 @@ static const WT_CONFIG_ENTRY config_entries[] = {
"force=0,remove_files=",
confchk_WT_SESSION_drop, 2
},
+ { "WT_SESSION.join",
+ "bloom_bit_count=16,bloom_hash_count=8,compare=\"eq\",count=,"
+ "strategy=",
+ confchk_WT_SESSION_join, 5
+ },
{ "WT_SESSION.log_flush",
"sync=on",
confchk_WT_SESSION_log_flush, 1
diff --git a/src/conn/conn_stat.c b/src/conn/conn_stat.c
index 455ec9514f0..31438e10606 100644
--- a/src/conn/conn_stat.c
+++ b/src/conn/conn_stat.c
@@ -154,7 +154,7 @@ __statlog_dump(WT_SESSION_IMPL *session, const char *name, bool conn_stats)
WT_DECL_RET;
int64_t *stats;
int i;
- const char *uri;
+ const char *desc, *uri;
const char *cfg[] = {
WT_CONFIG_BASE(session, WT_SESSION_open_cursor), NULL };
@@ -175,16 +175,19 @@ __statlog_dump(WT_SESSION_IMPL *session, const char *name, bool conn_stats)
* If we don't find an underlying object, silently ignore it, the object
* may exist only intermittently.
*/
- switch (ret = __wt_curstat_open(session, uri, cfg, &cursor)) {
+ switch (ret = __wt_curstat_open(session, uri, NULL, cfg, &cursor)) {
case 0:
cst = (WT_CURSOR_STAT *)cursor;
- for (stats = cst->stats, i = 0; i < cst->stats_count; ++i)
+ for (stats = cst->stats, i = 0; i < cst->stats_count; ++i) {
+ if (conn_stats)
+ WT_ERR(__wt_stat_connection_desc(cst, i,
+ &desc));
+ else
+ WT_ERR(__wt_stat_dsrc_desc(cst, i, &desc));
WT_ERR(__wt_fprintf(conn->stat_fp,
"%s %" PRId64 " %s %s\n",
- conn->stat_stamp, stats[i],
- name, conn_stats ?
- __wt_stat_connection_desc(i) :
- __wt_stat_dsrc_desc(i)));
+ conn->stat_stamp, stats[i], name, desc));
+ }
WT_ERR(cursor->close(cursor));
break;
case EBUSY:
diff --git a/src/cursor/cur_dump.c b/src/cursor/cur_dump.c
index 6c11c4b407e..e5799fbad05 100644
--- a/src/cursor/cur_dump.c
+++ b/src/cursor/cur_dump.c
@@ -329,7 +329,7 @@ __curdump_close(WT_CURSOR *cursor)
cdump = (WT_CURSOR_DUMP *)cursor;
child = cdump->child;
- CURSOR_API_CALL(cursor, session, get_key, NULL);
+ CURSOR_API_CALL(cursor, session, close, NULL);
if (child != NULL)
WT_TRET(child->close(child));
/* We shared the child's URI. */
diff --git a/src/cursor/cur_index.c b/src/cursor/cur_index.c
index fd2a6cd7480..a909eaece99 100644
--- a/src/cursor/cur_index.c
+++ b/src/cursor/cur_index.c
@@ -8,6 +8,20 @@
#include "wt_internal.h"
+ /*
+ * __wt_curindex_joined --
+ * Produce an error that this cursor is being used in a join call.
+ */
+int
+__wt_curindex_joined(WT_CURSOR *cursor)
+{
+ WT_SESSION_IMPL *session;
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+ __wt_errx(session, "index cursor is being used in a join");
+ return (ENOTSUP);
+}
+
/*
* __curindex_get_value --
* WT_CURSOR->get_value implementation for index cursors.
@@ -15,32 +29,16 @@
static int
__curindex_get_value(WT_CURSOR *cursor, ...)
{
- WT_CURSOR_INDEX *cindex;
WT_DECL_RET;
- WT_ITEM *item;
WT_SESSION_IMPL *session;
va_list ap;
- cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, get_value, NULL);
- WT_CURSOR_NEEDVALUE(cursor);
-
va_start(ap, cursor);
- if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
- ret = __wt_schema_project_merge(session,
- cindex->cg_cursors, cindex->value_plan,
- cursor->value_format, &cursor->value);
- if (ret == 0) {
- item = va_arg(ap, WT_ITEM *);
- item->data = cursor->value.data;
- item->size = cursor->value.size;
- }
- } else
- ret = __wt_schema_project_out(session,
- cindex->cg_cursors, cindex->value_plan, ap);
- va_end(ap);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL);
+ WT_ERR(__wt_curindex_get_valuev(cursor, ap));
-err: API_END_RET(session, ret);
+err: va_end(ap);
+ API_END_RET(session, ret);
}
/*
@@ -53,7 +51,7 @@ __curindex_set_value(WT_CURSOR *cursor, ...)
WT_DECL_RET;
WT_SESSION_IMPL *session;
- CURSOR_API_CALL(cursor, session, set_value, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL);
ret = ENOTSUP;
err: cursor->saved_err = ret;
F_CLR(cursor, WT_CURSTD_VALUE_SET);
@@ -72,7 +70,7 @@ __curindex_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)a;
- CURSOR_API_CALL(a, session, compare, NULL);
+ JOINABLE_CURSOR_API_CALL(a, session, compare, NULL);
/* Check both cursors are "index:" type. */
if (!WT_PREFIX_MATCH(a->uri, "index:") ||
@@ -150,7 +148,7 @@ __curindex_next(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
if ((ret = cindex->child->next(cindex->child)) == 0)
@@ -171,7 +169,7 @@ __curindex_prev(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, prev, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL);
F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
if ((ret = cindex->child->prev(cindex->child)) == 0)
@@ -194,7 +192,7 @@ __curindex_reset(WT_CURSOR *cursor)
u_int i;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, reset, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL);
F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
WT_TRET(cindex->child->reset(cindex->child));
@@ -225,7 +223,7 @@ __curindex_search(WT_CURSOR *cursor)
cindex = (WT_CURSOR_INDEX *)cursor;
child = cindex->child;
- CURSOR_API_CALL(cursor, session, search, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL);
/*
* We are searching using the application-specified key, which
@@ -284,7 +282,7 @@ __curindex_search_near(WT_CURSOR *cursor, int *exact)
WT_SESSION_IMPL *session;
cindex = (WT_CURSOR_INDEX *)cursor;
- CURSOR_API_CALL(cursor, session, search_near, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL);
__wt_cursor_set_raw_key(cindex->child, &cursor->key);
if ((ret = cindex->child->search_near(cindex->child, exact)) == 0)
ret = __curindex_move(cindex);
@@ -311,7 +309,7 @@ __curindex_close(WT_CURSOR *cursor)
cindex = (WT_CURSOR_INDEX *)cursor;
idx = cindex->index;
- CURSOR_API_CALL(cursor, session, close, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL);
if ((cp = cindex->cg_cursors) != NULL)
for (i = 0, cp = cindex->cg_cursors;
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c
new file mode 100644
index 00000000000..be121daa61f
--- /dev/null
+++ b/src/cursor/cur_join.c
@@ -0,0 +1,1049 @@
+/*-
+ * Copyright (c) 2014-2015 MongoDB, Inc.
+ * Copyright (c) 2008-2014 WiredTiger, Inc.
+ * All rights reserved.
+ *
+ * See the file LICENSE for redistribution information.
+ */
+
+#include "wt_internal.h"
+
+/*
+ * __curjoin_entry_iter_init --
+ * Initialize an iteration for the index managed by a join entry.
+ *
+ */
+static int
+__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp)
+{
+ WT_CURSOR *newcur;
+ WT_CURSOR *to_dup;
+ WT_DECL_RET;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ const char *def_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), NULL };
+ const char *uri, **config;
+ char *uribuf;
+ WT_CURSOR_JOIN_ITER *iter;
+ size_t size;
+
+ iter = NULL;
+ uribuf = NULL;
+ to_dup = entry->ends[0].cursor;
+
+ uri = to_dup->uri;
+ if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
+ config = &raw_cfg[0];
+ else
+ config = &def_cfg[0];
+
+ if (cjoin->projection != NULL) {
+ size = strlen(uri) + strlen(cjoin->projection) + 1;
+ WT_ERR(__wt_calloc(session, size, 1, &uribuf));
+ snprintf(uribuf, size, "%s%s", uri, cjoin->projection);
+ uri = uribuf;
+ }
+ WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config,
+ &newcur));
+ WT_ERR(__wt_cursor_dup_position(to_dup, newcur));
+ WT_ERR(__wt_calloc_one(session, &iter));
+ iter->cjoin = cjoin;
+ iter->session = session;
+ iter->entry = entry;
+ iter->cursor = newcur;
+ iter->advance = false;
+ *iterp = iter;
+
+ if (0) {
+err: __wt_free(session, iter);
+ }
+ __wt_free(session, uribuf);
+ return (ret);
+}
+
+/*
+ * __curjoin_pack_recno --
+ * Pack the given recno into a buffer; prepare an item referencing it.
+ *
+ */
+static int
+__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf,
+ size_t bufsize, WT_ITEM *item)
+{
+ WT_DECL_RET;
+ WT_SESSION *wtsession;
+ size_t sz;
+
+ wtsession = (WT_SESSION *)session;
+ WT_ERR(wiredtiger_struct_size(wtsession, &sz, "r", r));
+ WT_ASSERT(session, sz < bufsize);
+ WT_ERR(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r));
+ item->size = sz;
+ item->data = buf;
+
+err: return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_next --
+ * Get the next item in an iteration.
+ *
+ */
+static int
+__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey,
+ uint64_t *rp)
+{
+ WT_CURSOR *firstcg_cur;
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ uint64_t r;
+
+ if (iter->advance)
+ WT_ERR(iter->cursor->next(iter->cursor));
+ else
+ iter->advance = true;
+
+ session = iter->session;
+ cjoin = iter->cjoin;
+
+ /*
+ * Set our key to the primary key, we'll also need this
+ * to check membership.
+ */
+ if (iter->entry->index != NULL)
+ firstcg_cur = ((WT_CURSOR_INDEX *)iter->cursor)->cg_cursors[0];
+ else
+ firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0];
+ if (WT_CURSOR_RECNO(&cjoin->iface)) {
+ r = *(uint64_t *)firstcg_cur->key.data;
+ WT_ERR(__curjoin_pack_recno(session, r, cjoin->recno_buf,
+ sizeof(cjoin->recno_buf), primkey));
+ *rp = r;
+ } else {
+ WT_ITEM_SET(*primkey, firstcg_cur->key);
+ *rp = 0;
+ }
+ iter->curkey = primkey;
+ iter->entry->stats.actual_count++;
+ iter->entry->stats.accesses++;
+
+err: return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_reset --
+ * Reset an iteration to the starting point.
+ *
+ */
+static int
+__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
+{
+ WT_DECL_RET;
+
+ if (iter->advance) {
+ WT_ERR(iter->cursor->reset(iter->cursor));
+ WT_ERR(__wt_cursor_dup_position(
+ iter->cjoin->entries[0].ends[0].cursor, iter->cursor));
+ iter->advance = false;
+ iter->entry->stats.actual_count = 0;
+ }
+
+err: return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_ready --
+ * The iterator is positioned.
+ *
+ */
+static bool
+__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
+{
+ return (iter->advance);
+}
+
+/*
+ * __curjoin_entry_iter_close --
+ * Close the iteration, release resources.
+ *
+ */
+static int
+__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter)
+{
+ if (iter->cursor != NULL)
+ return (iter->cursor->close(iter->cursor));
+ else
+ return (0);
+ __wt_free(iter->session, iter);
+}
+
+/*
+ * __curjoin_get_key --
+ * WT_CURSOR->get_key for join cursors.
+ */
+static int
+__curjoin_get_key(WT_CURSOR *cursor, ...)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ va_list ap;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ va_start(ap, cursor);
+ CURSOR_API_CALL(cursor, session, get_key, NULL);
+
+ if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
+ !__curjoin_entry_iter_ready(cjoin->iter)) {
+ __wt_errx(session, "join cursor must be advanced with next()");
+ WT_ERR(EINVAL);
+ }
+ WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap));
+
+err: va_end(ap);
+ API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_get_value --
+ * WT_CURSOR->get_value for join cursors.
+ */
+static int
+__curjoin_get_value(WT_CURSOR *cursor, ...)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ITER *iter;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ va_list ap;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+ iter = cjoin->iter;
+
+ va_start(ap, cursor);
+ CURSOR_API_CALL(cursor, session, get_value, NULL);
+
+ if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) ||
+ !__curjoin_entry_iter_ready(iter)) {
+ __wt_errx(session, "join cursor must be advanced with next()");
+ WT_ERR(EINVAL);
+ }
+ if (iter->entry->index != NULL)
+ WT_ERR(__wt_curindex_get_valuev(iter->cursor, ap));
+ else
+ WT_ERR(__wt_curtable_get_valuev(iter->cursor, ap));
+
+err: va_end(ap);
+ API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_init_bloom --
+ * Populate Bloom filters
+ */
+static int
+__curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_BLOOM *bloom)
+{
+ WT_COLLATOR *collator;
+ WT_CURSOR *c;
+ WT_CURSOR_INDEX *cindex;
+ WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
+ WT_DECL_RET;
+ WT_ITEM curkey, curvalue, *k;
+ WT_TABLE *maintable;
+ char *uri;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ const char *mainkey_str, *p;
+ void *allocbuf;
+ size_t mainkey_len, size;
+ u_int i;
+ int cmp, skip;
+
+ c = NULL;
+ allocbuf = NULL;
+ skip = 0;
+
+ if (entry->index != NULL) {
+ /*
+ * Open a cursor having a projection of the keys of the
+ * index we're comparing against. Open it raw, we're
+ * going to compare it to the raw keys of the
+ * reference cursors.
+ */
+ maintable = ((WT_CURSOR_TABLE *)entry->main)->table;
+ mainkey_str = maintable->colconf.str + 1;
+ for (p = mainkey_str, i = 0;
+ p != NULL && i < maintable->nkey_columns; i++)
+ p = strchr(p + 1, ',');
+ WT_ASSERT(session, p != 0);
+ mainkey_len = WT_PTRDIFF(p, mainkey_str);
+ size = strlen(entry->index->name) + mainkey_len + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s(%.*s)", entry->index->name,
+ (int)mainkey_len, mainkey_str);
+ } else {
+ /*
+ * For joins on the main table, we just need the primary
+ * key for comparison, we don't need any values.
+ */
+ size = strlen(cjoin->table->name) + 3;
+ WT_ERR(__wt_calloc(session, size, 1, &uri));
+ snprintf(uri, size, "%s()", cjoin->table->name);
+ }
+ WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, raw_cfg, &c));
+
+ /* Initially position the cursor if necessary. */
+ endmax = &entry->ends[entry->ends_next];
+ if ((end = &entry->ends[0]) < endmax &&
+ F_ISSET(end, WT_CURJOIN_END_GE)) {
+ WT_ERR(__wt_cursor_dup_position(end->cursor, c));
+ if (end->flags == WT_CURJOIN_END_GE)
+ skip = 1;
+ }
+ collator = (entry->index == NULL) ? NULL : entry->index->collator;
+ while (ret == 0) {
+ c->get_key(c, &curkey);
+ if (entry->index != NULL) {
+ cindex = (WT_CURSOR_INDEX *)c;
+ if (cindex->index->extractor == NULL) {
+ /*
+ * Repack so it's comparable to the
+ * reference endpoints.
+ */
+ k = &cindex->child->key;
+ WT_ERR(__wt_struct_repack(session,
+ cindex->child->key_format,
+ entry->main->value_format, k, &curkey,
+ &allocbuf));
+ } else
+ curkey = cindex->child->key;
+ }
+ for (end = &entry->ends[skip]; end < endmax; end++) {
+ WT_ERR(__wt_compare(session, collator, &curkey,
+ &end->key, &cmp));
+ if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if (cmp < 0 || (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)))
+ goto advance;
+ if (cmp > 0) {
+ if (F_ISSET(end, WT_CURJOIN_END_GT))
+ skip = 1;
+ else
+ goto done;
+ }
+ } else {
+ if (cmp > 0 || (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)))
+ goto done;
+ }
+ }
+ if (entry->index != NULL)
+ c->get_value(c, &curvalue);
+ else
+ c->get_key(c, &curvalue);
+ WT_ERR(__wt_bloom_insert(bloom, &curvalue));
+ entry->stats.actual_count++;
+advance:
+ if ((ret = c->next(c)) == WT_NOTFOUND)
+ break;
+ }
+done:
+ WT_ERR_NOTFOUND_OK(ret);
+
+err: if (c != NULL)
+ WT_TRET(c->close(c));
+ __wt_free(session, allocbuf);
+ return (ret);
+}
+
+/*
+ * __curjoin_endpoint_init_key --
+ * Set the key in the reference endpoint.
+ */
+static int
+__curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
+ WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint)
+{
+ WT_CURSOR *cursor;
+ WT_CURSOR_INDEX *cindex;
+ WT_DECL_RET;
+ WT_ITEM *k;
+ uint64_t r;
+ void *allocbuf;
+
+ allocbuf = NULL;
+ if ((cursor = endpoint->cursor) != NULL) {
+ if (entry->index != NULL) {
+ cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
+ if (cindex->index->extractor == NULL) {
+ WT_ERR(__wt_struct_repack(session,
+ cindex->child->key_format,
+ entry->main->value_format,
+ &cindex->child->key, &endpoint->key,
+ &allocbuf));
+ if (allocbuf != NULL)
+ F_SET(endpoint, WT_CURJOIN_END_OWN_KEY);
+ } else
+ endpoint->key = cindex->child->key;
+ } else {
+ k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
+ if (WT_CURSOR_RECNO(cursor)) {
+ r = *(uint64_t *)k->data;
+ WT_ERR(__curjoin_pack_recno(session, r,
+ endpoint->recno_buf,
+ sizeof(endpoint->recno_buf),
+ &endpoint->key));
+ }
+ else
+ endpoint->key = *k;
+ }
+ }
+ if (0) {
+err: __wt_free(session, allocbuf);
+ }
+ return (ret);
+}
+
+/*
+ * __curjoin_init_iter --
+ * Initialize before any iteration.
+ */
+static int
+__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
+{
+ WT_BLOOM *bloom;
+ WT_DECL_RET;
+ WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2;
+ WT_CURSOR_JOIN_ENDPOINT *end;
+ uint64_t k, m;
+
+ if (cjoin->entries_next == 0) {
+ __wt_errx(session, "join cursor has not yet been joined "
+ "with any other cursors");
+ WT_ERR(EINVAL);
+ }
+
+ je = &cjoin->entries[0];
+ WT_ERR(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter));
+
+ jeend = &cjoin->entries[cjoin->entries_next];
+ for (je = cjoin->entries; je < jeend; je++) {
+ __wt_stat_join_init_single(&je->stats);
+ for (end = &je->ends[0]; end < &je->ends[je->ends_next];
+ end++)
+ WT_ERR(__curjoin_endpoint_init_key(session, je, end));
+
+ /*
+ * The first entry is iterated as the 'outermost' cursor.
+ * For the common GE case, we don't have to test against
+ * the left reference key, we know it will be true since
+ * the btree is ordered.
+ */
+ if (je == cjoin->entries && je->ends[0].flags ==
+ (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ))
+ F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
+
+ if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) {
+ if (je->bloom == NULL) {
+ /*
+ * Look for compatible filters to be shared,
+ * pick compatible numbers for bit counts
+ * and number of hashes.
+ */
+ m = je->bloom_bit_count;
+ k = je->bloom_hash_count;
+ for (je2 = je + 1; je2 < jeend; je2++)
+ if (F_ISSET(je2,
+ WT_CURJOIN_ENTRY_BLOOM) &&
+ je2->count == je->count) {
+ m = WT_MAX(
+ je2->bloom_bit_count, m);
+ k = WT_MAX(
+ je2->bloom_hash_count, k);
+ }
+ je->bloom_bit_count = m;
+ je->bloom_hash_count = k;
+ WT_ERR(__wt_bloom_create(session, NULL,
+ NULL, je->count, m, k, &je->bloom));
+ F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM);
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
+ je, je->bloom));
+ /*
+ * Share the Bloom filter, making all
+ * config info consistent.
+ */
+ for (je2 = je + 1; je2 < jeend; je2++)
+ if (F_ISSET(je2,
+ WT_CURJOIN_ENTRY_BLOOM) &&
+ je2->count == je->count) {
+ WT_ASSERT(session,
+ je2->bloom == NULL);
+ je2->bloom = je->bloom;
+ je2->bloom_bit_count = m;
+ je2->bloom_hash_count = k;
+ }
+ } else {
+ /*
+ * Create a temporary filter that we'll
+ * merge into the shared one. The Bloom
+ * parameters of the two filters must match.
+ */
+ WT_ERR(__wt_bloom_create(session, NULL,
+ NULL, je->count, je->bloom_bit_count,
+ je->bloom_hash_count, &bloom));
+ WT_ERR(__curjoin_init_bloom(session, cjoin,
+ je, bloom));
+ WT_ERR(__wt_bloom_intersection(je->bloom,
+ bloom));
+ WT_ERR(__wt_bloom_close(bloom));
+ }
+ }
+ }
+ F_SET(cjoin, WT_CURJOIN_INITIALIZED);
+
+err:
+ return (ret);
+}
+
+/*
+ * __curjoin_entry_in_range --
+ * Check if a key is in the range specified by the entry, returning
+ * WT_NOTFOUND if not.
+ */
+static int
+__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry,
+ WT_ITEM *curkey, bool skip_left)
+{
+ WT_COLLATOR *collator;
+ WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
+ WT_DECL_RET;
+ int cmp;
+
+ collator = (entry->index != NULL) ? entry->index->collator : NULL;
+ endmax = &entry->ends[entry->ends_next];
+ for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) {
+ WT_ERR(__wt_compare(session, collator, curkey, &end->key,
+ &cmp));
+ if (!F_ISSET(end, WT_CURJOIN_END_LT)) {
+ if (cmp < 0 ||
+ (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
+ (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT)))
+ WT_ERR(WT_NOTFOUND);
+ } else {
+ if (cmp > 0 ||
+ (cmp == 0 &&
+ !F_ISSET(end, WT_CURJOIN_END_EQ)) ||
+ (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT)))
+ WT_ERR(WT_NOTFOUND);
+ }
+ }
+err: return (ret);
+}
+
+typedef struct {
+ WT_CURSOR iface;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ int ismember;
+} WT_CURJOIN_EXTRACTOR;
+
+/*
+ * __curjoin_extract_insert --
+ * Handle a key produced by a custom extractor.
+ */
+static int
+__curjoin_extract_insert(WT_CURSOR *cursor) {
+ WT_CURJOIN_EXTRACTOR *cextract;
+ WT_DECL_RET;
+ WT_ITEM ikey;
+ WT_SESSION_IMPL *session;
+
+ cextract = (WT_CURJOIN_EXTRACTOR *)cursor;
+ /*
+ * This insert method may be called multiple times during a single
+ * extraction. If we already have a definitive answer to the
+ * membership question, exit early.
+ */
+ if (cextract->ismember)
+ return (0);
+
+ session = (WT_SESSION_IMPL *)cursor->session;
+
+ WT_ITEM_SET(ikey, cursor->key);
+ /*
+ * We appended a padding byte to the key to avoid rewriting the last
+ * column. Strip that away here.
+ */
+ WT_ASSERT(session, ikey.size > 0);
+ --ikey.size;
+
+ ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false);
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ else
+ cextract->ismember = 1;
+
+ return (ret);
+}
+
+/*
+ * __curjoin_entry_member --
+ * Do a membership check for a particular index that was joined,
+ * if not a member, returns WT_NOTFOUND.
+ */
+static int
+__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_CURSOR_JOIN_ENTRY *entry, bool skip_left)
+{
+ WT_CURJOIN_EXTRACTOR extract_cursor;
+ WT_CURSOR *c;
+ WT_CURSOR_STATIC_INIT(iface,
+ __wt_cursor_get_key, /* get-key */
+ __wt_cursor_get_value, /* get-value */
+ __wt_cursor_set_key, /* set-key */
+ __wt_cursor_set_value, /* set-value */
+ __wt_cursor_notsup, /* compare */
+ __wt_cursor_notsup, /* equals */
+ __wt_cursor_notsup, /* next */
+ __wt_cursor_notsup, /* prev */
+ __wt_cursor_notsup, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_notsup, /* search-near */
+ __curjoin_extract_insert, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* reconfigure */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_notsup); /* close */
+ WT_DECL_RET;
+ WT_INDEX *idx;
+ WT_ITEM *key, v;
+ bool bloom_found;
+
+ key = cjoin->iter->curkey;
+ entry->stats.accesses++;
+ bloom_found = false;
+
+ if (entry->bloom != NULL) {
+ /*
+ * If we don't own the Bloom filter, we must be sharing one
+ * in a previous entry. So the shared filter has already
+ * been checked and passed.
+ */
+ if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
+ return (0);
+
+ /*
+ * If the item is not in the Bloom filter, we return
+ * immediately, otherwise, we still need to check the
+ * long way.
+ */
+ WT_ERR(__wt_bloom_inmem_get(entry->bloom, key));
+ bloom_found = true;
+ }
+ if (entry->index != NULL) {
+ c = entry->main;
+ c->set_key(c, key);
+ if ((ret = c->search(c)) == 0)
+ ret = c->get_value(c, &v);
+ else if (ret == WT_NOTFOUND)
+ WT_ERR_MSG(session, WT_ERROR,
+ "main table for join is missing entry.");
+ c->reset(c);
+ WT_ERR(ret);
+ } else
+ v = *key;
+
+ if ((idx = entry->index) != NULL && idx->extractor != NULL) {
+ extract_cursor.iface = iface;
+ extract_cursor.iface.session = &session->iface;
+ extract_cursor.iface.key_format = idx->exkey_format;
+ extract_cursor.ismember = 0;
+ extract_cursor.entry = entry;
+ WT_ERR(idx->extractor->extract(idx->extractor,
+ &session->iface, key, &v, &extract_cursor.iface));
+ if (!extract_cursor.ismember)
+ WT_ERR(WT_NOTFOUND);
+ } else
+ WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left));
+
+ if (0) {
+err: if (ret == WT_NOTFOUND && bloom_found)
+ entry->stats.bloom_false_positive++;
+ }
+ return (ret);
+}
+
+/*
+ * __curjoin_next --
+ * WT_CURSOR::next for join cursors.
+ */
+static int
+__curjoin_next(WT_CURSOR *cursor)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ bool skip_left;
+ u_int i;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ CURSOR_API_CALL(cursor, session, next, NULL);
+
+ if (F_ISSET(cjoin, WT_CURJOIN_ERROR)) {
+ __wt_errx(session, "join cursor encountered previous error");
+ WT_ERR(WT_ERROR);
+ }
+ if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
+ WT_ERR(__curjoin_init_iter(session, cjoin));
+
+nextkey:
+ if ((ret = __curjoin_entry_iter_next(cjoin->iter, &cursor->key,
+ &cursor->recno)) == 0) {
+ F_SET(cursor, WT_CURSTD_KEY_EXT);
+
+ /*
+ * We may have already established membership for the
+ * 'left' case for the first entry, since we're
+ * using that in our iteration.
+ */
+ skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT);
+ for (i = 0; i < cjoin->entries_next; i++) {
+ ret = __curjoin_entry_member(session, cjoin,
+ &cjoin->entries[i], skip_left);
+ if (ret == WT_NOTFOUND)
+ goto nextkey;
+ skip_left = false;
+ WT_ERR(ret);
+ }
+ }
+
+ if (0) {
+err: F_SET(cjoin, WT_CURJOIN_ERROR);
+ }
+ API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_reset --
+ * WT_CURSOR::reset for join cursors.
+ */
+static int
+__curjoin_reset(WT_CURSOR *cursor)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ CURSOR_API_CALL(cursor, session, reset, NULL);
+
+ if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED))
+ WT_ERR(__curjoin_entry_iter_reset(cjoin->iter));
+
+err: API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_close --
+ * WT_CURSOR::close for join cursors.
+ */
+static int
+__curjoin_close(WT_CURSOR *cursor)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ENDPOINT *end;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ u_int i;
+
+ cjoin = (WT_CURSOR_JOIN *)cursor;
+
+ CURSOR_API_CALL(cursor, session, close, NULL);
+
+ __wt_schema_release_table(session, cjoin->table);
+ /* These are owned by the table */
+ cursor->internal_uri = NULL;
+ cursor->key_format = NULL;
+ if (cjoin->projection != NULL) {
+ __wt_free(session, cjoin->projection);
+ __wt_free(session, cursor->value_format);
+ }
+
+ for (entry = cjoin->entries, i = 0; i < cjoin->entries_next;
+ entry++, i++) {
+ if (entry->main != NULL)
+ WT_TRET(entry->main->close(entry->main));
+ if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
+ WT_TRET(__wt_bloom_close(entry->bloom));
+ for (end = &entry->ends[0];
+ end < &entry->ends[entry->ends_next]; end++) {
+ F_CLR(end->cursor, WT_CURSTD_JOINED);
+ if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY))
+ __wt_free(session, end->key.data);
+ }
+ }
+
+ if (cjoin->iter != NULL)
+ WT_TRET(__curjoin_entry_iter_close(cjoin->iter));
+ __wt_free(session, cjoin->entries);
+ WT_TRET(__wt_cursor_close(cursor));
+
+err: API_END_RET(session, ret);
+}
+
+/*
+ * __wt_curjoin_open --
+ * Initialize a join cursor.
+ *
+ * Join cursors are read-only.
+ */
+int
+__wt_curjoin_open(WT_SESSION_IMPL *session,
+ const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
+{
+ WT_CURSOR_STATIC_INIT(iface,
+ __curjoin_get_key, /* get-key */
+ __curjoin_get_value, /* get-value */
+ __wt_cursor_notsup, /* set-key */
+ __wt_cursor_notsup, /* set-value */
+ __wt_cursor_notsup, /* compare */
+ __wt_cursor_notsup, /* equals */
+ __curjoin_next, /* next */
+ __wt_cursor_notsup, /* prev */
+ __curjoin_reset, /* reset */
+ __wt_cursor_notsup, /* search */
+ __wt_cursor_notsup, /* search-near */
+ __wt_cursor_notsup, /* insert */
+ __wt_cursor_notsup, /* update */
+ __wt_cursor_notsup, /* remove */
+ __wt_cursor_notsup, /* reconfigure */
+ __curjoin_close); /* close */
+ WT_CURSOR *cursor;
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_ITEM(tmp);
+ WT_DECL_RET;
+ WT_TABLE *table;
+ size_t size;
+ const char *tablename, *columns;
+
+ WT_STATIC_ASSERT(offsetof(WT_CURSOR_JOIN, iface) == 0);
+
+ if (!WT_PREFIX_SKIP(uri, "join:"))
+ return (EINVAL);
+ tablename = uri;
+ if (!WT_PREFIX_SKIP(tablename, "table:"))
+ return (EINVAL);
+
+ columns = strchr(tablename, '(');
+ if (columns == NULL)
+ size = strlen(tablename);
+ else
+ size = WT_PTRDIFF(columns, tablename);
+ WT_RET(__wt_schema_get_table(session, tablename, size, 0, &table));
+
+ WT_RET(__wt_calloc_one(session, &cjoin));
+ cursor = &cjoin->iface;
+ *cursor = iface;
+ cursor->session = &session->iface;
+ cursor->internal_uri = table->name;
+ cursor->key_format = table->key_format;
+ cursor->value_format = table->value_format;
+ cjoin->table = table;
+
+ /* Handle projections. */
+ WT_ERR(__wt_scr_alloc(session, 0, &tmp));
+ if (columns != NULL) {
+ WT_ERR(__wt_struct_reformat(session, table,
+ columns, strlen(columns), NULL, 1, tmp));
+ WT_ERR(__wt_strndup(
+ session, tmp->data, tmp->size, &cursor->value_format));
+ WT_ERR(__wt_strdup(session, columns, &cjoin->projection));
+ }
+
+ if (owner != NULL)
+ WT_ERR(EINVAL);
+
+ WT_ERR(__wt_cursor_init(cursor, uri, owner, cfg, cursorp));
+
+ if (0) {
+err: WT_TRET(__curjoin_close(cursor));
+ *cursorp = NULL;
+ }
+
+ __wt_scr_free(session, &tmp);
+ return (ret);
+}
+
+/*
+ * __wt_curjoin_join --
+ * Add a new join to a join cursor.
+ */
+int
+__wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+ WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range,
+ uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count)
+{
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_DECL_RET;
+ WT_CURSOR_JOIN_ENDPOINT *end, *newend;
+ bool hasins, needbloom, range_eq;
+ u_int i, ins, nonbloom;
+ const char *raw_cfg[] = { WT_CONFIG_BASE(
+ session, WT_SESSION_open_cursor), "raw", NULL };
+ char *main_uri;
+ size_t namesize, newsize;
+
+ entry = NULL;
+ hasins = needbloom = false;
+ ins = 0; /* -Wuninitialized */
+ main_uri = NULL;
+ nonbloom = 0; /* -Wuninitialized */
+ namesize = strlen(cjoin->table->name);
+
+ for (i = 0; i < cjoin->entries_next; i++) {
+ if (cjoin->entries[i].index == idx) {
+ entry = &cjoin->entries[i];
+ break;
+ }
+ if (!needbloom && i > 0 &&
+ !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) {
+ needbloom = true;
+ nonbloom = i;
+ }
+ }
+ if (entry == NULL) {
+ WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated,
+ cjoin->entries_next + 1, &cjoin->entries));
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && needbloom) {
+ /*
+ * Reorder the list so that after the first entry,
+ * the Bloom filtered entries come next, followed by
+ * the non-Bloom entries. Once the Bloom filters
+ * are built, determining membership via Bloom is
+ * faster than without Bloom, so we can answer
+ * membership questions more quickly, and with less
+ * I/O, with the Bloom entries first.
+ */
+ entry = &cjoin->entries[nonbloom];
+ memmove(entry + 1, entry,
+ (cjoin->entries_next - nonbloom) *
+ sizeof(WT_CURSOR_JOIN_ENTRY));
+ memset(entry, 0, sizeof(WT_CURSOR_JOIN_ENTRY));
+ }
+ else
+ entry = &cjoin->entries[cjoin->entries_next];
+ entry->index = idx;
+ entry->flags = flags;
+ entry->count = count;
+ entry->bloom_bit_count = bloom_bit_count;
+ entry->bloom_hash_count = bloom_hash_count;
+ ++cjoin->entries_next;
+ } else {
+ /* Merge the join into an existing entry for this index */
+ if (count != 0 && entry->count != 0 && entry->count != count) {
+ __wt_errx(session, "count=%" PRIu64 " does not match "
+ "previous count=%" PRIu64 " for this index",
+ count, entry->count);
+ WT_ERR(EINVAL);
+ }
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) !=
+ F_ISSET(entry, WT_CURJOIN_ENTRY_BLOOM)) {
+ __wt_errx(session, "join has incompatible strategy "
+ "values for the same index");
+ WT_ERR(EINVAL);
+ }
+ /*
+ * Check against other comparisons (we call them endpoints)
+ * already set up for this index.
+ * We allow either:
+ * - one or more "eq" (with disjunction)
+ * - exactly one "eq" (with conjunction)
+ * - exactly one of "gt" or "ge" (conjunction or disjunction)
+ * - exactly one of "lt" or "le" (conjunction or disjunction)
+ * - one of "gt"/"ge" along with one of "lt"/"le"
+ * (currently restricted to conjunction).
+ *
+ * Some other combinations, although expressible either do
+ * not make sense (X == 3 AND X == 5) or are reducible (X <
+ * 7 AND X < 9). Other specific cases of (X < 7 OR X > 15)
+ * or (X == 4 OR X > 15) make sense but we don't handle yet.
+ */
+ for (i = 0; i < entry->ends_next; i++) {
+ end = &entry->ends[i];
+ range_eq = (range == WT_CURJOIN_END_EQ);
+ if ((F_ISSET(end, WT_CURJOIN_END_GT) &&
+ ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) ||
+ (F_ISSET(end, WT_CURJOIN_END_LT) &&
+ ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) ||
+ (end->flags == WT_CURJOIN_END_EQ &&
+ (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT))
+ != 0)) {
+ __wt_errx(session,
+ "join has overlapping ranges");
+ WT_ERR(EINVAL);
+ }
+ if (range == WT_CURJOIN_END_EQ &&
+ end->flags == WT_CURJOIN_END_EQ &&
+ !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) {
+ __wt_errx(session,
+ "compare=eq can only be combined "
+ "using operation=or");
+ WT_ERR(EINVAL);
+ }
+
+ /*
+ * Sort "gt"/"ge" to the front, followed by any number
+ * of "eq", and finally "lt"/"le".
+ */
+ if (!hasins &&
+ ((range & WT_CURJOIN_END_GT) != 0 ||
+ (range == WT_CURJOIN_END_EQ &&
+ !F_ISSET(end, WT_CURJOIN_END_GT)))) {
+ ins = i;
+ hasins = true;
+ }
+ }
+ /* All checks completed, merge any new configuration now */
+ entry->count = count;
+ entry->bloom_bit_count =
+ WT_MAX(entry->bloom_bit_count, bloom_bit_count);
+ entry->bloom_hash_count =
+ WT_MAX(entry->bloom_hash_count, bloom_hash_count);
+ }
+ WT_ERR(__wt_realloc_def(session, &entry->ends_allocated,
+ entry->ends_next + 1, &entry->ends));
+ if (!hasins)
+ ins = entry->ends_next;
+ newend = &entry->ends[ins];
+ memmove(newend + 1, newend,
+ (entry->ends_next - ins) * sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT));
+ entry->ends_next++;
+ newend->cursor = ref_cursor;
+ F_SET(newend, range);
+
+ /* Open the main file with a projection of the indexed columns. */
+ if (entry->main == NULL && entry->index != NULL) {
+ namesize = strlen(cjoin->table->name);
+ newsize = namesize + entry->index->colconf.len + 1;
+ WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
+ snprintf(main_uri, newsize, "%s%.*s",
+ cjoin->table->name, (int)entry->index->colconf.len,
+ entry->index->colconf.str);
+ WT_ERR(__wt_open_cursor(session, main_uri,
+ (WT_CURSOR *)cjoin, raw_cfg, &entry->main));
+ }
+
+err: if (main_uri != NULL)
+ __wt_free(session, main_uri);
+ return (ret);
+}
diff --git a/src/cursor/cur_stat.c b/src/cursor/cur_stat.c
index 81d028c165a..65d2dc81406 100644
--- a/src/cursor/cur_stat.c
+++ b/src/cursor/cur_stat.c
@@ -103,7 +103,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...)
va_list ap;
size_t size;
uint64_t *v;
- const char **p;
+ const char *desc, **p;
cst = (WT_CURSOR_STAT *)cursor;
va_start(ap, cursor);
@@ -111,15 +111,13 @@ __curstat_get_value(WT_CURSOR *cursor, ...)
WT_CURSOR_NEEDVALUE(cursor);
+ WT_ERR(cst->stats_desc(cst, WT_STAT_KEY_OFFSET(cst), &desc));
if (F_ISSET(cursor, WT_CURSTD_RAW)) {
WT_ERR(__wt_struct_size(session, &size, cursor->value_format,
- cst->stats_desc(WT_STAT_KEY_OFFSET(cst)),
- cst->pv.data, cst->v));
+ desc, cst->pv.data, cst->v));
WT_ERR(__wt_buf_initsize(session, &cursor->value, size));
WT_ERR(__wt_struct_pack(session, cursor->value.mem, size,
- cursor->value_format,
- cst->stats_desc(WT_STAT_KEY_OFFSET(cst)),
- cst->pv.data, cst->v));
+ cursor->value_format, desc, cst->pv.data, cst->v));
item = va_arg(ap, WT_ITEM *);
item->data = cursor->value.data;
@@ -130,7 +128,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...)
* pointer support isn't documented, but it's a cheap test.
*/
if ((p = va_arg(ap, const char **)) != NULL)
- *p = cst->stats_desc(WT_STAT_KEY_OFFSET(cst));
+ *p = desc;
if ((p = va_arg(ap, const char **)) != NULL)
*p = cst->pv.data;
if ((v = va_arg(ap, uint64_t *)) != NULL)
@@ -201,7 +199,9 @@ __curstat_next(WT_CURSOR *cursor)
/* Initialize on demand. */
if (cst->notinitialized) {
WT_ERR(__wt_curstat_init(
- session, cursor->internal_uri, cst->cfg, cst));
+ session, cursor->internal_uri, NULL, cst->cfg, cst));
+ if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, true, true));
cst->notinitialized = false;
}
@@ -211,15 +211,19 @@ __curstat_next(WT_CURSOR *cursor)
cst->key = WT_STAT_KEY_MIN(cst);
} else if (cst->key < WT_STAT_KEY_MAX(cst))
++cst->key;
- else {
- F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ else if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, true, false));
+ else
WT_ERR(WT_NOTFOUND);
- }
+
cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)];
WT_ERR(__curstat_print_value(session, cst->v, &cst->pv));
F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
-err: API_END_RET(session, ret);
+ if (0) {
+err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ }
+ API_END_RET(session, ret);
}
/*
@@ -239,7 +243,9 @@ __curstat_prev(WT_CURSOR *cursor)
/* Initialize on demand. */
if (cst->notinitialized) {
WT_ERR(__wt_curstat_init(
- session, cursor->internal_uri, cst->cfg, cst));
+ session, cursor->internal_uri, NULL, cst->cfg, cst));
+ if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, false, true));
cst->notinitialized = false;
}
@@ -249,16 +255,19 @@ __curstat_prev(WT_CURSOR *cursor)
cst->key = WT_STAT_KEY_MAX(cst);
} else if (cst->key > WT_STAT_KEY_MIN(cst))
--cst->key;
- else {
- F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
+ else if (cst->next_set != NULL)
+ WT_ERR((*cst->next_set)(session, cst, false, false));
+ else
WT_ERR(WT_NOTFOUND);
- }
cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)];
WT_ERR(__curstat_print_value(session, cst->v, &cst->pv));
F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
-err: API_END_RET(session, ret);
+ if (0) {
+err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ }
+ API_END_RET(session, ret);
}
/*
@@ -301,7 +310,7 @@ __curstat_search(WT_CURSOR *cursor)
/* Initialize on demand. */
if (cst->notinitialized) {
WT_ERR(__wt_curstat_init(
- session, cursor->internal_uri, cst->cfg, cst));
+ session, cursor->internal_uri, NULL, cst->cfg, cst));
cst->notinitialized = false;
}
@@ -332,6 +341,7 @@ __curstat_close(WT_CURSOR *cursor)
__curstat_free_config(session, cst);
__wt_buf_free(session, &cst->pv);
+ __wt_free(session, cst->desc_buf);
WT_ERR(__wt_cursor_close(cursor));
@@ -426,12 +436,102 @@ __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst)
}
/*
+ * __curstat_join_next_set --
+ * Advance to another index used in a join to give another set of
+ * statistics.
+ */
+static int
+__curstat_join_next_set(WT_SESSION_IMPL *session, WT_CURSOR_STAT *cst,
+ bool forw, bool init)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_JOIN_STATS_GROUP *join_group;
+ ssize_t pos;
+
+ WT_ASSERT(session, WT_STREQ(cst->iface.uri, "statistics:join"));
+ join_group = &cst->u.join_stats_group;
+ cjoin = join_group->join_cursor;
+ if (init)
+ pos = forw ? 0 : cjoin->entries_next - 1;
+ else
+ pos = join_group->join_cursor_entry + (forw ? 1 : -1);
+ if (pos < 0 || (size_t)pos >= cjoin->entries_next)
+ return (WT_NOTFOUND);
+
+ join_group->join_cursor_entry = pos;
+ if (cjoin->entries[pos].index == NULL) {
+ WT_ASSERT(session, WT_PREFIX_MATCH(cjoin->iface.uri, "join:"));
+ join_group->desc_prefix = cjoin->iface.uri + 5;
+ } else
+ join_group->desc_prefix = cjoin->entries[pos].index->name;
+ join_group->join_stats = cjoin->entries[pos].stats;
+ if (!init)
+ cst->key = forw ? WT_STAT_KEY_MIN(cst) : WT_STAT_KEY_MAX(cst);
+ return (0);
+}
+
+/*
+ * __curstat_join_desc --
+ * Assemble the description field based on current index and statistic.
+ */
+static int
+__curstat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **resultp)
+{
+ size_t len;
+ const char *static_desc;
+ WT_JOIN_STATS_GROUP *sgrp;
+ WT_SESSION_IMPL *session;
+
+ sgrp = &cst->u.join_stats_group;
+ session = (WT_SESSION_IMPL *)sgrp->join_cursor->iface.session;
+ WT_RET(__wt_stat_join_desc(cst, slot, &static_desc));
+ len = strlen("join: ") + strlen(sgrp->desc_prefix) +
+ strlen(static_desc) + 1;
+ WT_RET(__wt_realloc(session, NULL, len, &cst->desc_buf));
+ snprintf(cst->desc_buf, len, "join: %s%s", sgrp->desc_prefix,
+ static_desc);
+ *resultp = cst->desc_buf;
+ return (0);
+}
+
+/*
+ * __curstat_join_init --
+ * Initialize the statistics for a joined cursor.
+ */
+static int
+__curstat_join_init(WT_SESSION_IMPL *session,
+ WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst)
+{
+ WT_CURSOR_JOIN *cjoin;
+ WT_DECL_RET;
+
+ WT_UNUSED(cfg);
+
+ if (curjoin == NULL && cst->u.join_stats_group.join_cursor != NULL)
+ curjoin = &cst->u.join_stats_group.join_cursor->iface;
+ if (curjoin == NULL || !WT_PREFIX_MATCH(curjoin->uri, "join:"))
+ WT_ERR_MSG(session, EINVAL,
+ "join cursor must be used with statistics:join");
+ cjoin = (WT_CURSOR_JOIN *)curjoin;
+ memset(&cst->u.join_stats_group, 0, sizeof(WT_JOIN_STATS_GROUP));
+ cst->u.join_stats_group.join_cursor = cjoin;
+
+ cst->stats = (int64_t *)&cst->u.join_stats_group.join_stats;
+ cst->stats_base = WT_JOIN_STATS_BASE;
+ cst->stats_count = sizeof(WT_JOIN_STATS) / sizeof(int64_t);
+ cst->stats_desc = __curstat_join_desc;
+ cst->next_set = __curstat_join_next_set;
+
+err: return (ret);
+}
+
+/*
* __wt_curstat_init --
* Initialize a statistics cursor.
*/
int
__wt_curstat_init(WT_SESSION_IMPL *session,
- const char *uri, const char *cfg[], WT_CURSOR_STAT *cst)
+ const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst)
{
const char *dsrc_uri;
@@ -442,6 +542,10 @@ __wt_curstat_init(WT_SESSION_IMPL *session,
dsrc_uri = uri + strlen("statistics:");
+ if (WT_STREQ(dsrc_uri, "join"))
+ return (
+ __curstat_join_init(session, curjoin, cfg, cst));
+
if (WT_PREFIX_MATCH(dsrc_uri, "colgroup:"))
return (
__wt_curstat_colgroup_init(session, dsrc_uri, cfg, cst));
@@ -467,7 +571,7 @@ __wt_curstat_init(WT_SESSION_IMPL *session,
*/
int
__wt_curstat_open(WT_SESSION_IMPL *session,
- const char *uri, const char *cfg[], WT_CURSOR **cursorp)
+ const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp)
{
WT_CONNECTION_IMPL *conn;
WT_CURSOR_STATIC_INIT(iface,
@@ -581,7 +685,7 @@ __wt_curstat_open(WT_SESSION_IMPL *session,
* objects like tables, we need to a valid set of statistics when before
* the open returns.
*/
- WT_ERR(__wt_curstat_init(session, uri, cst->cfg, cst));
+ WT_ERR(__wt_curstat_init(session, uri, other, cst->cfg, cst));
cst->notinitialized = false;
/* The cursor isn't yet positioned. */
diff --git a/src/cursor/cur_table.c b/src/cursor/cur_table.c
index c5a00649334..dca72a16ee5 100644
--- a/src/cursor/cur_table.c
+++ b/src/cursor/cur_table.c
@@ -186,34 +186,16 @@ __wt_curtable_get_key(WT_CURSOR *cursor, ...)
int
__wt_curtable_get_value(WT_CURSOR *cursor, ...)
{
- WT_CURSOR *primary;
- WT_CURSOR_TABLE *ctable;
WT_DECL_RET;
- WT_ITEM *item;
WT_SESSION_IMPL *session;
va_list ap;
- ctable = (WT_CURSOR_TABLE *)cursor;
- primary = *ctable->cg_cursors;
- CURSOR_API_CALL(cursor, session, get_value, NULL);
- WT_CURSOR_NEEDVALUE(primary);
-
va_start(ap, cursor);
- if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
- ret = __wt_schema_project_merge(session,
- ctable->cg_cursors, ctable->plan,
- cursor->value_format, &cursor->value);
- if (ret == 0) {
- item = va_arg(ap, WT_ITEM *);
- item->data = cursor->value.data;
- item->size = cursor->value.size;
- }
- } else
- ret = __wt_schema_project_out(session,
- ctable->cg_cursors, ctable->plan, ap);
- va_end(ap);
+ JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL);
+ WT_ERR(__wt_curtable_get_valuev(cursor, ap));
-err: API_END_RET(session, ret);
+err: va_end(ap);
+ API_END_RET(session, ret);
}
/*
@@ -264,7 +246,7 @@ __wt_curtable_set_value(WT_CURSOR *cursor, ...)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, set_value, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL);
va_start(ap, cursor);
if (F_ISSET(cursor, WT_CURSOR_RAW_OK | WT_CURSTD_DUMP_JSON)) {
@@ -332,7 +314,7 @@ __curtable_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp)
WT_DECL_RET;
WT_SESSION_IMPL *session;
- CURSOR_API_CALL(a, session, compare, NULL);
+ JOINABLE_CURSOR_API_CALL(a, session, compare, NULL);
/*
* Confirm both cursors refer to the same source and have keys, then
@@ -362,7 +344,7 @@ __curtable_next(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
APPLY_CG(ctable, next);
err: API_END_RET(session, ret);
@@ -383,7 +365,7 @@ __curtable_next_random(WT_CURSOR *cursor)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, next, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL);
cp = ctable->cg_cursors;
/* Split out the first next, it retrieves the random record. */
@@ -414,7 +396,7 @@ __curtable_prev(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, prev, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL);
APPLY_CG(ctable, prev);
err: API_END_RET(session, ret);
@@ -432,7 +414,7 @@ __curtable_reset(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, reset, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL);
APPLY_CG(ctable, reset);
err: API_END_RET(session, ret);
@@ -450,7 +432,7 @@ __curtable_search(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, search, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL);
APPLY_CG(ctable, search);
err: API_END_RET(session, ret);
@@ -470,7 +452,7 @@ __curtable_search_near(WT_CURSOR *cursor, int *exact)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, search_near, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL);
cp = ctable->cg_cursors;
primary = *cp;
WT_ERR(primary->search_near(primary, exact));
@@ -501,7 +483,7 @@ __curtable_insert(WT_CURSOR *cursor)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL);
+ JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL);
WT_ERR(__curtable_open_indices(ctable));
/*
@@ -568,7 +550,7 @@ __curtable_update(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_UPDATE_API_CALL(cursor, session, update, NULL);
+ JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, update, NULL);
WT_ERR(__curtable_open_indices(ctable));
/*
@@ -619,7 +601,7 @@ __curtable_remove(WT_CURSOR *cursor)
WT_SESSION_IMPL *session;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_REMOVE_API_CALL(cursor, session, NULL);
+ JOINABLE_CURSOR_REMOVE_API_CALL(cursor, session, NULL);
WT_ERR(__curtable_open_indices(ctable));
/* Find the old record so it can be removed from indices */
@@ -659,6 +641,7 @@ __wt_table_range_truncate(WT_CURSOR_TABLE *start, WT_CURSOR_TABLE *stop)
/* Open any indices. */
WT_RET(__curtable_open_indices(ctable));
WT_RET(__wt_scr_alloc(session, 128, &key));
+ WT_STAT_FAST_DATA_INCR(session, cursor_truncate);
/*
* Step through the cursor range, removing the index entries.
@@ -730,7 +713,7 @@ __curtable_close(WT_CURSOR *cursor)
u_int i;
ctable = (WT_CURSOR_TABLE *)cursor;
- CURSOR_API_CALL(cursor, session, close, NULL);
+ JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL);
if (ctable->cg_cursors != NULL)
for (i = 0, cp = ctable->cg_cursors;
@@ -853,7 +836,7 @@ __curtable_open_indices(WT_CURSOR_TABLE *ctable)
*/
int
__wt_curtable_open(WT_SESSION_IMPL *session,
- const char *uri, const char *cfg[], WT_CURSOR **cursorp)
+ const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
{
WT_CURSOR_STATIC_INIT(iface,
__wt_curtable_get_key, /* get-key */
@@ -944,7 +927,7 @@ __wt_curtable_open(WT_SESSION_IMPL *session,
}
WT_ERR(__wt_cursor_init(
- cursor, cursor->internal_uri, NULL, cfg, cursorp));
+ cursor, cursor->internal_uri, owner, cfg, cursorp));
if (F_ISSET(cursor, WT_CURSTD_DUMP_JSON))
WT_ERR(__wt_json_column_init(cursor, table->key_format,
diff --git a/src/docs/cursor-join.dox b/src/docs/cursor-join.dox
new file mode 100644
index 00000000000..51da6b174bf
--- /dev/null
+++ b/src/docs/cursor-join.dox
@@ -0,0 +1,19 @@
+/*! @m_page{{c,java},cursor_join,Join cursors}
+
+Join cursors provide a way to iterate over a subset of a table, where the subset is specified by relationships with reference cursors.
+
+A join cursor is created with WT_SESSION::open_cursor using a \c
+"join:table:<name>" URI prefix. Then reference cursors are positioned to
+keys on indices and joined to the join cursor using WT_SESSION::join calls.
+The result is a join cursor that can be iterated to satisfy the join
+equation.
+
+Here is an example using join cursors:
+
+@snippet ex_schema.c Join cursors
+
+Joins support various comparison operators: \c "eq", \c "gt", \c "ge", \c "lt", \c "le". Ranges with lower and upper bounds can also be specified, by joining two cursors on the same index, for example, one with \c "compare=ge" and another \c "compare=lt". In addition to joining indices, the main table can be joined so that a range of primary keys can be specified.
+
+All the joins should be done on the join cursor before WT_CURSOR::next is called. Calling WT_CURSOR::next on a join cursor for the first time populates any bloom filters and performs other initialization. The join cursor's key is the primary key (the key for the main table), and its value is the entire set of values of the main table. A join cursor can be created with a projection by appending \c "(col1,col2,...)" to the URI if a different set of values is needed.
+
+*/
diff --git a/src/docs/cursors.dox b/src/docs/cursors.dox
index c9455e2976c..b6271951f91 100644
--- a/src/docs/cursors.dox
+++ b/src/docs/cursors.dox
@@ -19,6 +19,7 @@ See the following for more details:
- @subpage data_sources
- @ref metadata
- @ref cursor_log
+- @ref cursor_join
@section cursor_projections Projections
diff --git a/src/docs/data-sources.dox b/src/docs/data-sources.dox
index 0c446b2bf78..d09d1cbc1b8 100644
--- a/src/docs/data-sources.dox
+++ b/src/docs/data-sources.dox
@@ -13,6 +13,10 @@ The following are the builtin basic cursor types:
index cursor,
key=index key\, value=table value(s) with optional projection
of columns}
+@row{<tt>join:table:\<table name\>[\<projection\>]</tt>,
+ join cursor,
+ key=table key\, value=table value(s) with optional projection
+ of columns}
</table>
Some administrative tasks can be accomplished using the following special
cursor types that give access to data managed by WiredTiger:
diff --git a/src/docs/programming.dox b/src/docs/programming.dox
index d99c34d1da2..f005f6d3e2d 100644
--- a/src/docs/programming.dox
+++ b/src/docs/programming.dox
@@ -43,6 +43,7 @@ each of which is ordered by one or more columns.
- @ref transaction_named_snapshots
- @subpage shared_cache
- @subpage statistics
+- @subpage cursor_join
- @subpage cursor_log
- @subpage_single upgrade
diff --git a/src/docs/spell.ok b/src/docs/spell.ok
index b887f0ceee2..86af82d8fd2 100644
--- a/src/docs/spell.ok
+++ b/src/docs/spell.ok
@@ -197,6 +197,7 @@ endinternal
english
env
eof
+eq
erlang
errno
exe
@@ -220,6 +221,7 @@ freelist
fsync
gcc
gdbm
+ge
getKey
getValue
getopt
@@ -260,6 +262,7 @@ keyvalue
kvs
lang
lastname
+le
len
leveldb
li
diff --git a/src/include/api.h b/src/include/api.h
index 74c58845c43..4821b450f9e 100644
--- a/src/include/api.h
+++ b/src/include/api.h
@@ -116,11 +116,23 @@
API_CALL_NOCONF(s, WT_CURSOR, n, cur, \
((bt) == NULL) ? NULL : ((WT_BTREE *)(bt))->dhandle)
+#define JOINABLE_CURSOR_CALL_CHECK(cur) \
+ if (F_ISSET(cur, WT_CURSTD_JOINED)) \
+ WT_ERR(__wt_curindex_joined(cur))
+
+#define JOINABLE_CURSOR_API_CALL(cur, s, n, bt) \
+ CURSOR_API_CALL(cur, s, n, bt); \
+ JOINABLE_CURSOR_CALL_CHECK(cur)
+
#define CURSOR_REMOVE_API_CALL(cur, s, bt) \
(s) = (WT_SESSION_IMPL *)(cur)->session; \
TXN_API_CALL_NOCONF(s, WT_CURSOR, remove, cur, \
((bt) == NULL) ? NULL : ((WT_BTREE *)(bt))->dhandle);
+#define JOINABLE_CURSOR_REMOVE_API_CALL(cur, s, bt) \
+ CURSOR_REMOVE_API_CALL(cur, s, bt); \
+ JOINABLE_CURSOR_CALL_CHECK(cur)
+
#define CURSOR_UPDATE_API_CALL(cur, s, n, bt) \
(s) = (WT_SESSION_IMPL *)(cur)->session; \
TXN_API_CALL_NOCONF(s, WT_CURSOR, n, cur, \
@@ -128,6 +140,10 @@
if (F_ISSET(S2C(s), WT_CONN_IN_MEMORY) && __wt_cache_full(s)) \
WT_ERR(WT_CACHE_FULL);
+#define JOINABLE_CURSOR_UPDATE_API_CALL(cur, s, n, bt) \
+ CURSOR_UPDATE_API_CALL(cur, s, n, bt); \
+ JOINABLE_CURSOR_CALL_CHECK(cur)
+
#define CURSOR_UPDATE_API_END(s, ret) \
TXN_API_END(s, ret)
diff --git a/src/include/config.h b/src/include/config.h
index 408639ab2a9..e836abaccba 100644
--- a/src/include/config.h
+++ b/src/include/config.h
@@ -68,28 +68,29 @@ struct __wt_config_parser_impl {
#define WT_CONFIG_ENTRY_WT_SESSION_compact 16
#define WT_CONFIG_ENTRY_WT_SESSION_create 17
#define WT_CONFIG_ENTRY_WT_SESSION_drop 18
-#define WT_CONFIG_ENTRY_WT_SESSION_log_flush 19
-#define WT_CONFIG_ENTRY_WT_SESSION_log_printf 20
-#define WT_CONFIG_ENTRY_WT_SESSION_open_cursor 21
-#define WT_CONFIG_ENTRY_WT_SESSION_reconfigure 22
-#define WT_CONFIG_ENTRY_WT_SESSION_rename 23
-#define WT_CONFIG_ENTRY_WT_SESSION_reset 24
-#define WT_CONFIG_ENTRY_WT_SESSION_rollback_transaction 25
-#define WT_CONFIG_ENTRY_WT_SESSION_salvage 26
-#define WT_CONFIG_ENTRY_WT_SESSION_snapshot 27
-#define WT_CONFIG_ENTRY_WT_SESSION_strerror 28
-#define WT_CONFIG_ENTRY_WT_SESSION_transaction_sync 29
-#define WT_CONFIG_ENTRY_WT_SESSION_truncate 30
-#define WT_CONFIG_ENTRY_WT_SESSION_upgrade 31
-#define WT_CONFIG_ENTRY_WT_SESSION_verify 32
-#define WT_CONFIG_ENTRY_colgroup_meta 33
-#define WT_CONFIG_ENTRY_file_meta 34
-#define WT_CONFIG_ENTRY_index_meta 35
-#define WT_CONFIG_ENTRY_table_meta 36
-#define WT_CONFIG_ENTRY_wiredtiger_open 37
-#define WT_CONFIG_ENTRY_wiredtiger_open_all 38
-#define WT_CONFIG_ENTRY_wiredtiger_open_basecfg 39
-#define WT_CONFIG_ENTRY_wiredtiger_open_usercfg 40
+#define WT_CONFIG_ENTRY_WT_SESSION_join 19
+#define WT_CONFIG_ENTRY_WT_SESSION_log_flush 20
+#define WT_CONFIG_ENTRY_WT_SESSION_log_printf 21
+#define WT_CONFIG_ENTRY_WT_SESSION_open_cursor 22
+#define WT_CONFIG_ENTRY_WT_SESSION_reconfigure 23
+#define WT_CONFIG_ENTRY_WT_SESSION_rename 24
+#define WT_CONFIG_ENTRY_WT_SESSION_reset 25
+#define WT_CONFIG_ENTRY_WT_SESSION_rollback_transaction 26
+#define WT_CONFIG_ENTRY_WT_SESSION_salvage 27
+#define WT_CONFIG_ENTRY_WT_SESSION_snapshot 28
+#define WT_CONFIG_ENTRY_WT_SESSION_strerror 29
+#define WT_CONFIG_ENTRY_WT_SESSION_transaction_sync 30
+#define WT_CONFIG_ENTRY_WT_SESSION_truncate 31
+#define WT_CONFIG_ENTRY_WT_SESSION_upgrade 32
+#define WT_CONFIG_ENTRY_WT_SESSION_verify 33
+#define WT_CONFIG_ENTRY_colgroup_meta 34
+#define WT_CONFIG_ENTRY_file_meta 35
+#define WT_CONFIG_ENTRY_index_meta 36
+#define WT_CONFIG_ENTRY_table_meta 37
+#define WT_CONFIG_ENTRY_wiredtiger_open 38
+#define WT_CONFIG_ENTRY_wiredtiger_open_all 39
+#define WT_CONFIG_ENTRY_wiredtiger_open_basecfg 40
+#define WT_CONFIG_ENTRY_wiredtiger_open_usercfg 41
/*
* configuration section: END
* DO NOT EDIT: automatically built by dist/flags.py.
diff --git a/src/include/cursor.h b/src/include/cursor.h
index 1cbe76216b1..23d3f3745db 100644
--- a/src/include/cursor.h
+++ b/src/include/cursor.h
@@ -264,6 +264,66 @@ struct __wt_cursor_index {
uint8_t *cg_needvalue;
};
+struct __wt_cursor_join_iter {
+ WT_SESSION_IMPL *session;
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_JOIN_ENTRY *entry;
+ WT_CURSOR *cursor;
+ WT_ITEM *curkey;
+ bool advance;
+};
+
+struct __wt_cursor_join_endpoint {
+ WT_ITEM key;
+ uint8_t recno_buf[10]; /* holds packed recno */
+ WT_CURSOR *cursor;
+
+#define WT_CURJOIN_END_LT 0x01 /* include values < cursor */
+#define WT_CURJOIN_END_EQ 0x02 /* include values == cursor */
+#define WT_CURJOIN_END_GT 0x04 /* include values > cursor */
+#define WT_CURJOIN_END_GE (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ)
+#define WT_CURJOIN_END_LE (WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ)
+#define WT_CURJOIN_END_OWN_KEY 0x08 /* must free key's data */
+ uint8_t flags; /* range for this endpoint */
+};
+
+struct __wt_cursor_join_entry {
+ WT_INDEX *index;
+ WT_CURSOR *main; /* raw main table cursor */
+ WT_BLOOM *bloom; /* Bloom filter handle */
+ uint64_t bloom_bit_count; /* bits per item in bloom */
+ uint64_t bloom_hash_count; /* hash functions in bloom */
+ uint64_t count; /* approx number of matches */
+
+#define WT_CURJOIN_ENTRY_BLOOM 0x01 /* use a bloom filter */
+#define WT_CURJOIN_ENTRY_DISJUNCTION 0x02 /* endpoints are or-ed */
+#define WT_CURJOIN_ENTRY_OWN_BLOOM 0x04 /* this entry owns the bloom */
+ uint8_t flags;
+
+ WT_CURSOR_JOIN_ENDPOINT *ends; /* reference endpoints */
+ size_t ends_allocated;
+ size_t ends_next;
+
+ WT_JOIN_STATS stats; /* Join statistics */
+};
+
+struct __wt_cursor_join {
+ WT_CURSOR iface;
+
+ WT_TABLE *table;
+ const char *projection;
+ WT_CURSOR_JOIN_ITER *iter;
+ WT_CURSOR_JOIN_ENTRY *entries;
+ size_t entries_allocated;
+ u_int entries_next;
+ uint8_t recno_buf[10]; /* holds packed recno */
+
+#define WT_CURJOIN_ERROR 0x01 /* Error in initialization */
+#define WT_CURJOIN_INITIALIZED 0x02 /* Successful initialization */
+#define WT_CURJOIN_SKIP_FIRST_LEFT 0x04 /* First check not needed */
+ uint8_t flags;
+};
+
struct __wt_cursor_json {
char *key_buf; /* JSON formatted string */
char *value_buf; /* JSON formatted string */
@@ -298,6 +358,13 @@ struct __wt_cursor_metadata {
uint32_t flags;
};
+struct __wt_join_stats_group {
+ const char *desc_prefix; /* Prefix appears before description */
+ WT_CURSOR_JOIN *join_cursor;
+ ssize_t join_cursor_entry; /* Position in entries */
+ WT_JOIN_STATS join_stats;
+};
+
struct __wt_cursor_stat {
WT_CURSOR iface;
@@ -307,14 +374,19 @@ struct __wt_cursor_stat {
int64_t *stats; /* Statistics */
int stats_base; /* Base statistics value */
int stats_count; /* Count of statistics values */
- const char *(*stats_desc)(int); /* Statistics descriptions */
+ int (*stats_desc)(WT_CURSOR_STAT *, int, const char **);
+ /* Statistics descriptions */
+ int (*next_set)(WT_SESSION_IMPL *, WT_CURSOR_STAT *, bool,
+ bool); /* Advance to next set */
union { /* Copies of the statistics */
WT_DSRC_STATS dsrc_stats;
WT_CONNECTION_STATS conn_stats;
+ WT_JOIN_STATS_GROUP join_stats_group;
} u;
const char **cfg; /* Original cursor configuration */
+ char *desc_buf; /* Saved description string */
int key; /* Current stats key */
uint64_t v; /* Current stats value */
diff --git a/src/include/cursor.i b/src/include/cursor.i
index c6ce04cab6f..9dd280534b4 100644
--- a/src/include/cursor.i
+++ b/src/include/cursor.i
@@ -139,6 +139,70 @@ __curfile_leave(WT_CURSOR_BTREE *cbt)
}
/*
+ * __wt_curindex_get_valuev --
+ * Internal implementation of WT_CURSOR->get_value for index cursors
+ */
+static inline int
+__wt_curindex_get_valuev(WT_CURSOR *cursor, va_list ap)
+{
+ WT_CURSOR_INDEX *cindex;
+ WT_DECL_RET;
+ WT_ITEM *item;
+ WT_SESSION_IMPL *session;
+
+ cindex = (WT_CURSOR_INDEX *)cursor;
+ session = (WT_SESSION_IMPL *)cursor->session;
+ WT_CURSOR_NEEDVALUE(cursor);
+
+ if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
+ ret = __wt_schema_project_merge(session,
+ cindex->cg_cursors, cindex->value_plan,
+ cursor->value_format, &cursor->value);
+ if (ret == 0) {
+ item = va_arg(ap, WT_ITEM *);
+ item->data = cursor->value.data;
+ item->size = cursor->value.size;
+ }
+ } else
+ ret = __wt_schema_project_out(session,
+ cindex->cg_cursors, cindex->value_plan, ap);
+err: return (ret);
+}
+
+/*
+ * __wt_curtable_get_valuev --
+ * Internal implementation of WT_CURSOR->get_value for table cursors.
+ */
+static inline int
+__wt_curtable_get_valuev(WT_CURSOR *cursor, va_list ap)
+{
+ WT_CURSOR *primary;
+ WT_CURSOR_TABLE *ctable;
+ WT_DECL_RET;
+ WT_ITEM *item;
+ WT_SESSION_IMPL *session;
+
+ ctable = (WT_CURSOR_TABLE *)cursor;
+ session = (WT_SESSION_IMPL *)cursor->session;
+ primary = *ctable->cg_cursors;
+ WT_CURSOR_NEEDVALUE(primary);
+
+ if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) {
+ ret = __wt_schema_project_merge(session,
+ ctable->cg_cursors, ctable->plan,
+ cursor->value_format, &cursor->value);
+ if (ret == 0) {
+ item = va_arg(ap, WT_ITEM *);
+ item->data = cursor->value.data;
+ item->size = cursor->value.size;
+ }
+ } else
+ ret = __wt_schema_project_out(session,
+ ctable->cg_cursors, ctable->plan, ap);
+err: return (ret);
+}
+
+/*
* __wt_cursor_dhandle_incr_use --
* Increment the in-use counter in cursor's data source.
*/
diff --git a/src/include/extern.h b/src/include/extern.h
index a6ccc526f8c..743a3c3ac31 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -83,6 +83,8 @@ extern int __wt_bloom_finalize(WT_BLOOM *bloom);
extern int __wt_bloom_hash(WT_BLOOM *bloom, WT_ITEM *key, WT_BLOOM_HASH *bhash);
extern int __wt_bloom_hash_get(WT_BLOOM *bloom, WT_BLOOM_HASH *bhash);
extern int __wt_bloom_get(WT_BLOOM *bloom, WT_ITEM *key);
+extern int __wt_bloom_inmem_get(WT_BLOOM *bloom, WT_ITEM *key);
+extern int __wt_bloom_intersection(WT_BLOOM *bloom, WT_BLOOM *other);
extern int __wt_bloom_close(WT_BLOOM *bloom);
extern int __wt_bloom_drop(WT_BLOOM *bloom, const char *config);
extern int __wt_compact(WT_SESSION_IMPL *session, const char *cfg[]);
@@ -274,7 +276,10 @@ extern int __wt_curdump_create(WT_CURSOR *child, WT_CURSOR *owner, WT_CURSOR **c
extern int __wt_curfile_update_check(WT_CURSOR *cursor);
extern int __wt_curfile_create(WT_SESSION_IMPL *session, WT_CURSOR *owner, const char *cfg[], bool bulk, bool bitmap, WT_CURSOR **cursorp);
extern int __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curindex_joined(WT_CURSOR *cursor);
extern int __wt_curindex_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curjoin_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count);
extern int __wt_json_alloc_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, WT_CURSOR_JSON *json, bool iskey, va_list ap);
extern void __wt_json_close(WT_SESSION_IMPL *session, WT_CURSOR *cursor);
extern size_t __wt_json_unpack_char(char ch, u_char *buf, size_t bufsz, bool force_unicode);
@@ -287,8 +292,8 @@ extern int __wt_json_strncpy(char **pdst, size_t dstlen, const char *src, size_t
extern int __wt_curlog_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_curmetadata_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
extern void __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst);
-extern int __wt_curstat_init(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR_STAT *cst);
-extern int __wt_curstat_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curstat_init(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst);
+extern int __wt_curstat_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_cursor_notsup(WT_CURSOR *cursor);
extern int __wt_cursor_noop(WT_CURSOR *cursor);
extern void __wt_cursor_set_notsup(WT_CURSOR *cursor);
@@ -316,7 +321,7 @@ extern int __wt_curtable_get_value(WT_CURSOR *cursor, ...);
extern void __wt_curtable_set_key(WT_CURSOR *cursor, ...);
extern void __wt_curtable_set_value(WT_CURSOR *cursor, ...);
extern int __wt_table_range_truncate(WT_CURSOR_TABLE *start, WT_CURSOR_TABLE *stop);
-extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp);
+extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_evict_file(WT_SESSION_IMPL *session, WT_CACHE_OP syncop);
extern void __wt_evict_list_clear_page(WT_SESSION_IMPL *session, WT_REF *ref);
extern int __wt_evict_server_wake(WT_SESSION_IMPL *session);
@@ -533,6 +538,8 @@ extern int __wt_struct_confchk(WT_SESSION_IMPL *session, WT_CONFIG_ITEM *v);
extern int __wt_struct_size(WT_SESSION_IMPL *session, size_t *sizep, const char *fmt, ...);
extern int __wt_struct_pack(WT_SESSION_IMPL *session, void *buffer, size_t size, const char *fmt, ...);
extern int __wt_struct_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, ...);
+extern int __wt_struct_unpack_size(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, size_t *resultp);
+extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, void **reallocp);
extern int __wt_ovfl_discard_add(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL *cell);
extern void __wt_ovfl_discard_free(WT_SESSION_IMPL *session, WT_PAGE *page);
extern int __wt_ovfl_reuse_search(WT_SESSION_IMPL *session, WT_PAGE *page, uint8_t **addrp, size_t *addr_sizep, const void *value, size_t value_size);
@@ -674,19 +681,24 @@ __wt_scr_alloc_func(WT_SESSION_IMPL *session, size_t size, WT_ITEM **scratchp
extern void __wt_scr_discard(WT_SESSION_IMPL *session);
extern void *__wt_ext_scr_alloc( WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, size_t size);
extern void __wt_ext_scr_free(WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, void *p);
-extern const char *__wt_stat_dsrc_desc(int slot);
+extern int __wt_stat_dsrc_desc(WT_CURSOR_STAT *cst, int slot, const char **p);
extern void __wt_stat_dsrc_init_single(WT_DSRC_STATS *stats);
extern void __wt_stat_dsrc_init(WT_DATA_HANDLE *handle);
extern void __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats);
extern void __wt_stat_dsrc_clear_all(WT_DSRC_STATS **stats);
extern void __wt_stat_dsrc_aggregate_single( WT_DSRC_STATS *from, WT_DSRC_STATS *to);
extern void __wt_stat_dsrc_aggregate( WT_DSRC_STATS **from, WT_DSRC_STATS *to);
-extern const char *__wt_stat_connection_desc(int slot);
+extern int __wt_stat_connection_desc(WT_CURSOR_STAT *cst, int slot, const char **p);
extern void __wt_stat_connection_init_single(WT_CONNECTION_STATS *stats);
extern void __wt_stat_connection_init(WT_CONNECTION_IMPL *handle);
extern void __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats);
extern void __wt_stat_connection_clear_all(WT_CONNECTION_STATS **stats);
extern void __wt_stat_connection_aggregate( WT_CONNECTION_STATS **from, WT_CONNECTION_STATS *to);
+extern int __wt_stat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **p);
+extern void __wt_stat_join_init_single(WT_JOIN_STATS *stats);
+extern void __wt_stat_join_clear_single(WT_JOIN_STATS *stats);
+extern void __wt_stat_join_clear_all(WT_JOIN_STATS **stats);
+extern void __wt_stat_join_aggregate( WT_JOIN_STATS **from, WT_JOIN_STATS *to);
extern void __wt_txn_release_snapshot(WT_SESSION_IMPL *session);
extern void __wt_txn_get_snapshot(WT_SESSION_IMPL *session);
extern void __wt_txn_update_oldest(WT_SESSION_IMPL *session, bool force);
diff --git a/src/include/stat.h b/src/include/stat.h
index 81991caddc6..dfe7ee5c6cd 100644
--- a/src/include/stat.h
+++ b/src/include/stat.h
@@ -298,6 +298,7 @@ struct __wt_connection_stats {
int64_t cursor_restart;
int64_t cursor_search;
int64_t cursor_search_near;
+ int64_t cursor_truncate;
int64_t cursor_update;
int64_t dh_conn_handle_count;
int64_t dh_session_handles;
@@ -359,6 +360,8 @@ struct __wt_connection_stats {
int64_t page_read_blocked;
int64_t page_sleep;
int64_t read_io;
+ int64_t rec_page_delete;
+ int64_t rec_page_delete_fast;
int64_t rec_pages;
int64_t rec_pages_eviction;
int64_t rec_split_stashed_bytes;
@@ -466,6 +469,7 @@ struct __wt_dsrc_stats {
int64_t cursor_restart;
int64_t cursor_search;
int64_t cursor_search_near;
+ int64_t cursor_truncate;
int64_t cursor_update;
int64_t cursor_update_bytes;
int64_t lsm_checkpoint_throttle;
@@ -481,6 +485,7 @@ struct __wt_dsrc_stats {
int64_t rec_overflow_key_leaf;
int64_t rec_overflow_value;
int64_t rec_page_delete;
+ int64_t rec_page_delete_fast;
int64_t rec_page_match;
int64_t rec_pages;
int64_t rec_pages_eviction;
@@ -491,4 +496,14 @@ struct __wt_dsrc_stats {
int64_t txn_update_conflict;
};
+/*
+ * Statistics entries for join cursors.
+ */
+#define WT_JOIN_STATS_BASE 3000
+struct __wt_join_stats {
+ int64_t accesses;
+ int64_t actual_count;
+ int64_t bloom_false_positive;
+};
+
/* Statistics section: END */
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index c46883af9ce..08f73386090 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -574,11 +574,12 @@ struct __wt_cursor {
#define WT_CURSTD_KEY_EXT 0x0020 /* Key points out of the tree. */
#define WT_CURSTD_KEY_INT 0x0040 /* Key points into the tree. */
#define WT_CURSTD_KEY_SET (WT_CURSTD_KEY_EXT | WT_CURSTD_KEY_INT)
-#define WT_CURSTD_OPEN 0x0080
-#define WT_CURSTD_OVERWRITE 0x0100
-#define WT_CURSTD_RAW 0x0200
-#define WT_CURSTD_VALUE_EXT 0x0400 /* Value points out of the tree. */
-#define WT_CURSTD_VALUE_INT 0x0800 /* Value points into the tree. */
+#define WT_CURSTD_JOINED 0x0080
+#define WT_CURSTD_OPEN 0x0100
+#define WT_CURSTD_OVERWRITE 0x0200
+#define WT_CURSTD_RAW 0x0400
+#define WT_CURSTD_VALUE_EXT 0x0800 /* Value points out of the tree. */
+#define WT_CURSTD_VALUE_INT 0x1000 /* Value points into the tree. */
#define WT_CURSTD_VALUE_SET (WT_CURSTD_VALUE_EXT | WT_CURSTD_VALUE_INT)
uint32_t flags;
#endif
@@ -1236,6 +1237,61 @@ struct __wt_session {
const char *name, const char *config);
/*!
+ * Join a join cursor with a reference cursor.
+ *
+ * @snippet ex_schema.c Join cursors
+ *
+ * @param session the session handle
+ * @param join_cursor a cursor that was opened using a
+ * \c "join:" URI. It may not have been used for any operations
+ * other than other join calls.
+ * @param ref_cursor either an index cursor having the same base table
+ * as the join_cursor, or a table cursor open on the same base table.
+ * The ref_cursor must be positioned.
+ *
+ * The ref_cursor limits the results seen by iterating the
+ * join_cursor to table items referred to by the key in this
+ * index. The set of keys referred to is modified by the compare
+ * config option.
+ *
+ * Multiple join calls builds up a set of ref_cursors, and the
+ * results seen by iteration are the intersection of the cursor
+ * ranges participating in the join.
+ *
+ * After the join call completes, the ref_cursor cursor may not be
+ * used for any purpose other than get_key and get_value. Any other
+ * cursor method (e.g. next, prev,close) will fail. When the
+ * join_cursor is closed, the ref_cursor is made available for
+ * general use again. The application should close ref_cursor when
+ * finished with it, although not before the join_cursor is closed.
+ *
+ * @configstart{WT_SESSION.join, see dist/api_data.py}
+ * @config{bloom_bit_count, the number of bits used per item for the
+ * bloom filter., an integer between 2 and 1000; default \c 16.}
+ * @config{bloom_hash_count, the number of hash values per item for the
+ * bloom filter., an integer between 2 and 100; default \c 8.}
+ * @config{compare, modifies the set of items to be returned so that the
+ * index key satisfies the given comparison relative to the key set in
+ * this cursor., a string\, chosen from the following options: \c "eq"\,
+ * \c "ge"\, \c "gt"\, \c "le"\, \c "lt"; default \c "eq".}
+ * @config{count, set an approximate count of the elements that would be
+ * included in the join. This is used in sizing the bloom filter\, and
+ * also influences evaluation order for cursors in the join. When the
+ * count is equal for multiple bloom filters in a composition of joins\,
+ * the bloom filter may be shared., an integer; default \c .}
+ * @config{strategy, when set to bloom\, a bloom filter is created and
+ * populated for this index. This has an up front cost but may reduce
+ * the number of accesses to the main table when iterating the joined
+ * cursor. The bloom setting requires that count be set., a string\,
+ * chosen from the following options: \c "bloom"\, \c "default"; default
+ * empty.}
+ * @configend
+ * @errors
+ */
+ int __F(join)(WT_SESSION *session, WT_CURSOR *join_cursor,
+ WT_CURSOR *ref_cursor, const char *config);
+
+ /*!
* Flush the log.
*
* @param session the session handle
@@ -3765,181 +3821,187 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_CONN_CURSOR_SEARCH 1069
/*! cursor: cursor search near calls */
#define WT_STAT_CONN_CURSOR_SEARCH_NEAR 1070
+/*! cursor: truncate calls */
+#define WT_STAT_CONN_CURSOR_TRUNCATE 1071
/*! cursor: cursor update calls */
-#define WT_STAT_CONN_CURSOR_UPDATE 1071
+#define WT_STAT_CONN_CURSOR_UPDATE 1072
/*! data-handle: connection data handles currently active */
-#define WT_STAT_CONN_DH_CONN_HANDLE_COUNT 1072
+#define WT_STAT_CONN_DH_CONN_HANDLE_COUNT 1073
/*! data-handle: session dhandles swept */
-#define WT_STAT_CONN_DH_SESSION_HANDLES 1073
+#define WT_STAT_CONN_DH_SESSION_HANDLES 1074
/*! data-handle: session sweep attempts */
-#define WT_STAT_CONN_DH_SESSION_SWEEPS 1074
+#define WT_STAT_CONN_DH_SESSION_SWEEPS 1075
/*! data-handle: connection sweep dhandles closed */
-#define WT_STAT_CONN_DH_SWEEP_CLOSE 1075
+#define WT_STAT_CONN_DH_SWEEP_CLOSE 1076
/*! data-handle: connection sweep candidate became referenced */
-#define WT_STAT_CONN_DH_SWEEP_REF 1076
+#define WT_STAT_CONN_DH_SWEEP_REF 1077
/*! data-handle: connection sweep dhandles removed from hash list */
-#define WT_STAT_CONN_DH_SWEEP_REMOVE 1077
+#define WT_STAT_CONN_DH_SWEEP_REMOVE 1078
/*! data-handle: connection sweep time-of-death sets */
-#define WT_STAT_CONN_DH_SWEEP_TOD 1078
+#define WT_STAT_CONN_DH_SWEEP_TOD 1079
/*! data-handle: connection sweeps */
-#define WT_STAT_CONN_DH_SWEEPS 1079
+#define WT_STAT_CONN_DH_SWEEPS 1080
/*! connection: files currently open */
-#define WT_STAT_CONN_FILE_OPEN 1080
+#define WT_STAT_CONN_FILE_OPEN 1081
/*! log: total log buffer size */
-#define WT_STAT_CONN_LOG_BUFFER_SIZE 1081
+#define WT_STAT_CONN_LOG_BUFFER_SIZE 1082
/*! log: log bytes of payload data */
-#define WT_STAT_CONN_LOG_BYTES_PAYLOAD 1082
+#define WT_STAT_CONN_LOG_BYTES_PAYLOAD 1083
/*! log: log bytes written */
-#define WT_STAT_CONN_LOG_BYTES_WRITTEN 1083
+#define WT_STAT_CONN_LOG_BYTES_WRITTEN 1084
/*! log: yields waiting for previous log file close */
-#define WT_STAT_CONN_LOG_CLOSE_YIELDS 1084
+#define WT_STAT_CONN_LOG_CLOSE_YIELDS 1085
/*! log: total size of compressed records */
-#define WT_STAT_CONN_LOG_COMPRESS_LEN 1085
+#define WT_STAT_CONN_LOG_COMPRESS_LEN 1086
/*! log: total in-memory size of compressed records */
-#define WT_STAT_CONN_LOG_COMPRESS_MEM 1086
+#define WT_STAT_CONN_LOG_COMPRESS_MEM 1087
/*! log: log records too small to compress */
-#define WT_STAT_CONN_LOG_COMPRESS_SMALL 1087
+#define WT_STAT_CONN_LOG_COMPRESS_SMALL 1088
/*! log: log records not compressed */
-#define WT_STAT_CONN_LOG_COMPRESS_WRITE_FAILS 1088
+#define WT_STAT_CONN_LOG_COMPRESS_WRITE_FAILS 1089
/*! log: log records compressed */
-#define WT_STAT_CONN_LOG_COMPRESS_WRITES 1089
+#define WT_STAT_CONN_LOG_COMPRESS_WRITES 1090
/*! log: log flush operations */
-#define WT_STAT_CONN_LOG_FLUSH 1090
+#define WT_STAT_CONN_LOG_FLUSH 1091
/*! log: maximum log file size */
-#define WT_STAT_CONN_LOG_MAX_FILESIZE 1091
+#define WT_STAT_CONN_LOG_MAX_FILESIZE 1092
/*! log: pre-allocated log files prepared */
-#define WT_STAT_CONN_LOG_PREALLOC_FILES 1092
+#define WT_STAT_CONN_LOG_PREALLOC_FILES 1093
/*! log: number of pre-allocated log files to create */
-#define WT_STAT_CONN_LOG_PREALLOC_MAX 1093
+#define WT_STAT_CONN_LOG_PREALLOC_MAX 1094
/*! log: pre-allocated log files not ready and missed */
-#define WT_STAT_CONN_LOG_PREALLOC_MISSED 1094
+#define WT_STAT_CONN_LOG_PREALLOC_MISSED 1095
/*! log: pre-allocated log files used */
-#define WT_STAT_CONN_LOG_PREALLOC_USED 1095
+#define WT_STAT_CONN_LOG_PREALLOC_USED 1096
/*! log: log release advances write LSN */
-#define WT_STAT_CONN_LOG_RELEASE_WRITE_LSN 1096
+#define WT_STAT_CONN_LOG_RELEASE_WRITE_LSN 1097
/*! log: records processed by log scan */
-#define WT_STAT_CONN_LOG_SCAN_RECORDS 1097
+#define WT_STAT_CONN_LOG_SCAN_RECORDS 1098
/*! log: log scan records requiring two reads */
-#define WT_STAT_CONN_LOG_SCAN_REREADS 1098
+#define WT_STAT_CONN_LOG_SCAN_REREADS 1099
/*! log: log scan operations */
-#define WT_STAT_CONN_LOG_SCANS 1099
+#define WT_STAT_CONN_LOG_SCANS 1100
/*! log: consolidated slot closures */
-#define WT_STAT_CONN_LOG_SLOT_CLOSES 1100
+#define WT_STAT_CONN_LOG_SLOT_CLOSES 1101
/*! log: written slots coalesced */
-#define WT_STAT_CONN_LOG_SLOT_COALESCED 1101
+#define WT_STAT_CONN_LOG_SLOT_COALESCED 1102
/*! log: logging bytes consolidated */
-#define WT_STAT_CONN_LOG_SLOT_CONSOLIDATED 1102
+#define WT_STAT_CONN_LOG_SLOT_CONSOLIDATED 1103
/*! log: consolidated slot joins */
-#define WT_STAT_CONN_LOG_SLOT_JOINS 1103
+#define WT_STAT_CONN_LOG_SLOT_JOINS 1104
/*! log: consolidated slot join races */
-#define WT_STAT_CONN_LOG_SLOT_RACES 1104
+#define WT_STAT_CONN_LOG_SLOT_RACES 1105
/*! log: busy returns attempting to switch slots */
-#define WT_STAT_CONN_LOG_SLOT_SWITCH_BUSY 1105
+#define WT_STAT_CONN_LOG_SLOT_SWITCH_BUSY 1106
/*! log: consolidated slot join transitions */
-#define WT_STAT_CONN_LOG_SLOT_TRANSITIONS 1106
+#define WT_STAT_CONN_LOG_SLOT_TRANSITIONS 1107
/*! log: consolidated slot unbuffered writes */
-#define WT_STAT_CONN_LOG_SLOT_UNBUFFERED 1107
+#define WT_STAT_CONN_LOG_SLOT_UNBUFFERED 1108
/*! log: log sync operations */
-#define WT_STAT_CONN_LOG_SYNC 1108
+#define WT_STAT_CONN_LOG_SYNC 1109
/*! log: log sync_dir operations */
-#define WT_STAT_CONN_LOG_SYNC_DIR 1109
+#define WT_STAT_CONN_LOG_SYNC_DIR 1110
/*! log: log server thread advances write LSN */
-#define WT_STAT_CONN_LOG_WRITE_LSN 1110
+#define WT_STAT_CONN_LOG_WRITE_LSN 1111
/*! log: log write operations */
-#define WT_STAT_CONN_LOG_WRITES 1111
+#define WT_STAT_CONN_LOG_WRITES 1112
/*! log: log files manually zero-filled */
-#define WT_STAT_CONN_LOG_ZERO_FILLS 1112
+#define WT_STAT_CONN_LOG_ZERO_FILLS 1113
/*! LSM: sleep for LSM checkpoint throttle */
-#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1113
+#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1114
/*! LSM: sleep for LSM merge throttle */
-#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1114
+#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1115
/*! LSM: rows merged in an LSM tree */
-#define WT_STAT_CONN_LSM_ROWS_MERGED 1115
+#define WT_STAT_CONN_LSM_ROWS_MERGED 1116
/*! LSM: application work units currently queued */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_APP 1116
+#define WT_STAT_CONN_LSM_WORK_QUEUE_APP 1117
/*! LSM: merge work units currently queued */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_MANAGER 1117
+#define WT_STAT_CONN_LSM_WORK_QUEUE_MANAGER 1118
/*! LSM: tree queue hit maximum */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_MAX 1118
+#define WT_STAT_CONN_LSM_WORK_QUEUE_MAX 1119
/*! LSM: switch work units currently queued */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_SWITCH 1119
+#define WT_STAT_CONN_LSM_WORK_QUEUE_SWITCH 1120
/*! LSM: tree maintenance operations scheduled */
-#define WT_STAT_CONN_LSM_WORK_UNITS_CREATED 1120
+#define WT_STAT_CONN_LSM_WORK_UNITS_CREATED 1121
/*! LSM: tree maintenance operations discarded */
-#define WT_STAT_CONN_LSM_WORK_UNITS_DISCARDED 1121
+#define WT_STAT_CONN_LSM_WORK_UNITS_DISCARDED 1122
/*! LSM: tree maintenance operations executed */
-#define WT_STAT_CONN_LSM_WORK_UNITS_DONE 1122
+#define WT_STAT_CONN_LSM_WORK_UNITS_DONE 1123
/*! connection: memory allocations */
-#define WT_STAT_CONN_MEMORY_ALLOCATION 1123
+#define WT_STAT_CONN_MEMORY_ALLOCATION 1124
/*! connection: memory frees */
-#define WT_STAT_CONN_MEMORY_FREE 1124
+#define WT_STAT_CONN_MEMORY_FREE 1125
/*! connection: memory re-allocations */
-#define WT_STAT_CONN_MEMORY_GROW 1125
+#define WT_STAT_CONN_MEMORY_GROW 1126
/*! thread-yield: page acquire busy blocked */
-#define WT_STAT_CONN_PAGE_BUSY_BLOCKED 1126
+#define WT_STAT_CONN_PAGE_BUSY_BLOCKED 1127
/*! thread-yield: page acquire eviction blocked */
-#define WT_STAT_CONN_PAGE_FORCIBLE_EVICT_BLOCKED 1127
+#define WT_STAT_CONN_PAGE_FORCIBLE_EVICT_BLOCKED 1128
/*! thread-yield: page acquire locked blocked */
-#define WT_STAT_CONN_PAGE_LOCKED_BLOCKED 1128
+#define WT_STAT_CONN_PAGE_LOCKED_BLOCKED 1129
/*! thread-yield: page acquire read blocked */
-#define WT_STAT_CONN_PAGE_READ_BLOCKED 1129
+#define WT_STAT_CONN_PAGE_READ_BLOCKED 1130
/*! thread-yield: page acquire time sleeping (usecs) */
-#define WT_STAT_CONN_PAGE_SLEEP 1130
+#define WT_STAT_CONN_PAGE_SLEEP 1131
/*! connection: total read I/Os */
-#define WT_STAT_CONN_READ_IO 1131
+#define WT_STAT_CONN_READ_IO 1132
+/*! reconciliation: pages deleted */
+#define WT_STAT_CONN_REC_PAGE_DELETE 1133
+/*! reconciliation: fast-path pages deleted */
+#define WT_STAT_CONN_REC_PAGE_DELETE_FAST 1134
/*! reconciliation: page reconciliation calls */
-#define WT_STAT_CONN_REC_PAGES 1132
+#define WT_STAT_CONN_REC_PAGES 1135
/*! reconciliation: page reconciliation calls for eviction */
-#define WT_STAT_CONN_REC_PAGES_EVICTION 1133
+#define WT_STAT_CONN_REC_PAGES_EVICTION 1136
/*! reconciliation: split bytes currently awaiting free */
-#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1134
+#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1137
/*! reconciliation: split objects currently awaiting free */
-#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1135
+#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1138
/*! connection: pthread mutex shared lock read-lock calls */
-#define WT_STAT_CONN_RWLOCK_READ 1136
+#define WT_STAT_CONN_RWLOCK_READ 1139
/*! connection: pthread mutex shared lock write-lock calls */
-#define WT_STAT_CONN_RWLOCK_WRITE 1137
+#define WT_STAT_CONN_RWLOCK_WRITE 1140
/*! session: open cursor count */
-#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1138
+#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1141
/*! session: open session count */
-#define WT_STAT_CONN_SESSION_OPEN 1139
+#define WT_STAT_CONN_SESSION_OPEN 1142
/*! transaction: transaction begins */
-#define WT_STAT_CONN_TXN_BEGIN 1140
+#define WT_STAT_CONN_TXN_BEGIN 1143
/*! transaction: transaction checkpoints */
-#define WT_STAT_CONN_TXN_CHECKPOINT 1141
+#define WT_STAT_CONN_TXN_CHECKPOINT 1144
/*! transaction: transaction checkpoint generation */
-#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1142
+#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1145
/*! transaction: transaction checkpoint currently running */
-#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1143
+#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1146
/*! transaction: transaction checkpoint max time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1144
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1147
/*! transaction: transaction checkpoint min time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1145
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1148
/*! transaction: transaction checkpoint most recent time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1146
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1149
/*! transaction: transaction checkpoint total time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1147
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1150
/*! transaction: transactions committed */
-#define WT_STAT_CONN_TXN_COMMIT 1148
+#define WT_STAT_CONN_TXN_COMMIT 1151
/*! transaction: transaction failures due to cache overflow */
-#define WT_STAT_CONN_TXN_FAIL_CACHE 1149
+#define WT_STAT_CONN_TXN_FAIL_CACHE 1152
/*! transaction: transaction range of IDs currently pinned by a checkpoint */
-#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1150
+#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1153
/*! transaction: transaction range of IDs currently pinned */
-#define WT_STAT_CONN_TXN_PINNED_RANGE 1151
+#define WT_STAT_CONN_TXN_PINNED_RANGE 1154
/*! transaction: transaction range of IDs currently pinned by named
* snapshots */
-#define WT_STAT_CONN_TXN_PINNED_SNAPSHOT_RANGE 1152
+#define WT_STAT_CONN_TXN_PINNED_SNAPSHOT_RANGE 1155
/*! transaction: transactions rolled back */
-#define WT_STAT_CONN_TXN_ROLLBACK 1153
+#define WT_STAT_CONN_TXN_ROLLBACK 1156
/*! transaction: number of named snapshots created */
-#define WT_STAT_CONN_TXN_SNAPSHOTS_CREATED 1154
+#define WT_STAT_CONN_TXN_SNAPSHOTS_CREATED 1157
/*! transaction: number of named snapshots dropped */
-#define WT_STAT_CONN_TXN_SNAPSHOTS_DROPPED 1155
+#define WT_STAT_CONN_TXN_SNAPSHOTS_DROPPED 1158
/*! transaction: transaction sync calls */
-#define WT_STAT_CONN_TXN_SYNC 1156
+#define WT_STAT_CONN_TXN_SYNC 1159
/*! connection: total write I/Os */
-#define WT_STAT_CONN_WRITE_IO 1157
+#define WT_STAT_CONN_WRITE_IO 1160
/*!
* @}
@@ -4095,54 +4157,71 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_DSRC_CURSOR_SEARCH 2072
/*! cursor: search near calls */
#define WT_STAT_DSRC_CURSOR_SEARCH_NEAR 2073
+/*! cursor: truncate calls */
+#define WT_STAT_DSRC_CURSOR_TRUNCATE 2074
/*! cursor: update calls */
-#define WT_STAT_DSRC_CURSOR_UPDATE 2074
+#define WT_STAT_DSRC_CURSOR_UPDATE 2075
/*! cursor: cursor-update value bytes updated */
-#define WT_STAT_DSRC_CURSOR_UPDATE_BYTES 2075
+#define WT_STAT_DSRC_CURSOR_UPDATE_BYTES 2076
/*! LSM: sleep for LSM checkpoint throttle */
-#define WT_STAT_DSRC_LSM_CHECKPOINT_THROTTLE 2076
+#define WT_STAT_DSRC_LSM_CHECKPOINT_THROTTLE 2077
/*! LSM: chunks in the LSM tree */
-#define WT_STAT_DSRC_LSM_CHUNK_COUNT 2077
+#define WT_STAT_DSRC_LSM_CHUNK_COUNT 2078
/*! LSM: highest merge generation in the LSM tree */
-#define WT_STAT_DSRC_LSM_GENERATION_MAX 2078
+#define WT_STAT_DSRC_LSM_GENERATION_MAX 2079
/*! LSM: queries that could have benefited from a Bloom filter that did
* not exist */
-#define WT_STAT_DSRC_LSM_LOOKUP_NO_BLOOM 2079
+#define WT_STAT_DSRC_LSM_LOOKUP_NO_BLOOM 2080
/*! LSM: sleep for LSM merge throttle */
-#define WT_STAT_DSRC_LSM_MERGE_THROTTLE 2080
+#define WT_STAT_DSRC_LSM_MERGE_THROTTLE 2081
/*! reconciliation: dictionary matches */
-#define WT_STAT_DSRC_REC_DICTIONARY 2081
+#define WT_STAT_DSRC_REC_DICTIONARY 2082
/*! reconciliation: internal page multi-block writes */
-#define WT_STAT_DSRC_REC_MULTIBLOCK_INTERNAL 2082
+#define WT_STAT_DSRC_REC_MULTIBLOCK_INTERNAL 2083
/*! reconciliation: leaf page multi-block writes */
-#define WT_STAT_DSRC_REC_MULTIBLOCK_LEAF 2083
+#define WT_STAT_DSRC_REC_MULTIBLOCK_LEAF 2084
/*! reconciliation: maximum blocks required for a page */
-#define WT_STAT_DSRC_REC_MULTIBLOCK_MAX 2084
+#define WT_STAT_DSRC_REC_MULTIBLOCK_MAX 2085
/*! reconciliation: internal-page overflow keys */
-#define WT_STAT_DSRC_REC_OVERFLOW_KEY_INTERNAL 2085
+#define WT_STAT_DSRC_REC_OVERFLOW_KEY_INTERNAL 2086
/*! reconciliation: leaf-page overflow keys */
-#define WT_STAT_DSRC_REC_OVERFLOW_KEY_LEAF 2086
+#define WT_STAT_DSRC_REC_OVERFLOW_KEY_LEAF 2087
/*! reconciliation: overflow values written */
-#define WT_STAT_DSRC_REC_OVERFLOW_VALUE 2087
+#define WT_STAT_DSRC_REC_OVERFLOW_VALUE 2088
/*! reconciliation: pages deleted */
-#define WT_STAT_DSRC_REC_PAGE_DELETE 2088
+#define WT_STAT_DSRC_REC_PAGE_DELETE 2089
+/*! reconciliation: fast-path pages deleted */
+#define WT_STAT_DSRC_REC_PAGE_DELETE_FAST 2090
/*! reconciliation: page checksum matches */
-#define WT_STAT_DSRC_REC_PAGE_MATCH 2089
+#define WT_STAT_DSRC_REC_PAGE_MATCH 2091
/*! reconciliation: page reconciliation calls */
-#define WT_STAT_DSRC_REC_PAGES 2090
+#define WT_STAT_DSRC_REC_PAGES 2092
/*! reconciliation: page reconciliation calls for eviction */
-#define WT_STAT_DSRC_REC_PAGES_EVICTION 2091
+#define WT_STAT_DSRC_REC_PAGES_EVICTION 2093
/*! reconciliation: leaf page key bytes discarded using prefix compression */
-#define WT_STAT_DSRC_REC_PREFIX_COMPRESSION 2092
+#define WT_STAT_DSRC_REC_PREFIX_COMPRESSION 2094
/*! reconciliation: internal page key bytes discarded using suffix
* compression */
-#define WT_STAT_DSRC_REC_SUFFIX_COMPRESSION 2093
+#define WT_STAT_DSRC_REC_SUFFIX_COMPRESSION 2095
/*! session: object compaction */
-#define WT_STAT_DSRC_SESSION_COMPACT 2094
+#define WT_STAT_DSRC_SESSION_COMPACT 2096
/*! session: open cursor count */
-#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2095
+#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2097
/*! transaction: update conflicts */
-#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2096
+#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2098
+
+/*!
+ * @}
+ * @name Statistics for join cursors
+ * @anchor statistics_join
+ * @{
+ */
+/*! : accesses */
+#define WT_STAT_JOIN_ACCESSES 3000
+/*! : actual count of items */
+#define WT_STAT_JOIN_ACTUAL_COUNT 3001
+/*! : bloom filter false positives */
+#define WT_STAT_JOIN_BLOOM_FALSE_POSITIVE 3002
/*! @} */
/*
* Statistics section: END
diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h
index 3f4e0ada7f1..0a1e143ce70 100644
--- a/src/include/wt_internal.h
+++ b/src/include/wt_internal.h
@@ -136,6 +136,14 @@ struct __wt_cursor_dump;
typedef struct __wt_cursor_dump WT_CURSOR_DUMP;
struct __wt_cursor_index;
typedef struct __wt_cursor_index WT_CURSOR_INDEX;
+struct __wt_cursor_join;
+ typedef struct __wt_cursor_join WT_CURSOR_JOIN;
+struct __wt_cursor_join_endpoint;
+ typedef struct __wt_cursor_join_endpoint WT_CURSOR_JOIN_ENDPOINT;
+struct __wt_cursor_join_entry;
+ typedef struct __wt_cursor_join_entry WT_CURSOR_JOIN_ENTRY;
+struct __wt_cursor_join_iter;
+ typedef struct __wt_cursor_join_iter WT_CURSOR_JOIN_ITER;
struct __wt_cursor_json;
typedef struct __wt_cursor_json WT_CURSOR_JSON;
struct __wt_cursor_log;
@@ -178,6 +186,10 @@ struct __wt_insert;
typedef struct __wt_insert WT_INSERT;
struct __wt_insert_head;
typedef struct __wt_insert_head WT_INSERT_HEAD;
+struct __wt_join_stats;
+ typedef struct __wt_join_stats WT_JOIN_STATS;
+struct __wt_join_stats_group;
+ typedef struct __wt_join_stats_group WT_JOIN_STATS_GROUP;
struct __wt_keyed_encryptor;
typedef struct __wt_keyed_encryptor WT_KEYED_ENCRYPTOR;
struct __wt_log;
diff --git a/src/lsm/lsm_stat.c b/src/lsm/lsm_stat.c
index 4381ca0df00..c1eb7a2a389 100644
--- a/src/lsm/lsm_stat.c
+++ b/src/lsm/lsm_stat.c
@@ -77,12 +77,12 @@ __curstat_lsm_init(
*/
WT_ERR(__wt_buf_fmt(
session, uribuf, "statistics:%s", chunk->uri));
- ret = __wt_curstat_open(session, uribuf->data,
+ ret = __wt_curstat_open(session, uribuf->data, NULL,
F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) ? disk_cfg : cfg,
&stat_cursor);
if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
ret = __wt_curstat_open(
- session, uribuf->data, cfg, &stat_cursor);
+ session, uribuf->data, NULL, cfg, &stat_cursor);
WT_ERR(ret);
/*
@@ -107,7 +107,7 @@ __curstat_lsm_init(
WT_ERR(__wt_buf_fmt(
session, uribuf, "statistics:%s", chunk->bloom_uri));
WT_ERR(__wt_curstat_open(
- session, uribuf->data, cfg, &stat_cursor));
+ session, uribuf->data, NULL, cfg, &stat_cursor));
/*
* The underlying statistics have now been initialized; fill in
diff --git a/src/packing/pack_impl.c b/src/packing/pack_impl.c
index 3a4428eae15..447c887dc6f 100644
--- a/src/packing/pack_impl.c
+++ b/src/packing/pack_impl.c
@@ -105,3 +105,108 @@ __wt_struct_unpack(WT_SESSION_IMPL *session,
return (ret);
}
+
+/*
+ * __wt_struct_unpack_size --
+ * Determine the packed size of a buffer matching the format.
+ */
+int
+__wt_struct_unpack_size(WT_SESSION_IMPL *session,
+ const void *buffer, size_t size, const char *fmt, size_t *resultp)
+{
+ WT_DECL_PACK_VALUE(pv);
+ WT_DECL_RET;
+ WT_PACK pack;
+ const uint8_t *p, *end;
+
+ p = buffer;
+ end = p + size;
+
+ WT_RET(__pack_init(session, &pack, fmt));
+ while ((ret = __pack_next(&pack, &pv)) == 0)
+ WT_RET(__unpack_read(session, &pv, &p, (size_t)(end - p)));
+
+ /* Be paranoid - __pack_write should never overflow. */
+ WT_ASSERT(session, p <= end);
+
+ if (ret != WT_NOTFOUND)
+ return (ret);
+
+ *resultp = WT_PTRDIFF(p, buffer);
+ return (0);
+}
+
+/*
+ * __wt_struct_repack --
+ * Return the subset of the packed buffer that represents part of
+ * the format. If the result is not contiguous in the existing
+ * buffer, a buffer is reallocated and filled.
+ */
+int
+__wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt,
+ const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf,
+ void **reallocp)
+{
+ WT_DECL_PACK_VALUE(pvin);
+ WT_DECL_PACK_VALUE(pvout);
+ WT_DECL_RET;
+ WT_PACK packin, packout;
+ const uint8_t *before, *end, *p;
+ uint8_t *newbuf, *pout;
+ size_t len;
+ const void *start;
+
+ start = newbuf = NULL;
+ p = inbuf->data;
+ end = p + inbuf->size;
+
+ /*
+ * Handle this non-contiguous case: 'U' -> 'u' at the end of the buf.
+ * The former case has the size embedded before the item, the latter
+ * does not.
+ */
+ if ((len = strlen(outfmt)) > 1 && outfmt[len - 1] == 'u' &&
+ strlen(infmt) > len && infmt[len - 1] == 'U') {
+ WT_ERR(__wt_realloc(session, NULL, inbuf->size, reallocp));
+ pout = *reallocp;
+ } else
+ pout = NULL;
+
+ WT_ERR(__pack_init(session, &packout, outfmt));
+ WT_ERR(__pack_init(session, &packin, infmt));
+
+ /* Outfmt should complete before infmt */
+ while ((ret = __pack_next(&packout, &pvout)) == 0) {
+ WT_ERR(__pack_next(&packin, &pvin));
+ before = p;
+ WT_ERR(__unpack_read(session, &pvin, &p, (size_t)(end - p)));
+ if (pvout.type != pvin.type) {
+ if (pvout.type == 'u' && pvin.type == 'U') {
+ /* Skip the prefixed size, we don't need it */
+ WT_ERR(__wt_struct_unpack_size(session, before,
+ (size_t)(end - before), "I", &len));
+ before += len;
+ } else
+ WT_ERR(ENOTSUP);
+ }
+ if (pout != NULL) {
+ memcpy(pout, before, WT_PTRDIFF(p, before));
+ pout += p - before;
+ } else if (start == NULL)
+ start = before;
+ }
+ WT_ERR_NOTFOUND_OK(ret);
+
+ /* Be paranoid - __pack_write should never overflow. */
+ WT_ASSERT(session, p <= end);
+
+ if (pout != NULL) {
+ outbuf->data = *reallocp;
+ outbuf->size = WT_PTRDIFF(pout, *reallocp);
+ } else {
+ outbuf->data = start;
+ outbuf->size = WT_PTRDIFF(p, start);
+ }
+
+err: return (ret);
+}
diff --git a/src/reconcile/rec_write.c b/src/reconcile/rec_write.c
index fe60cc16063..6d53230e9e0 100644
--- a/src/reconcile/rec_write.c
+++ b/src/reconcile/rec_write.c
@@ -5470,6 +5470,7 @@ __rec_write_wrapup(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
case 0: /* Page delete */
WT_RET(__wt_verbose(
session, WT_VERB_RECONCILE, "page %p empty", page));
+ WT_STAT_FAST_CONN_INCR(session, rec_page_delete);
WT_STAT_FAST_DATA_INCR(session, rec_page_delete);
/* If this is the root page, we need to create a sync point. */
diff --git a/src/schema/schema_stat.c b/src/schema/schema_stat.c
index d73d66cd399..82c2e2a15dc 100644
--- a/src/schema/schema_stat.c
+++ b/src/schema/schema_stat.c
@@ -24,7 +24,7 @@ __wt_curstat_colgroup_init(WT_SESSION_IMPL *session,
WT_RET(__wt_scr_alloc(session, 0, &buf));
WT_ERR(__wt_buf_fmt(session, buf, "statistics:%s", colgroup->source));
- ret = __wt_curstat_init(session, buf->data, cfg, cst);
+ ret = __wt_curstat_init(session, buf->data, NULL, cfg, cst);
err: __wt_scr_free(session, &buf);
return (ret);
@@ -46,7 +46,7 @@ __wt_curstat_index_init(WT_SESSION_IMPL *session,
WT_RET(__wt_scr_alloc(session, 0, &buf));
WT_ERR(__wt_buf_fmt(session, buf, "statistics:%s", idx->source));
- ret = __wt_curstat_init(session, buf->data, cfg, cst);
+ ret = __wt_curstat_init(session, buf->data, NULL, cfg, cst);
err: __wt_scr_free(session, &buf);
return (ret);
@@ -159,7 +159,7 @@ __wt_curstat_table_init(WT_SESSION_IMPL *session,
WT_ERR(__wt_buf_fmt(
session, buf, "statistics:%s", table->cgroups[i]->name));
WT_ERR(__wt_curstat_open(
- session, buf->data, cfg, &stat_cursor));
+ session, buf->data, NULL, cfg, &stat_cursor));
new = (WT_DSRC_STATS *)WT_CURSOR_STATS(stat_cursor);
if (i == 0)
*stats = *new;
@@ -174,7 +174,7 @@ __wt_curstat_table_init(WT_SESSION_IMPL *session,
WT_ERR(__wt_buf_fmt(
session, buf, "statistics:%s", table->indices[i]->name));
WT_ERR(__wt_curstat_open(
- session, buf->data, cfg, &stat_cursor));
+ session, buf->data, NULL, cfg, &stat_cursor));
new = (WT_DSRC_STATS *)WT_CURSOR_STATS(stat_cursor);
__wt_stat_dsrc_aggregate_single(new, stats);
WT_ERR(stat_cursor->close(stat_cursor));
diff --git a/src/schema/schema_truncate.c b/src/schema/schema_truncate.c
index 03a991a9aba..c39bba4753c 100644
--- a/src/schema/schema_truncate.c
+++ b/src/schema/schema_truncate.c
@@ -26,6 +26,7 @@ __truncate_file(WT_SESSION_IMPL *session, const char *uri)
/* Open and lock the file. */
WT_RET(__wt_session_get_btree(
session, uri, NULL, NULL, WT_DHANDLE_EXCLUSIVE));
+ WT_STAT_FAST_DATA_INCR(session, cursor_truncate);
/* Get the allocation size. */
allocsize = S2BT(session)->allocsize;
@@ -56,6 +57,7 @@ __truncate_table(WT_SESSION_IMPL *session, const char *uri, const char *cfg[])
u_int i;
WT_RET(__wt_schema_get_table(session, uri, strlen(uri), false, &table));
+ WT_STAT_FAST_DATA_INCR(session, cursor_truncate);
/* Truncate the column groups. */
for (i = 0; i < WT_COLGROUPS(table); i++)
@@ -90,6 +92,7 @@ __truncate_dsrc(WT_SESSION_IMPL *session, const char *uri)
while ((ret = cursor->next(cursor)) == 0)
WT_ERR(cursor->remove(cursor));
WT_ERR_NOTFOUND_OK(ret);
+ WT_STAT_FAST_DATA_INCR(session, cursor_truncate);
err: WT_TRET(cursor->close(cursor));
return (ret);
diff --git a/src/session/session_api.c b/src/session/session_api.c
index f9af3c2d5f1..db81623c613 100644
--- a/src/session/session_api.c
+++ b/src/session/session_api.c
@@ -240,12 +240,12 @@ err: API_END_RET_NOTFOUND_MAP(session, ret);
}
/*
- * __wt_open_cursor --
- * Internal version of WT_SESSION::open_cursor.
+ * __session_open_cursor_int --
+ * Internal version of WT_SESSION::open_cursor, with second cursor arg.
*/
-int
-__wt_open_cursor(WT_SESSION_IMPL *session,
- const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
+static int
+__session_open_cursor_int(WT_SESSION_IMPL *session, const char *uri,
+ WT_CURSOR *owner, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp)
{
WT_COLGROUP *colgroup;
WT_DATA_SOURCE *dsrc;
@@ -267,7 +267,8 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
*/
case 't':
if (WT_PREFIX_MATCH(uri, "table:"))
- WT_RET(__wt_curtable_open(session, uri, cfg, cursorp));
+ WT_RET(__wt_curtable_open(
+ session, uri, owner, cfg, cursorp));
break;
case 'c':
if (WT_PREFIX_MATCH(uri, "colgroup:")) {
@@ -288,6 +289,11 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
WT_RET(__wt_curindex_open(
session, uri, owner, cfg, cursorp));
break;
+ case 'j':
+ if (WT_PREFIX_MATCH(uri, "join:"))
+ WT_RET(__wt_curjoin_open(
+ session, uri, owner, cfg, cursorp));
+ break;
case 'l':
if (WT_PREFIX_MATCH(uri, "lsm:"))
WT_RET(__wt_clsm_open(
@@ -316,7 +322,8 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
break;
case 's':
if (WT_PREFIX_MATCH(uri, "statistics:"))
- WT_RET(__wt_curstat_open(session, uri, cfg, cursorp));
+ WT_RET(__wt_curstat_open(session, uri, other, cfg,
+ cursorp));
break;
default:
break;
@@ -346,6 +353,18 @@ __wt_open_cursor(WT_SESSION_IMPL *session,
}
/*
+ * __wt_open_cursor --
+ * Internal version of WT_SESSION::open_cursor.
+ */
+int
+__wt_open_cursor(WT_SESSION_IMPL *session,
+ const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
+{
+ return (__session_open_cursor_int(session, uri, owner, NULL, cfg,
+ cursorp));
+}
+
+/*
* __session_open_cursor --
* WT_SESSION->open_cursor method.
*/
@@ -356,18 +375,22 @@ __session_open_cursor(WT_SESSION *wt_session,
WT_CURSOR *cursor;
WT_DECL_RET;
WT_SESSION_IMPL *session;
+ bool statjoin;
cursor = *cursorp = NULL;
session = (WT_SESSION_IMPL *)wt_session;
SESSION_API_CALL(session, open_cursor, config, cfg);
- if ((to_dup == NULL && uri == NULL) || (to_dup != NULL && uri != NULL))
+ statjoin = (to_dup != NULL && uri != NULL &&
+ WT_STREQ(uri, "statistics:join"));
+ if ((to_dup == NULL && uri == NULL) ||
+ (to_dup != NULL && uri != NULL && !statjoin))
WT_ERR_MSG(session, EINVAL,
"should be passed either a URI or a cursor to duplicate, "
"but not both");
- if (to_dup != NULL) {
+ if (to_dup != NULL && !statjoin) {
uri = to_dup->uri;
if (!WT_PREFIX_MATCH(uri, "colgroup:") &&
!WT_PREFIX_MATCH(uri, "index:") &&
@@ -379,8 +402,9 @@ __session_open_cursor(WT_SESSION *wt_session,
WT_ERR(__wt_bad_object_type(session, uri));
}
- WT_ERR(__wt_open_cursor(session, uri, NULL, cfg, &cursor));
- if (to_dup != NULL)
+ WT_ERR(__session_open_cursor_int(session, uri, NULL,
+ statjoin ? to_dup : NULL, cfg, &cursor));
+ if (to_dup != NULL && !statjoin)
WT_ERR(__wt_cursor_dup_position(to_dup, cursor));
*cursorp = cursor;
@@ -614,6 +638,123 @@ err: /* Note: drop operations cannot be unrolled (yet?). */
}
/*
+ * __session_join --
+ * WT_SESSION->join method.
+ */
+static int
+__session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor,
+ WT_CURSOR *ref_cursor, const char *config)
+{
+ WT_CONFIG_ITEM cval;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ WT_CURSOR_INDEX *cindex;
+ WT_CURSOR_JOIN *cjoin;
+ WT_CURSOR_TABLE *ctable;
+ WT_INDEX *idx;
+ WT_TABLE *table;
+ uint32_t flags, range;
+ uint64_t count;
+ uint64_t bloom_bit_count, bloom_hash_count;
+
+ count = 0;
+ session = (WT_SESSION_IMPL *)wt_session;
+ SESSION_API_CALL(session, join, config, cfg);
+ table = NULL;
+
+ if (!WT_PREFIX_MATCH(join_cursor->uri, "join:")) {
+ __wt_errx(session, "not a join cursor");
+ WT_ERR(EINVAL);
+ }
+
+ if (WT_PREFIX_MATCH(ref_cursor->uri, "index:")) {
+ cindex = (WT_CURSOR_INDEX *)ref_cursor;
+ idx = cindex->index;
+ table = cindex->table;
+ WT_CURSOR_CHECKKEY(ref_cursor);
+ } else if (WT_PREFIX_MATCH(ref_cursor->uri, "table:")) {
+ idx = NULL;
+ ctable = (WT_CURSOR_TABLE *)ref_cursor;
+ table = ctable->table;
+ WT_CURSOR_CHECKKEY(ctable->cg_cursors[0]);
+ } else {
+ __wt_errx(session, "not an index or table cursor");
+ WT_ERR(EINVAL);
+ }
+
+ cjoin = (WT_CURSOR_JOIN *)join_cursor;
+ if (cjoin->table != table) {
+ __wt_errx(session, "table for join cursor does not match "
+ "table for index");
+ WT_ERR(EINVAL);
+ }
+ if (F_ISSET(ref_cursor, WT_CURSTD_JOINED)) {
+ __wt_errx(session, "index cursor already used in a join");
+ WT_ERR(EINVAL);
+ }
+
+ /* "ge" is the default */
+ range = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ;
+ flags = 0;
+ WT_ERR(__wt_config_gets(session, cfg, "compare", &cval));
+ if (cval.len != 0) {
+ if (WT_STRING_MATCH("gt", cval.str, cval.len))
+ range = WT_CURJOIN_END_GT;
+ else if (WT_STRING_MATCH("lt", cval.str, cval.len))
+ range = WT_CURJOIN_END_LT;
+ else if (WT_STRING_MATCH("le", cval.str, cval.len))
+ range = WT_CURJOIN_END_LE;
+ else if (WT_STRING_MATCH("eq", cval.str, cval.len))
+ range = WT_CURJOIN_END_EQ;
+ else if (!WT_STRING_MATCH("ge", cval.str, cval.len))
+ WT_ERR(EINVAL);
+ }
+ WT_ERR(__wt_config_gets(session, cfg, "count", &cval));
+ if (cval.len != 0)
+ count = (uint64_t)cval.val;
+
+ WT_ERR(__wt_config_gets(session, cfg, "strategy", &cval));
+ if (cval.len != 0) {
+ if (WT_STRING_MATCH("bloom", cval.str, cval.len))
+ LF_SET(WT_CURJOIN_ENTRY_BLOOM);
+ else if (!WT_STRING_MATCH("default", cval.str, cval.len))
+ WT_ERR(EINVAL);
+ }
+ WT_ERR(__wt_config_gets(session, cfg, "bloom_bit_count", &cval));
+ bloom_bit_count = (uint64_t)cval.val;
+ WT_ERR(__wt_config_gets(session, cfg, "bloom_hash_count", &cval));
+ bloom_hash_count = (uint64_t)cval.val;
+ if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM)) {
+ if (count == 0) {
+ __wt_errx(session, "count must be nonzero when "
+ "strategy=bloom");
+ WT_ERR(EINVAL);
+ }
+ if (cjoin->entries_next == 0) {
+ __wt_errx(session, "the first joined cursor cannot "
+ "specify strategy=bloom");
+ WT_ERR(EINVAL);
+ }
+ }
+ WT_ERR(__wt_curjoin_join(session, cjoin, idx, ref_cursor, flags,
+ range, count, bloom_bit_count, bloom_hash_count));
+ /*
+ * There's an implied ownership ordering that isn't
+ * known when the cursors are created: the join cursor
+ * must be closed before any of the indices. Enforce
+ * that here by reordering.
+ */
+ if (TAILQ_FIRST(&session->cursors) != join_cursor) {
+ TAILQ_REMOVE(&session->cursors, join_cursor, q);
+ TAILQ_INSERT_HEAD(&session->cursors, join_cursor, q);
+ }
+ /* Disable the reference cursor for regular operations */
+ F_SET(ref_cursor, WT_CURSTD_JOINED);
+
+err: API_END_RET_NOTFOUND_MAP(session, ret);
+}
+
+/*
* __session_salvage --
* WT_SESSION->salvage method.
*/
@@ -657,6 +798,7 @@ __session_truncate(WT_SESSION *wt_session,
session = (WT_SESSION_IMPL *)wt_session;
SESSION_TXN_API_CALL(session, truncate, config, cfg);
+ WT_STAT_FAST_CONN_INCR(session, cursor_truncate);
/*
* If the URI is specified, we don't need a start/stop, if start/stop
@@ -1144,6 +1286,7 @@ __open_session(WT_CONNECTION_IMPL *conn,
__session_create,
__wt_session_compact,
__session_drop,
+ __session_join,
__session_log_flush,
__session_log_printf,
__session_rename,
diff --git a/src/support/stat.c b/src/support/stat.c
index 015b9a065ca..cee380eced6 100644
--- a/src/support/stat.c
+++ b/src/support/stat.c
@@ -77,6 +77,7 @@ static const char * const __stats_dsrc_desc[] = {
"cursor: restarted searches",
"cursor: search calls",
"cursor: search near calls",
+ "cursor: truncate calls",
"cursor: update calls",
"cursor: cursor-update value bytes updated",
"LSM: sleep for LSM checkpoint throttle",
@@ -92,6 +93,7 @@ static const char * const __stats_dsrc_desc[] = {
"reconciliation: leaf-page overflow keys",
"reconciliation: overflow values written",
"reconciliation: pages deleted",
+ "reconciliation: fast-path pages deleted",
"reconciliation: page checksum matches",
"reconciliation: page reconciliation calls",
"reconciliation: page reconciliation calls for eviction",
@@ -102,10 +104,12 @@ static const char * const __stats_dsrc_desc[] = {
"transaction: update conflicts",
};
-const char *
-__wt_stat_dsrc_desc(int slot)
+int
+__wt_stat_dsrc_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
{
- return (__stats_dsrc_desc[slot]);
+ WT_UNUSED(cst);
+ *p = __stats_dsrc_desc[slot];
+ return (0);
}
void
@@ -196,6 +200,7 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats)
stats->cursor_restart = 0;
stats->cursor_search = 0;
stats->cursor_search_near = 0;
+ stats->cursor_truncate = 0;
stats->cursor_update = 0;
stats->bloom_false_positive = 0;
stats->bloom_hit = 0;
@@ -210,6 +215,7 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats)
stats->lsm_merge_throttle = 0;
stats->bloom_size = 0;
stats->rec_dictionary = 0;
+ stats->rec_page_delete_fast = 0;
stats->rec_suffix_compression = 0;
stats->rec_multiblock_internal = 0;
stats->rec_overflow_key_internal = 0;
@@ -315,6 +321,7 @@ __wt_stat_dsrc_aggregate_single(
to->cursor_restart += from->cursor_restart;
to->cursor_search += from->cursor_search;
to->cursor_search_near += from->cursor_search_near;
+ to->cursor_truncate += from->cursor_truncate;
to->cursor_update += from->cursor_update;
to->bloom_false_positive += from->bloom_false_positive;
to->bloom_hit += from->bloom_hit;
@@ -330,6 +337,7 @@ __wt_stat_dsrc_aggregate_single(
to->lsm_merge_throttle += from->lsm_merge_throttle;
to->bloom_size += from->bloom_size;
to->rec_dictionary += from->rec_dictionary;
+ to->rec_page_delete_fast += from->rec_page_delete_fast;
to->rec_suffix_compression += from->rec_suffix_compression;
to->rec_multiblock_internal += from->rec_multiblock_internal;
to->rec_overflow_key_internal += from->rec_overflow_key_internal;
@@ -449,6 +457,7 @@ __wt_stat_dsrc_aggregate(
to->cursor_restart += WT_STAT_READ(from, cursor_restart);
to->cursor_search += WT_STAT_READ(from, cursor_search);
to->cursor_search_near += WT_STAT_READ(from, cursor_search_near);
+ to->cursor_truncate += WT_STAT_READ(from, cursor_truncate);
to->cursor_update += WT_STAT_READ(from, cursor_update);
to->bloom_false_positive += WT_STAT_READ(from, bloom_false_positive);
to->bloom_hit += WT_STAT_READ(from, bloom_hit);
@@ -466,6 +475,7 @@ __wt_stat_dsrc_aggregate(
to->lsm_merge_throttle += WT_STAT_READ(from, lsm_merge_throttle);
to->bloom_size += WT_STAT_READ(from, bloom_size);
to->rec_dictionary += WT_STAT_READ(from, rec_dictionary);
+ to->rec_page_delete_fast += WT_STAT_READ(from, rec_page_delete_fast);
to->rec_suffix_compression +=
WT_STAT_READ(from, rec_suffix_compression);
to->rec_multiblock_internal +=
@@ -562,6 +572,7 @@ static const char * const __stats_connection_desc[] = {
"cursor: cursor restarted searches",
"cursor: cursor search calls",
"cursor: cursor search near calls",
+ "cursor: truncate calls",
"cursor: cursor update calls",
"data-handle: connection data handles currently active",
"data-handle: session dhandles swept",
@@ -623,6 +634,8 @@ static const char * const __stats_connection_desc[] = {
"thread-yield: page acquire read blocked",
"thread-yield: page acquire time sleeping (usecs)",
"connection: total read I/Os",
+ "reconciliation: pages deleted",
+ "reconciliation: fast-path pages deleted",
"reconciliation: page reconciliation calls",
"reconciliation: page reconciliation calls for eviction",
"reconciliation: split bytes currently awaiting free",
@@ -651,10 +664,12 @@ static const char * const __stats_connection_desc[] = {
"connection: total write I/Os",
};
-const char *
-__wt_stat_connection_desc(int slot)
+int
+__wt_stat_connection_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
{
- return (__stats_connection_desc[slot]);
+ WT_UNUSED(cst);
+ *p = __stats_connection_desc[slot];
+ return (0);
}
void
@@ -757,6 +772,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats)
stats->cursor_search = 0;
stats->cursor_search_near = 0;
stats->cursor_update = 0;
+ stats->cursor_truncate = 0;
/* not clearing dh_conn_handle_count */
stats->dh_sweep_ref = 0;
stats->dh_sweep_close = 0;
@@ -807,8 +823,10 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats)
stats->lsm_work_units_done = 0;
stats->lsm_work_units_created = 0;
stats->lsm_work_queue_max = 0;
+ stats->rec_page_delete_fast = 0;
stats->rec_pages = 0;
stats->rec_pages_eviction = 0;
+ stats->rec_page_delete = 0;
/* not clearing rec_split_stashed_bytes */
/* not clearing rec_split_stashed_objects */
/* not clearing session_cursor_open */
@@ -948,6 +966,7 @@ __wt_stat_connection_aggregate(
to->cursor_search += WT_STAT_READ(from, cursor_search);
to->cursor_search_near += WT_STAT_READ(from, cursor_search_near);
to->cursor_update += WT_STAT_READ(from, cursor_update);
+ to->cursor_truncate += WT_STAT_READ(from, cursor_truncate);
to->dh_conn_handle_count += WT_STAT_READ(from, dh_conn_handle_count);
to->dh_sweep_ref += WT_STAT_READ(from, dh_sweep_ref);
to->dh_sweep_close += WT_STAT_READ(from, dh_sweep_close);
@@ -1006,8 +1025,10 @@ __wt_stat_connection_aggregate(
to->lsm_work_units_created +=
WT_STAT_READ(from, lsm_work_units_created);
to->lsm_work_queue_max += WT_STAT_READ(from, lsm_work_queue_max);
+ to->rec_page_delete_fast += WT_STAT_READ(from, rec_page_delete_fast);
to->rec_pages += WT_STAT_READ(from, rec_pages);
to->rec_pages_eviction += WT_STAT_READ(from, rec_pages_eviction);
+ to->rec_page_delete += WT_STAT_READ(from, rec_page_delete);
to->rec_split_stashed_bytes +=
WT_STAT_READ(from, rec_split_stashed_bytes);
to->rec_split_stashed_objects +=
@@ -1048,3 +1069,49 @@ __wt_stat_connection_aggregate(
to->txn_commit += WT_STAT_READ(from, txn_commit);
to->txn_rollback += WT_STAT_READ(from, txn_rollback);
}
+
+static const char * const __stats_join_desc[] = {
+ ": accesses",
+ ": actual count of items",
+ ": bloom filter false positives",
+};
+
+int
+__wt_stat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **p)
+{
+ WT_UNUSED(cst);
+ *p = __stats_join_desc[slot];
+ return (0);
+}
+
+void
+__wt_stat_join_init_single(WT_JOIN_STATS *stats)
+{
+ memset(stats, 0, sizeof(*stats));
+}
+
+void
+__wt_stat_join_clear_single(WT_JOIN_STATS *stats)
+{
+ stats->accesses = 0;
+ stats->actual_count = 0;
+ stats->bloom_false_positive = 0;
+}
+
+void
+__wt_stat_join_clear_all(WT_JOIN_STATS **stats)
+{
+ u_int i;
+
+ for (i = 0; i < WT_COUNTER_SLOTS; ++i)
+ __wt_stat_join_clear_single(stats[i]);
+}
+
+void
+__wt_stat_join_aggregate(
+ WT_JOIN_STATS **from, WT_JOIN_STATS *to)
+{
+ to->accesses += WT_STAT_READ(from, accesses);
+ to->actual_count += WT_STAT_READ(from, actual_count);
+ to->bloom_false_positive += WT_STAT_READ(from, bloom_false_positive);
+}
diff --git a/test/suite/test_join01.py b/test/suite/test_join01.py
new file mode 100644
index 00000000000..ca6e5fbcabb
--- /dev/null
+++ b/test/suite/test_join01.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import wiredtiger, wttest
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+# test_join01.py
+# Join operations
+# Basic tests for join
+class test_join01(wttest.WiredTigerTestCase):
+ table_name1 = 'test_join01'
+ nentries = 100
+
+ scenarios = [
+ ('table', dict(ref='table')),
+ ('index', dict(ref='index'))
+ ]
+
+ # Override WiredTigerTestCase, we have statistics tests.
+ def setUpConnectionOpen(self, dir):
+ conn = wiredtiger.wiredtiger_open(dir,
+ 'create,statistics=(all),' + 'error_prefix="%s: "' % self.shortid())
+ return conn
+
+ def gen_key(self, i):
+ return [ i + 1 ]
+
+ def gen_values(self, i):
+ s = str(i)
+ rs = s[::-1]
+ sort3 = (self.nentries * (i % 3)) + i # multiples of 3 sort first
+ return [s, rs, sort3]
+
+ # Common function for testing iteration of join cursors
+ def iter_common(self, jc, do_proj):
+ # See comments in join_common()
+ expect = [73, 82, 62, 83, 92]
+ while jc.next() == 0:
+ [k] = jc.get_keys()
+ i = k - 1
+ if do_proj: # our projection test simply reverses the values
+ [v2,v1,v0] = jc.get_values()
+ else:
+ [v0,v1,v2] = jc.get_values()
+ self.assertEquals(self.gen_values(i), [v0,v1,v2])
+ if len(expect) == 0 or i != expect[0]:
+ self.tty(' result ' + str(i) + ' is not in: ' + str(expect))
+ self.assertTrue(i == expect[0])
+ expect.remove(i)
+ self.assertEquals(0, len(expect))
+
+ # Stats are collected twice: after iterating
+ # through the join cursor once, and secondly after resetting
+ # the join cursor and iterating again.
+ def stats(self, jc, which):
+ statcur = self.session.open_cursor('statistics:join', jc, None)
+ self.check_stats(statcur, 0, 'join: index:join01:index1: ' +
+ 'bloom filter false positives')
+ statcur.close()
+
+ def statstr_to_int(self, str):
+ """
+ Convert a statistics value string, which may be in either form:
+ '12345' or '33M (33604836)'
+ """
+ parts = str.rpartition('(')
+ return int(parts[2].rstrip(')'))
+
+ # string should appear with a minimum value of least "min".
+ def check_stats(self, statcursor, min, lookfor):
+ stringclass = ''.__class__
+ intclass = (0).__class__
+
+ # Reset the cursor, we're called multiple times.
+ statcursor.reset()
+
+ found = False
+ foundval = 0
+ self.printVerbose(3, 'statistics:')
+ for id, desc, valstr, val in statcursor:
+ self.assertEqual(type(desc), stringclass)
+ self.assertEqual(type(valstr), stringclass)
+ self.assertEqual(type(val), intclass)
+ self.assertEqual(val, self.statstr_to_int(valstr))
+ self.printVerbose(3, ' stat: \'' + desc + '\', \'' +
+ valstr + '\', ' + str(val))
+ if desc == lookfor:
+ found = True
+ foundval = val
+
+ self.assertTrue(found, 'in stats, did not see: ' + lookfor)
+ self.assertTrue(foundval >= min)
+
+ # Common function for testing the most basic functionality
+ # of joins
+ def join_common(self, joincfg0, joincfg1, do_proj, do_stats):
+ #self.tty('join_common(' + joincfg0 + ',' + joincfg1 + ',' +
+ # str(do_proj) + ')')
+ self.session.create('table:join01', 'key_format=r' +
+ ',value_format=SSi,columns=(k,v0,v1,v2)')
+ self.session.create('index:join01:index0','columns=(v0)')
+ self.session.create('index:join01:index1','columns=(v1)')
+ self.session.create('index:join01:index2','columns=(v2)')
+
+ c = self.session.open_cursor('table:join01', None, None)
+ for i in range(0, self.nentries):
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ if do_proj:
+ proj_suffix = '(v2,v1,v0)' # Reversed values
+ else:
+ proj_suffix = '' # Default projection (v0,v1,v2)
+
+ # We join on index2 first, not using bloom indices.
+ # This defines the order that items are returned.
+ # index2 is sorts multiples of 3 first (see gen_values())
+ # and by using 'gt' and key 99, we'll skip multiples of 3,
+ # and examine primary keys 2,5,8,...,95,98,1,4,7,...,94,97.
+ jc = self.session.open_cursor('join:table:join01' + proj_suffix,
+ None, None)
+ c2 = self.session.open_cursor('index:join01:index2', None, None)
+ c2.set_key(99) # skips all entries w/ primary key divisible by three
+ self.assertEquals(0, c2.search())
+ self.session.join(jc, c2, 'compare=gt')
+
+ # Then select all the numbers 0-99 whose string representation
+ # sort >= '60'.
+ if self.ref == 'index':
+ c0 = self.session.open_cursor('index:join01:index0', None, None)
+ c0.set_key('60')
+ else:
+ c0 = self.session.open_cursor('table:join01', None, None)
+ c0.set_key(60)
+ self.assertEquals(0, c0.search())
+ self.session.join(jc, c0, 'compare=ge' + joincfg0)
+
+ # Then select all numbers whose reverse string representation
+ # is in '20' < x < '40'.
+ c1a = self.session.open_cursor('index:join01:index1', None, None)
+ c1a.set_key('21')
+ self.assertEquals(0, c1a.search())
+ self.session.join(jc, c1a, 'compare=gt' + joincfg1)
+
+ c1b = self.session.open_cursor('index:join01:index1', None, None)
+ c1b.set_key('41')
+ self.assertEquals(0, c1b.search())
+ self.session.join(jc, c1b, 'compare=lt' + joincfg1)
+
+ # Numbers that satisfy these 3 conditions (with ordering implied by c2):
+ # [73, 82, 62, 83, 92].
+ #
+ # After iterating, we should be able to reset and iterate again.
+ if do_stats:
+ self.stats(jc, 0)
+ self.iter_common(jc, do_proj)
+ if do_stats:
+ self.stats(jc, 1)
+ jc.reset()
+ self.iter_common(jc, do_proj)
+ if do_stats:
+ self.stats(jc, 2)
+ jc.reset()
+ self.iter_common(jc, do_proj)
+
+ jc.close()
+ c2.close()
+ c1a.close()
+ c1b.close()
+ c0.close()
+ self.session.drop('table:join01')
+
+ # Test joins with basic functionality
+ def test_join(self):
+ bloomcfg1000 = ',strategy=bloom,count=1000'
+ bloomcfg10000 = ',strategy=bloom,count=10000'
+ for cfga in [ '', bloomcfg1000, bloomcfg10000 ]:
+ for cfgb in [ '', bloomcfg1000, bloomcfg10000 ]:
+ for do_proj in [ False, True ]:
+ #self.tty('cfga=' + cfga +
+ # ', cfgb=' + cfgb +
+ # ', doproj=' + str(do_proj))
+ self.join_common(cfga, cfgb, do_proj, False)
+
+ def test_join_errors(self):
+ self.session.create('table:join01', 'key_format=r,value_format=SS'
+ ',columns=(k,v0,v1)')
+ self.session.create('table:join01B', 'key_format=r,value_format=SS'
+ ',columns=(k,v0,v1)')
+ self.session.create('index:join01:index0','columns=(v0)')
+ self.session.create('index:join01:index1','columns=(v1)')
+ self.session.create('index:join01B:index0','columns=(v0)')
+ jc = self.session.open_cursor('join:table:join01', None, None)
+ tc = self.session.open_cursor('table:join01', None, None)
+ fc = self.session.open_cursor('file:join01.wt', None, None)
+ ic0 = self.session.open_cursor('index:join01:index0', None, None)
+ ic0again = self.session.open_cursor('index:join01:index0', None, None)
+ ic1 = self.session.open_cursor('index:join01:index1', None, None)
+ icB = self.session.open_cursor('index:join01B:index0', None, None)
+ tcB = self.session.open_cursor('table:join01B', None, None)
+
+ tc.set_key(1)
+ tc.set_value('val1', 'val1')
+ tc.insert()
+ tcB.set_key(1)
+ tcB.set_value('val1', 'val1')
+ tcB.insert()
+ fc.next()
+
+ # Joining using a non join-cursor
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(tc, ic0, 'compare=ge'),
+ '/not a join cursor/')
+ # Joining a table cursor, not index
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, fc, 'compare=ge'),
+ '/not an index or table cursor/')
+ # Joining a non positioned cursor
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0, 'compare=ge'),
+ '/requires key be set/')
+
+ # minimally position the cursors now
+ ic0.next()
+ ic0again.next()
+ icB.next()
+
+ # Joining non matching index
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, icB, 'compare=ge'),
+ '/table for join cursor does not match/')
+
+ # The cursor must be positioned
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic1, 'compare=ge'),
+ '/requires key be set/')
+ ic1.next()
+
+ # The first cursor joined cannot be bloom
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic1,
+ 'compare=ge,strategy=bloom,count=1000'),
+ '/first joined cursor cannot specify strategy=bloom/')
+
+ # This succeeds.
+ self.session.join(jc, ic1, 'compare=ge'),
+
+ # With bloom filters, a count is required
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0, 'compare=ge,strategy=bloom'),
+ '/count must be nonzero/')
+
+ # This succeeds.
+ self.session.join(jc, ic0, 'compare=ge,strategy=bloom,count=1000'),
+
+ bloom_config = ',strategy=bloom,count=1000'
+ # Cannot use the same index cursor
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0,
+ 'compare=le' + bloom_config),
+ '/index cursor already used in a join/')
+
+ # When joining with the same index, need compatible compares
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=ge' + bloom_config),
+ '/join has overlapping ranges/')
+
+ # Another incompatible compare
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=gt' + bloom_config),
+ '/join has overlapping ranges/')
+
+ # Compare is compatible, but bloom args need to match
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=le'),
+ '/join has incompatible strategy/')
+
+ # Counts need to match for bloom filters
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: self.session.join(jc, ic0again, 'compare=le,strategy=bloom,'
+ 'count=100'), '/count.* does not match previous count/')
+
+ # This succeeds
+ self.session.join(jc, ic0again, 'compare=le,strategy=bloom,count=1000')
+
+ # Need to do initial next() before getting key/values
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: jc.get_keys(),
+ '/join cursor must be advanced with next/')
+
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: jc.get_values(),
+ '/join cursor must be advanced with next/')
+
+ # Operations on the joined cursor are frozen until the join is closed.
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: ic0.next(),
+ '/index cursor is being used in a join/')
+
+ # Operations on the joined cursor are frozen until the join is closed.
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: ic0.prev(),
+ '/index cursor is being used in a join/')
+
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda: ic0.reset(),
+ '/index cursor is being used in a join/')
+
+ # Only a small number of operations allowed on a join cursor
+ self.assertRaises(wiredtiger.WiredTigerError,
+ lambda: jc.search())
+
+ self.assertRaises(wiredtiger.WiredTigerError,
+ lambda: jc.prev())
+
+ self.assertEquals(jc.next(), 0)
+ self.assertEquals(jc.next(), wiredtiger.WT_NOTFOUND)
+
+ # Only after the join cursor is closed can we use the index cursor
+ # normally
+ jc.close()
+ self.assertEquals(ic0.next(), wiredtiger.WT_NOTFOUND)
+ self.assertEquals(ic0.prev(), 0)
+
+ # common code for making sure that cursors can be
+ # implicitly closed, no matter the order they are created
+ def cursor_close_common(self, joinfirst):
+ self.session.create('table:join01', 'key_format=r' +
+ ',value_format=SS,columns=(k,v0,v1)')
+ self.session.create('index:join01:index0','columns=(v0)')
+ self.session.create('index:join01:index1','columns=(v1)')
+ c = self.session.open_cursor('table:join01', None, None)
+ for i in range(0, self.nentries):
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ if joinfirst:
+ jc = self.session.open_cursor('join:table:join01', None, None)
+ c0 = self.session.open_cursor('index:join01:index0', None, None)
+ c1 = self.session.open_cursor('index:join01:index1', None, None)
+ c0.next() # index cursors must be positioned
+ c1.next()
+ if not joinfirst:
+ jc = self.session.open_cursor('join:table:join01', None, None)
+ self.session.join(jc, c0, 'compare=ge')
+ self.session.join(jc, c1, 'compare=ge')
+ self.session.close()
+ self.session = None
+
+ def test_cursor_close1(self):
+ self.cursor_close_common(True)
+
+ def test_cursor_close2(self):
+ self.cursor_close_common(False)
+
+ def test_stats(self):
+ bloomcfg1000 = ',strategy=bloom,count=1000'
+ bloomcfg10 = ',strategy=bloom,count=10'
+ self.join_common(bloomcfg1000, bloomcfg1000, False, True)
+
+ # Intentially run with an underconfigured Bloom filter,
+ # statistics should pick up some false positives.
+ self.join_common(bloomcfg10, bloomcfg10, False, True)
+
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_join02.py b/test/suite/test_join02.py
new file mode 100644
index 00000000000..bf376575103
--- /dev/null
+++ b/test/suite/test_join02.py
@@ -0,0 +1,290 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import wiredtiger, wttest, suite_random
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+# test_join02.py
+# Join operations
+# Join several indices together, trying all comparison combinations
+class test_join02(wttest.WiredTigerTestCase):
+ table_name1 = 'test_join02'
+ nentries = 1000
+
+ keyscen = [
+ ('key-r', dict(keyformat='r')),
+ ('key-S', dict(keyformat='S')),
+ ('key-i', dict(keyformat='i')),
+ ('key-iS', dict(keyformat='iS'))
+ ]
+
+ bloomscen = [
+ ('bloom', dict(usebloom=True)),
+ ('nobloom', dict(usebloom=False))
+ ]
+
+ scenarios = number_scenarios(multiply_scenarios('.', keyscen, bloomscen))
+
+ # Start our range from 1, since WT record numbers start at 1,
+ # it makes things work out nicer.
+ def range(self):
+ return range(1, self.nentries + 1)
+
+ def gen_key(self, i):
+ if self.keyformat == 'S':
+ return [ 'key%06d' % i ] # zero pad so it sorts expectedly
+ elif self.keyformat == 'iS':
+ return [ i, 'key%06d' % i ]
+ else:
+ return [ i ]
+
+ def gen_values(self, i):
+ s = str(i)
+ x = 'x' * i
+ rs = s[::-1]
+ f = int(s[0:1])
+ return [i, s, x, rs, f]
+
+ def reinit_joinconfig(self):
+ self.rand = suite_random.suite_random(self.seed)
+ self.seed += 1
+
+ def get_joinconfig(self):
+ # When we're running the bloom scenario, make it so the
+ # bloom filters are often shared. Make the number of
+ # hashes and number of bits per item so they don't always
+ # match up; WT should allow it.
+ if self.usebloom:
+ c = 10000 if (self.rand.rand32() % 3) != 0 else 100000
+ k = 8 if (self.rand.rand32() % 10) != 0 else 10
+ b = 16 if (self.rand.rand32() % 11) != 0 else 12
+ return \
+ ',strategy=bloom,count=' + str(c) + \
+ ',bloom_bit_count=' + str(b) + \
+ ',bloom_hash_count=' + str(k)
+ else:
+ return ''
+
+ def do_join(self, jc, curleft, curright, choice, mbr):
+ c0 = choice[0]
+ if c0 == None:
+ return mbr
+ # The first join cannot use a bloom filter
+ if jc.first_join:
+ joinconfig = ''
+ jc.first_join = False
+ else:
+ joinconfig = self.get_joinconfig()
+ if c0 != None:
+ #self.tty('join(jc, ' + curleft.name + ' ' + c0 +
+ # ' ' + str(curleft.low) + ')')
+ curleft.reset()
+ curleft.set_key(*curleft.low)
+ self.assertEquals(0, curleft.search())
+ self.session.join(jc, curleft, 'compare=' + c0 + joinconfig)
+ if c0 == 'eq':
+ mbr = mbr.intersection(curleft.eqmembers)
+ elif c0 == 'ge':
+ mbr = mbr.intersection(
+ set(curleft.eqmembers.union(curleft.gtmembers)))
+ elif c0 == 'gt':
+ mbr = mbr.intersection(curleft.gtmembers)
+ c1 = choice[1] if len(choice) > 1 else None
+ if c1 != None:
+ #self.tty('join(jc, ' + curright.name + ' ' + c1 +
+ # ' ' + str(curright.high) + ')')
+ curright.reset()
+ curright.set_key(*curright.high)
+ self.assertEquals(0, curright.search())
+ self.session.join(jc, curright, 'compare=' + c1 + joinconfig)
+ if c1 == 'le':
+ mbr = mbr.intersection(
+ set(curright.eqmembers.union(curright.ltmembers)))
+ elif c1 == 'lt':
+ mbr = mbr.intersection(curright.ltmembers)
+ return mbr
+
+ def iterate(self, jc, mbr):
+ #self.tty('iteration expects ' + str(len(mbr)) +
+ # ' entries: ' + str(mbr))
+ while jc.next() == 0:
+ keys = jc.get_keys()
+ [v0,v1,v2,v3,v4] = jc.get_values()
+ k0 = keys[0]
+ k1 = keys[1] if len(keys) > 1 else None
+ if self.keyformat == 'S':
+ i = int(str(k0[3:]))
+ elif self.keyformat == 'iS':
+ i = k0
+ self.assertEquals(i, int(str(k1[3:])))
+ else:
+ i = k0
+ #self.tty(' iteration got key: ' + str(k0) + ',' + str(k1))
+ #self.tty(' iteration got values: ' + str([v0,v1,v2,v3,v4]))
+ #self.tty(' iteration expects values: ' + str(self.gen_values(i)))
+ self.assertEquals(self.gen_values(i), [v0,v1,v2,v3,v4])
+ if not i in mbr:
+ self.tty(' result ' + str(i) + ' is not in: ' + str(mbr))
+ self.assertTrue(i in mbr)
+ mbr.remove(i)
+ self.assertEquals(0, len(mbr))
+
+ def mkmbr(self, expr):
+ return frozenset([x for x in self.range() if expr(x)])
+
+ def test_basic_join(self):
+ self.seed = 1
+ if self.keyformat == 'iS':
+ keycols = 'k0,k1'
+ else:
+ keycols = 'k'
+ self.session.create('table:join02', 'key_format=' + self.keyformat +
+ ',value_format=iSuSi,columns=(' + keycols +
+ ',v0,v1,v2,v3,v4)')
+ self.session.create('index:join02:index0','columns=(v0)')
+ self.session.create('index:join02:index1','columns=(v1)')
+ self.session.create('index:join02:index2','columns=(v2)')
+ self.session.create('index:join02:index3','columns=(v3)')
+ self.session.create('index:join02:index4','columns=(v4)')
+ c = self.session.open_cursor('table:join02', None, None)
+ for i in self.range():
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ # Use the primary table in one of the joins.
+ c0a = self.session.open_cursor('table:join02', None, None)
+ c0b = self.session.open_cursor('table:join02', None, None)
+ c1a = self.session.open_cursor('index:join02:index1', None, None)
+ c1b = self.session.open_cursor('index:join02:index1', None, None)
+ c2a = self.session.open_cursor('index:join02:index2', None, None)
+ c2b = self.session.open_cursor('index:join02:index2', None, None)
+ c3a = self.session.open_cursor('index:join02:index3', None, None)
+ c3b = self.session.open_cursor('index:join02:index3', None, None)
+ c4a = self.session.open_cursor('index:join02:index4', None, None)
+
+ # Attach extra properties to each cursor. For cursors that
+ # may appear on the 'left' side of a range CA < x < CB,
+ # we give a low value of the range, and calculate the members
+ # of the set we expect to see for a 'gt' comparison, as well
+ # as the 'eq' comparison. For cursors that appear on the
+ # 'right side of the range, we give a high value of the range,
+ # and calculate membership sets for 'lt' and 'eq'.
+ #
+ # We've defined the low/high values so that there's a lot of
+ # overlap between the values when we're doing ranges.
+ c0a.name = 'c0a'
+ c0b.name = 'c0b'
+ if self.keyformat == 'i' or self.keyformat == 'r':
+ c0a.low = [ 205 ]
+ c0b.high = [ 990 ]
+ elif self.keyformat == 'S':
+ c0a.low = [ 'key000205' ]
+ c0b.high = [ 'key000990' ]
+ elif self.keyformat == 'iS':
+ c0a.low = [ 205, 'key000205' ]
+ c0b.high = [ 990, 'key000990' ]
+ c0a.gtmembers = self.mkmbr(lambda x: x > 205)
+ c0a.eqmembers = self.mkmbr(lambda x: x == 205)
+ c0b.ltmembers = self.mkmbr(lambda x: x < 990)
+ c0b.eqmembers = self.mkmbr(lambda x: x == 990)
+
+ c1a.low = [ '150' ]
+ c1a.gtmembers = self.mkmbr(lambda x: str(x) > '150')
+ c1a.eqmembers = self.mkmbr(lambda x: str(x) == '150')
+ c1a.name = 'c1a'
+ c1b.high = [ '733' ]
+ c1b.ltmembers = self.mkmbr(lambda x: str(x) < '733')
+ c1b.eqmembers = self.mkmbr(lambda x: str(x) == '733')
+ c1b.name = 'c1b'
+
+ c2a.low = [ 'x' * 321 ]
+ c2a.gtmembers = self.mkmbr(lambda x: x > 321)
+ c2a.eqmembers = self.mkmbr(lambda x: x == 321)
+ c2a.name = 'c2a'
+ c2b.high = [ 'x' * 765 ]
+ c2b.ltmembers = self.mkmbr(lambda x: x < 765)
+ c2b.eqmembers = self.mkmbr(lambda x: x == 765)
+ c2b.name = 'c2b'
+
+ c3a.low = [ '432' ]
+ c3a.gtmembers = self.mkmbr(lambda x: str(x)[::-1] > '432')
+ c3a.eqmembers = self.mkmbr(lambda x: str(x)[::-1] == '432')
+ c3a.name = 'c3a'
+ c3b.high = [ '876' ]
+ c3b.ltmembers = self.mkmbr(lambda x: str(x)[::-1] < '876')
+ c3b.eqmembers = self.mkmbr(lambda x: str(x)[::-1] == '876')
+ c3b.name = 'c3b'
+
+ c4a.low = [ 4 ]
+ c4a.gtmembers = self.mkmbr(lambda x: str(x)[0:1] > '4')
+ c4a.eqmembers = self.mkmbr(lambda x: str(x)[0:1] == '4')
+ c4a.name = 'c4a'
+
+ choices = [[None], ['eq'], ['ge'], ['gt'], [None, 'le'], [None, 'lt'],
+ ['ge', 'le' ], ['ge', 'lt' ], ['gt', 'le' ], ['gt', 'lt' ]]
+ smallchoices = [[None], ['eq'], ['ge'], ['gt', 'le' ]]
+ for i0 in smallchoices:
+ for i1 in choices:
+ for i2 in smallchoices:
+ for i3 in smallchoices:
+ for i4 in [[None], ['eq'], ['ge'], ['gt']]:
+ if i0[0] == None and i1[0] == None and \
+ i2[0] == None and i3[0] == None and \
+ i4[0] == None:
+ continue
+ self.reinit_joinconfig()
+ #self.tty('Begin test: ' +
+ # ','.join([str(i0),str(i1),str(i2),
+ # str(i3),str(i4)]))
+ jc = self.session.open_cursor('join:table:join02',
+ None, None)
+ jc.first_join = True
+ mbr = set(self.range())
+
+ # It shouldn't matter the order of the joins
+ mbr = self.do_join(jc, c3a, c3b, i3, mbr)
+ mbr = self.do_join(jc, c2a, c2b, i2, mbr)
+ mbr = self.do_join(jc, c4a, None, i4, mbr)
+ mbr = self.do_join(jc, c1a, c1b, i1, mbr)
+ mbr = self.do_join(jc, c0a, c0b, i0, mbr)
+ self.iterate(jc, mbr)
+ jc.close()
+ c0a.close()
+ c0b.close()
+ c1a.close()
+ c1b.close()
+ c2a.close()
+ c2b.close()
+ c3a.close()
+ c3b.close()
+ c4a.close()
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_join03.py b/test/suite/test_join03.py
new file mode 100644
index 00000000000..552e3b41748
--- /dev/null
+++ b/test/suite/test_join03.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import os
+import wiredtiger, wttest, run
+from wtscenario import check_scenarios, multiply_scenarios, number_scenarios
+
+# test_join03.py
+# Join operations
+# Joins with a custom extractor
+class test_join03(wttest.WiredTigerTestCase):
+ table_name1 = 'test_join03'
+ nentries = 100
+
+ # Return the wiredtiger_open extension argument for a shared library.
+ def extensionArg(self, exts):
+ extfiles = []
+ for ext in exts:
+ (dirname, name, libname) = ext
+ if name != None and name != 'none':
+ testdir = os.path.dirname(__file__)
+ extdir = os.path.join(run.wt_builddir, 'ext', dirname)
+ extfile = os.path.join(
+ extdir, name, '.libs', 'libwiredtiger_' + libname + '.so')
+ if not os.path.exists(extfile):
+ self.skipTest('extension "' + extfile + '" not built')
+ if not extfile in extfiles:
+ extfiles.append(extfile)
+ if len(extfiles) == 0:
+ return ''
+ else:
+ return ',extensions=["' + '","'.join(extfiles) + '"]'
+
+ # Override WiredTigerTestCase, we have extensions.
+ def setUpConnectionOpen(self, dir):
+ extarg = self.extensionArg([('extractors', 'csv', 'csv_extractor')])
+ connarg = 'create,error_prefix="{0}: ",{1}'.format(
+ self.shortid(), extarg)
+ conn = wiredtiger.wiredtiger_open(dir, connarg)
+ self.pr(`conn`)
+ return conn
+
+ def gen_key(self, i):
+ return [ i + 1 ]
+
+ def gen_values(self, i):
+ s = str(i)
+ rs = s[::-1].lstrip('0')
+ return [ s + ',' + rs ]
+
+ # Common function for testing iteration of join cursors
+ def iter_common(self, jc):
+ mbr = set([62, 63, 72, 73, 82, 83, 92, 93])
+ while jc.next() == 0:
+ [k] = jc.get_keys()
+ i = k - 1
+ [v] = jc.get_values()
+ self.assertEquals(self.gen_values(i), [v])
+ if not i in mbr:
+ self.tty(' result ' + str(i) + ' is not in: ' + str(mbr))
+ self.assertTrue(i in mbr)
+ mbr.remove(i)
+ self.assertEquals(0, len(mbr))
+
+ # Common function for testing the most basic functionality
+ # of joins
+ def join(self, csvformat, args0, args1):
+ self.session.create('table:join03', 'key_format=r' +
+ ',value_format=S,columns=(k,v)')
+ fmt = csvformat[0]
+ self.session.create('index:join03:index0','key_format=' + fmt + ',' +
+ 'extractor=csv,app_metadata={"format" : "' +
+ fmt + '","field" : "0"}')
+ fmt = csvformat[1]
+ self.session.create('index:join03:index1','key_format=' + fmt + ',' +
+ 'extractor=csv,app_metadata={"format" : "' +
+ fmt + '","field" : "1"}')
+
+ c = self.session.open_cursor('table:join03', None, None)
+ for i in range(0, self.nentries):
+ c.set_key(*self.gen_key(i))
+ c.set_value(*self.gen_values(i))
+ c.insert()
+ c.close()
+
+ jc = self.session.open_cursor('join:table:join03', None, None)
+
+ # All the numbers 0-99 whose string representation
+ # sort >= '60' and whose reverse string representation
+ # is in '20' < x < '40'. That is: [62, 63, 72, 73, 82, 83, 92, 93]
+ c0 = self.session.open_cursor('index:join03:index0', None, None)
+ if csvformat[0] == 'S':
+ c0.set_key('60')
+ else:
+ c0.set_key(60)
+ self.assertEquals(0, c0.search())
+ self.session.join(jc, c0, 'compare=ge' + args0)
+
+ c1a = self.session.open_cursor('index:join03:index1', None, None)
+ if csvformat[1] == 'S':
+ c1a.set_key('21')
+ else:
+ c1a.set_key(21)
+ self.assertEquals(0, c1a.search())
+ self.session.join(jc, c1a, 'compare=gt' + args1)
+
+ c1b = self.session.open_cursor('index:join03:index1', None, None)
+ if csvformat[1] == 'S':
+ c1b.set_key('41')
+ else:
+ c1b.set_key(41)
+ self.assertEquals(0, c1b.search())
+ self.session.join(jc, c1b, 'compare=lt' + args1)
+
+ # Iterate, and make sure that reset allows us to iterate again.
+ self.iter_common(jc)
+
+ jc.close()
+ c1a.close()
+ c1b.close()
+ c0.close()
+ self.session.drop('table:join03')
+
+ # Test joins using CSV fields that are interpreted as different types
+ # to make sure all the extractor plumbing used in joins is working.
+ def test_join(self):
+ for extraargs in [ '', ',strategy=bloom,count=1000' ]:
+ for csvformat in [ 'SS', 'ii', 'Si', 'iS' ]:
+ self.join(csvformat, '', extraargs)
+
+
+if __name__ == '__main__':
+ wttest.run()
diff --git a/test/suite/test_schema05.py b/test/suite/test_schema05.py
index 2a7bc042c80..b0562f7983c 100644
--- a/test/suite/test_schema05.py
+++ b/test/suite/test_schema05.py
@@ -89,9 +89,10 @@ class test_schema05(wttest.WiredTigerTestCase):
# Create self.nindices index files, each with a column from the CSV
for i in range(0, self.nindices):
si = str(i)
- self.session.create("index:schema05:x" + si,
- "key_format=S,columns=(key),"
- "extractor=csv,app_metadata=" + si)
+ self.session.create('index:schema05:x' + si,
+ 'key_format=S,columns=(key),'
+ 'extractor=csv,app_metadata={"format" : "S",' +
+ '"field" : "' + si + '"}')
def drop_indices(self):
for i in range(0, self.nindices):