From 4d204910b2cfbdb01a6d79be2a88cf04c3a58f44 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 3 Nov 2015 13:22:11 -0500 Subject: WT-1315. Added support for join cursors: - "join:" URI handling added to WT_SESSION::open_cursor, added WT_SESSION::join. - Added support for in-memory bloom filters. - Minor fix to cur_dump_close CURSOR_API_CALL. - Some refactoring in __wt_{curindex,curtable}_get_value() to allow them to be called internally. - Use new macro JOINABLE_CURSOR_API_CALL() instead of CURSOR_API_CALL() to trap illegal calls to joined cursor. - __wt_curtable_open now takes an owning cursor arg. - Added __wt_struct_unpack_size() and __wt_struct_repack() --- build_win/filelist.win | 1 + dist/api_data.py | 27 ++ dist/filelist | 1 + dist/s_string.ok | 11 + src/bloom/bloom.c | 41 ++ src/config/config_def.c | 18 + src/cursor/cur_dump.c | 2 +- src/cursor/cur_index.c | 54 ++- src/cursor/cur_join.c | 999 ++++++++++++++++++++++++++++++++++++++++++++++ src/cursor/cur_table.c | 48 +-- src/include/api.h | 5 + src/include/config.h | 45 ++- src/include/cursor.h | 53 +++ src/include/cursor.i | 64 +++ src/include/extern.h | 9 +- src/include/wiredtiger.in | 66 ++- src/include/wt_internal.h | 8 + src/packing/pack_impl.c | 105 +++++ src/session/session_api.c | 126 +++++- 19 files changed, 1592 insertions(+), 91 deletions(-) create mode 100644 src/cursor/cur_join.c diff --git a/build_win/filelist.win b/build_win/filelist.win index 9d0ee10d305..af6ddf98da9 100644 --- a/build_win/filelist.win +++ b/build_win/filelist.win @@ -72,6 +72,7 @@ src/cursor/cur_ds.c src/cursor/cur_dump.c src/cursor/cur_file.c src/cursor/cur_index.c +src/cursor/cur_join.c src/cursor/cur_json.c src/cursor/cur_log.c src/cursor/cur_metadata.c diff --git a/dist/api_data.py b/dist/api_data.py index 6fd7dcd0093..024bea25545 100644 --- a/dist/api_data.py +++ b/dist/api_data.py @@ -731,6 +731,33 @@ methods = { type='boolean'), ]), +'WT_SESSION.join' : Method([ + Config('compare', '"eq"', r''' + modifies the 
set of items to be returned so that the index key + satisfies the given comparison relative to the key set in this + cursor''', + choices=['eq', 'ge', 'gt', 'le', 'lt']), + Config('count', '', r''' + set an approximate count of the elements that would be included in + the join. This is used in sizing the bloom filter, and also influences + evaluation order for cursors in the join. When the count is equal + for multiple bloom filters in a composition of joins, the bloom + filter may be shared''', + type='int'), + Config('bloom_bit_count', '16', r''' + the number of bits used per item for the bloom filter''', + min='2', max='1000'), + Config('bloom_hash_count', '8', r''' + the number of hash values per item for the bloom filter''', + min='2', max='100'), + Config('strategy', '', r''' + when set to bloom, a bloom filter is created and populated for + this index. This has an up front cost but may reduce the number + of accesses to the main table when iterating the joined cursor. + The bloom setting requires that count be set''', + choices=['bloom', 'default']), +]), + 'WT_SESSION.log_flush' : Method([ Config('sync', 'on', r''' forcibly flush the log and wait for it to achieve the synchronization diff --git a/dist/filelist b/dist/filelist index f33f0e9a962..52af87c2a68 100644 --- a/dist/filelist +++ b/dist/filelist @@ -72,6 +72,7 @@ src/cursor/cur_ds.c src/cursor/cur_dump.c src/cursor/cur_file.c src/cursor/cur_index.c +src/cursor/cur_join.c src/cursor/cur_json.c src/cursor/cur_log.c src/cursor/cur_metadata.c diff --git a/dist/s_string.ok b/dist/s_string.ok index d234a3c101f..90759008ccd 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -220,6 +220,7 @@ OUTBUFF OVFL ObWgfvgw Obama +Outfmt PARAM POSIX PREDEFINE @@ -350,6 +351,7 @@ allocfile allocsize amd ao +ap api arg argc @@ -420,6 +422,7 @@ checksums chk chongo cip +cjoin ckpt ckptfrag ckptlist @@ -463,6 +466,7 @@ curdump curextract curfile curindex +curjoin curlog curmetadata cursoring @@ -539,6 +543,7 @@ 
enqueue enqueued env eof +eq equalp errhandler errno @@ -590,6 +595,7 @@ ftruncate func gcc gdb +ge getenv getline getone @@ -605,6 +611,7 @@ goesc gostring gostruct goutf +gt hashval havesize hdr @@ -630,6 +637,7 @@ indirects indx infeasible inflateInit +infmt init initn initsize @@ -648,6 +656,7 @@ io ip islocked ispo +iter iteratively jnr jrx @@ -666,6 +675,7 @@ latencies lbrace lbracket ld +le len lenp level's @@ -713,6 +723,7 @@ memalign membar memcpy memmove +memget memset memsize metaconf diff --git a/src/bloom/bloom.c b/src/bloom/bloom.c index 9225b9fe3b5..e3a21f25dc1 100644 --- a/src/bloom/bloom.c +++ b/src/bloom/bloom.c @@ -313,6 +313,47 @@ __wt_bloom_get(WT_BLOOM *bloom, WT_ITEM *key) return (__wt_bloom_hash_get(bloom, &bhash)); } +/* + * __wt_bloom_inmem_get -- + * Tests whether the given key is in the Bloom filter. + * This can be used in place of __wt_bloom_get + * for Bloom filters that are memory only. + */ +int +__wt_bloom_inmem_get(WT_BLOOM *bloom, WT_ITEM *key) +{ + uint64_t h1, h2; + uint32_t i; + + h1 = __wt_hash_fnv64(key->data, key->size); + h2 = __wt_hash_city64(key->data, key->size); + for (i = 0; i < bloom->k; i++, h1 += h2) { + if (!__bit_test(bloom->bitstring, h1 % bloom->m)) + return (WT_NOTFOUND); + } + return (0); +} + +/* + * __wt_bloom_intersection -- + * Modify the Bloom filter to contain the intersection of this + * filter with another. + */ +int +__wt_bloom_intersection(WT_BLOOM *bloom, WT_BLOOM *other) +{ + uint64_t i, nbytes; + + if (bloom->k != other->k || bloom->factor != other->factor || + bloom->m != other->m || bloom->n != other->n) + return (EINVAL); + + nbytes = __bitstr_size(bloom->m); + for (i = 0; i < nbytes; i++) + bloom->bitstring[i] &= other->bitstring[i]; + return (0); +} + /* * __wt_bloom_close -- * Close the Bloom filter, release any resources. 
diff --git a/src/config/config_def.c b/src/config/config_def.c index 419f4124133..081ab1e35a8 100644 --- a/src/config/config_def.c +++ b/src/config/config_def.c @@ -295,6 +295,19 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_drop[] = { { NULL, NULL, NULL, NULL, NULL, 0 } }; +static const WT_CONFIG_CHECK confchk_WT_SESSION_join[] = { + { "bloom_bit_count", "int", NULL, "min=2,max=1000", NULL, 0 }, + { "bloom_hash_count", "int", NULL, "min=2,max=100", NULL, 0 }, + { "compare", "string", + NULL, "choices=[\"eq\",\"ge\",\"gt\",\"le\",\"lt\"]", + NULL, 0 }, + { "count", "int", NULL, NULL, NULL, 0 }, + { "strategy", "string", + NULL, "choices=[\"bloom\",\"default\"]", + NULL, 0 }, + { NULL, NULL, NULL, NULL, NULL, 0 } +}; + static const WT_CONFIG_CHECK confchk_WT_SESSION_log_flush[] = { { "sync", "string", NULL, "choices=[\"background\",\"off\",\"on\"]", @@ -877,6 +890,11 @@ static const WT_CONFIG_ENTRY config_entries[] = { "force=0,remove_files=", confchk_WT_SESSION_drop, 2 }, + { "WT_SESSION.join", + "bloom_bit_count=16,bloom_hash_count=8,compare=\"eq\",count=," + "strategy=", + confchk_WT_SESSION_join, 5 + }, { "WT_SESSION.log_flush", "sync=on", confchk_WT_SESSION_log_flush, 1 diff --git a/src/cursor/cur_dump.c b/src/cursor/cur_dump.c index 6c11c4b407e..e5799fbad05 100644 --- a/src/cursor/cur_dump.c +++ b/src/cursor/cur_dump.c @@ -329,7 +329,7 @@ __curdump_close(WT_CURSOR *cursor) cdump = (WT_CURSOR_DUMP *)cursor; child = cdump->child; - CURSOR_API_CALL(cursor, session, get_key, NULL); + CURSOR_API_CALL(cursor, session, close, NULL); if (child != NULL) WT_TRET(child->close(child)); /* We shared the child's URI. */ diff --git a/src/cursor/cur_index.c b/src/cursor/cur_index.c index fd2a6cd7480..5819bb04cf2 100644 --- a/src/cursor/cur_index.c +++ b/src/cursor/cur_index.c @@ -8,6 +8,20 @@ #include "wt_internal.h" + /* + * __wt_curindex_joined -- + * Produce an error that this cursor is being used in a join call. 
+ */ +int +__wt_curindex_joined(WT_CURSOR *cursor) +{ + WT_SESSION_IMPL *session; + + session = (WT_SESSION_IMPL *)cursor->session; + __wt_errx(session, "index cursor is being used in a join"); + return (ENOTSUP); +} + /* * __curindex_get_value -- * WT_CURSOR->get_value implementation for index cursors. @@ -15,32 +29,16 @@ static int __curindex_get_value(WT_CURSOR *cursor, ...) { - WT_CURSOR_INDEX *cindex; WT_DECL_RET; - WT_ITEM *item; WT_SESSION_IMPL *session; va_list ap; - cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, get_value, NULL); - WT_CURSOR_NEEDVALUE(cursor); - va_start(ap, cursor); - if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) { - ret = __wt_schema_project_merge(session, - cindex->cg_cursors, cindex->value_plan, - cursor->value_format, &cursor->value); - if (ret == 0) { - item = va_arg(ap, WT_ITEM *); - item->data = cursor->value.data; - item->size = cursor->value.size; - } - } else - ret = __wt_schema_project_out(session, - cindex->cg_cursors, cindex->value_plan, ap); - va_end(ap); + JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); + WT_ERR(__wt_curindex_get_value_ap(cursor, ap)); -err: API_END_RET(session, ret); +err: va_end(ap); + API_END_RET(session, ret); } /* @@ -53,7 +51,7 @@ __curindex_set_value(WT_CURSOR *cursor, ...) WT_DECL_RET; WT_SESSION_IMPL *session; - CURSOR_API_CALL(cursor, session, set_value, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL); ret = ENOTSUP; err: cursor->saved_err = ret; F_CLR(cursor, WT_CURSTD_VALUE_SET); @@ -72,7 +70,7 @@ __curindex_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)a; - CURSOR_API_CALL(a, session, compare, NULL); + JOINABLE_CURSOR_API_CALL(a, session, compare, NULL); /* Check both cursors are "index:" type. 
*/ if (!WT_PREFIX_MATCH(a->uri, "index:") || @@ -150,7 +148,7 @@ __curindex_next(WT_CURSOR *cursor) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); if ((ret = cindex->child->next(cindex->child)) == 0) @@ -171,7 +169,7 @@ __curindex_prev(WT_CURSOR *cursor) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, prev, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL); F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); if ((ret = cindex->child->prev(cindex->child)) == 0) @@ -194,7 +192,7 @@ __curindex_reset(WT_CURSOR *cursor) u_int i; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, reset, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL); F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); WT_TRET(cindex->child->reset(cindex->child)); @@ -225,7 +223,7 @@ __curindex_search(WT_CURSOR *cursor) cindex = (WT_CURSOR_INDEX *)cursor; child = cindex->child; - CURSOR_API_CALL(cursor, session, search, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL); /* * We are searching using the application-specified key, which @@ -284,7 +282,7 @@ __curindex_search_near(WT_CURSOR *cursor, int *exact) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, search_near, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL); __wt_cursor_set_raw_key(cindex->child, &cursor->key); if ((ret = cindex->child->search_near(cindex->child, exact)) == 0) ret = __curindex_move(cindex); @@ -311,7 +309,7 @@ __curindex_close(WT_CURSOR *cursor) cindex = (WT_CURSOR_INDEX *)cursor; idx = cindex->index; - CURSOR_API_CALL(cursor, session, close, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); if ((cp = cindex->cg_cursors) != NULL) for (i = 0, cp = 
cindex->cg_cursors; diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c new file mode 100644 index 00000000000..fdc741d0dbd --- /dev/null +++ b/src/cursor/cur_join.c @@ -0,0 +1,999 @@ +/*- + * Copyright (c) 2014-2015 MongoDB, Inc. + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +/* + * __curjoin_entry_iter_init -- + * Initialize an iteration for the index managed by a join entry. + * + */ +static int +__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp) +{ + WT_CURSOR *newcur; + WT_CURSOR *to_dup; + WT_DECL_RET; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + const char *def_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), NULL }; + const char *uri; + const char **config; + char *uribuf; + WT_CURSOR_JOIN_ITER *iter; + size_t size; + + iter = NULL; + uribuf = NULL; + if (entry->ends[0].cursor != NULL) + to_dup = entry->ends[0].cursor; + else + to_dup = entry->ends[1].cursor; + + uri = to_dup->uri; + if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW)) + config = &raw_cfg[0]; + else + config = &def_cfg[0]; + + if (cjoin->projection != NULL) { + size = strlen(uri) + strlen(cjoin->projection) + 1; + WT_ERR(__wt_calloc(session, size, 1, &uribuf)); + snprintf(uribuf, size, "%s%s", uri, cjoin->projection); + uri = uribuf; + } + WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config, + &newcur)); + WT_ERR(__wt_cursor_dup_position(to_dup, newcur)); + WT_ERR(__wt_calloc_one(session, &iter)); + iter->cjoin = cjoin; + iter->session = session; + iter->entry = entry; + iter->cursor = newcur; + iter->advance = false; + *iterp = iter; + + if (0) { +err: __wt_free(session, iter); + } + __wt_free(session, uribuf); + return (ret); +} + +/* + * __curjoin_pack_recno -- + * Pack the given recno into a buffer; prepare 
an item referencing it. + * + */ +static int +__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, + size_t bufsize, WT_ITEM *item) +{ + WT_DECL_RET; + WT_SESSION *wtsession; + size_t sz; + + wtsession = (WT_SESSION *)session; + WT_ERR(wiredtiger_struct_size(wtsession, &sz, "r", r)); + WT_ASSERT(session, sz < bufsize); + WT_ERR(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); + item->size = sz; + item->data = buf; + +err: return (ret); +} + +/* + * __curjoin_entry_iter_next -- + * Get the next item in an iteration. + * + */ +static int +__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, + uint64_t *rp) +{ + WT_CURSOR *firstcg_cur; + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + uint64_t r; + + if (iter->advance) + WT_ERR(iter->cursor->next(iter->cursor)); + else + iter->advance = true; + + cjoin = iter->cjoin; + + /* + * Set our key to the primary key, we'll also need this + * to check membership. + */ + if (iter->entry->index != NULL) + firstcg_cur = ((WT_CURSOR_INDEX *)iter->cursor)->cg_cursors[0]; + else + firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0]; + if (WT_CURSOR_RECNO(&cjoin->iface)) { + r = *(uint64_t *)firstcg_cur->key.data; + WT_ERR(__curjoin_pack_recno(iter->session, r, cjoin->recno_buf, + sizeof(cjoin->recno_buf), primkey)); + *rp = r; + } else { + WT_ITEM_SET(*primkey, firstcg_cur->key); + *rp = 0; + } + iter->curkey = primkey; + +err: return (ret); +} + +/* + * __curjoin_entry_iter_reset -- + * Reset an iteration to the starting point. 
+ *
+ */
+static int
+__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
+{
+	WT_CURSOR *to_dup;
+	WT_CURSOR_JOIN_ENTRY *entry;
+	WT_DECL_RET;
+
+	if (iter->advance) {
+		WT_ERR(iter->cursor->reset(iter->cursor));
+		entry = &iter->cjoin->entries[0];
+		if (entry->ends[0].cursor != NULL)
+			to_dup = entry->ends[0].cursor;
+		else
+			to_dup = entry->ends[1].cursor;
+		WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor));
+		iter->advance = false;
+	}
+
+err:	return (ret);
+}
+
+/*
+ * __curjoin_entry_iter_ready --
+ *	The iterator is positioned.
+ */
+static bool
+__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
+{
+	return (iter->advance);
+}
+
+/*
+ * __curjoin_entry_iter_close --
+ *	Close the iteration, release resources.  The iterator itself is
+ *	freed even when closing its cursor fails; that error is returned.
+ */
+static int
+__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter)
+{
+	WT_DECL_RET;
+
+	if (iter->cursor != NULL)
+		ret = iter->cursor->close(iter->cursor);
+	__wt_free(iter->session, iter);
+	return (ret);
+}
+
+/*
+ * __curjoin_get_key --
+ *	WT_CURSOR->get_key for join cursors.
+ */
+static int
+__curjoin_get_key(WT_CURSOR *cursor, ...)
+{
+	WT_CURSOR_JOIN *cjoin;
+	WT_DECL_RET;
+	WT_SESSION_IMPL *session;
+	va_list ap;
+
+	cjoin = (WT_CURSOR_JOIN *)cursor;
+
+	va_start(ap, cursor);
+	CURSOR_API_CALL(cursor, session, get_key, NULL);
+
+	if (!F_ISSET(cjoin, WT_CJ_INITIALIZED) ||
+	    !__curjoin_entry_iter_ready(cjoin->iter)) {
+		__wt_errx(session, "join cursor must be advanced with next()");
+		WT_ERR(EINVAL);
+	}
+	WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap));
+
+err:	va_end(ap);
+	API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_get_value --
+ *	WT_CURSOR->get_value for join cursors.
+ */
+static int
+__curjoin_get_value(WT_CURSOR *cursor, ...)
+{
+	WT_CURSOR_JOIN *cjoin;
+	WT_CURSOR_JOIN_ITER *iter;
+	WT_DECL_RET;
+	WT_SESSION_IMPL *session;
+	va_list ap;
+
+	cjoin = (WT_CURSOR_JOIN *)cursor;
+	iter = cjoin->iter;
+
+	va_start(ap, cursor);
+	CURSOR_API_CALL(cursor, session, get_value, NULL);
+
+	if (!F_ISSET(cjoin, WT_CJ_INITIALIZED) ||
+	    !__curjoin_entry_iter_ready(iter)) {
+		__wt_errx(session, "join cursor must be advanced with next()");
+		WT_ERR(EINVAL);
+	}
+	if (iter->entry->index != NULL)
+		WT_ERR(__wt_curindex_get_value_ap(iter->cursor, ap));
+	else
+		WT_ERR(__wt_curtable_get_value_ap(iter->cursor, ap));
+
+err:	va_end(ap);
+	API_END_RET(session, ret);
+}
+
+/*
+ * __curjoin_init_bloom --
+ *	Populate a Bloom filter with the primary keys in one entry's range.
+ */
+static int
+__curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
+    WT_CURSOR_JOIN_ENTRY *entry, WT_BLOOM *bloom)
+{
+	WT_COLLATOR *collator;
+	WT_CURSOR *c;
+	WT_CURSOR_INDEX *cindex;
+	WT_DECL_RET;
+	WT_ITEM curkey, curvalue, *k;
+	WT_TABLE *maintable;
+	bool skip_left;
+	char *uri;
+	const char *raw_cfg[] = { WT_CONFIG_BASE(
+	    session, WT_SESSION_open_cursor), "raw", NULL };
+	const char *mainkey_str, *p;
+	int cmp, mainkey_len;
+	size_t size;
+	u_int i;
+	void *allocbuf;
+
+	c = NULL;
+	allocbuf = NULL;
+	uri = NULL;
+
+	if (entry->index != NULL) {
+		/*
+		 * Open a cursor having a projection of the keys of the
+		 * index we're comparing against.  Open it raw, we're
+		 * going to compare it to the raw keys of the
+		 * reference cursors.
+		 */
+		maintable = ((WT_CURSOR_TABLE *)entry->main)->table;
+		mainkey_str = maintable->colconf.str + 1;
+		for (p = mainkey_str, i = 0;
+		    p != NULL && i < maintable->nkey_columns; i++)
+			p = strchr(p + 1, ',');
+		WT_ASSERT(session, p != 0);
+		mainkey_len = p - mainkey_str;
+		size = strlen(entry->index->name) + mainkey_len + 3;
+		WT_ERR(__wt_calloc(session, size, 1, &uri));
+		snprintf(uri, size, "%s(%.*s)", entry->index->name,
+		    (int)mainkey_len, mainkey_str);
+	} else {
+		/*
+		 * For joins on the main table, we just need the primary
+		 * key for comparison, we don't need any values.
+		 */
+		size = strlen(cjoin->table->name) + 3;
+		WT_ERR(__wt_calloc(session, size, 1, &uri));
+		snprintf(uri, size, "%s()", cjoin->table->name);
+	}
+	WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, raw_cfg, &c));
+	if (entry->ends[0].cursor != NULL)
+		WT_ERR(__wt_cursor_dup_position(entry->ends[0].cursor, c));
+	else
+		ret = c->next(c);	/* No left end: scan from the start. */
+
+	skip_left = (entry->ends[0].cursor == NULL) ||
+	    entry->ends[0].flags == (WT_CJE_ENDPOINT_GT | WT_CJE_ENDPOINT_EQ);
+	collator = (entry->index == NULL) ? NULL : entry->index->collator;
+	while (ret == 0) {
+		WT_ERR(c->get_key(c, &curkey));
+		if (entry->index != NULL) {
+			cindex = (WT_CURSOR_INDEX *)c;
+			if (cindex->index->extractor == NULL) {
+				/*
+				 * Repack so it's comparable to the
+				 * reference endpoints.
+				 */
+				k = &cindex->child->key;
+				WT_ERR(__wt_struct_repack(session,
+				    cindex->child->key_format,
+				    entry->main->value_format, k, &curkey,
+				    &allocbuf));
+			} else
+				curkey = cindex->child->key;
+		}
+		if (!skip_left) {
+			WT_ERR(__wt_compare(session, collator, &curkey,
+			    &entry->ends[0].key, &cmp));
+			if (cmp < 0 || (cmp == 0 &&
+			    !F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_EQ)))
+				goto advance;
+			if (cmp > 0) {
+				if (F_ISSET(&entry->ends[0],
+				    WT_CJE_ENDPOINT_GT))
+					skip_left = true;
+				else
+					break;
+			}
+		}
+		if (entry->ends[1].cursor != NULL) {
+			WT_ERR(__wt_compare(session, collator, &curkey,
+			    &entry->ends[1].key, &cmp));
+			if (cmp > 0 || (cmp == 0 &&
+			    !F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_EQ)))
+				break;
+		}
+		if (entry->index != NULL)
+			WT_ERR(c->get_value(c, &curvalue));
+		else
+			WT_ERR(c->get_key(c, &curvalue));
+		WT_ERR(__wt_bloom_insert(bloom, &curvalue));
+advance:	ret = c->next(c);
+	}
+	WT_ERR_NOTFOUND_OK(ret);
+
+err:	if (c != NULL)
+		WT_TRET(c->close(c));
+	__wt_free(session, allocbuf);
+	__wt_free(session, uri);
+	return (ret);
+}
+
+/*
+ * __curjoin_endpoint_init_key --
+ *	Set the key in the reference endpoint.
+ */
+static int
+__curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
+    WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint)
+{
+	WT_CURSOR *cursor;
+	WT_CURSOR_INDEX *cindex;
+	WT_DECL_RET;
+	WT_ITEM *k;
+	uint64_t r;
+	void *allocbuf;
+
+	allocbuf = NULL;
+	if ((cursor = endpoint->cursor) != NULL) {
+		if (entry->index != NULL) {
+			cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
+			if (cindex->index->extractor == NULL) {
+				WT_ERR(__wt_struct_repack(session,
+				    cindex->child->key_format,
+				    entry->main->value_format,
+				    &cindex->child->key, &endpoint->key,
+				    &allocbuf));
+				if (allocbuf != NULL)
+					F_SET(endpoint, WT_CJE_ENDPOINT_OWNKEY);
+			} else
+				endpoint->key = cindex->child->key;
+		} else {
+			k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
+			if (WT_CURSOR_RECNO(cursor)) {
+				r = *(uint64_t *)k->data;
+				WT_ERR(__curjoin_pack_recno(session, r,
+				    endpoint->recno_buf,
+				    sizeof(endpoint->recno_buf),
+				    &endpoint->key));
+			}
+			else
+				endpoint->key = *k;
+		}
+	}
+	if (0) {
+err:		__wt_free(session, allocbuf);
+	}
+	return (ret);
+}
+
+/*
+ * __curjoin_init_iter --
+ *	Initialize before any iteration: open the iterator on the first
+ *	entry, set the endpoint keys and build any Bloom filters.
+ */
+static int
+__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin)
+{
+	WT_BLOOM *bloom;
+	WT_DECL_RET;
+	WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2;
+	uint64_t k, m;
+
+	if (cjoin->entries_next == 0) {
+		__wt_errx(session, "join cursor has not yet been joined "
+		    "with any other cursors");
+		WT_ERR(EINVAL);
+	}
+
+	/* The first entry drives the iteration. */
+	je = &cjoin->entries[0];
+	WT_ERR(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter));
+
+	jeend = &cjoin->entries[cjoin->entries_next];
+	for (je = cjoin->entries; je < jeend; je++) {
+		WT_ERR(__curjoin_endpoint_init_key(session, je,
+		    &je->ends[0]));
+		WT_ERR(__curjoin_endpoint_init_key(session, je,
+		    &je->ends[1]));
+
+		/*
+		 * The first entry is iterated as the 'outermost' cursor.
+		 * For the common GE case, we don't have to test against
+		 * the left reference key, we know it will be true since
+		 * the btree is ordered.
+		 */
+		if (je == cjoin->entries && je->ends[0].flags ==
+		    (WT_CJE_ENDPOINT_GT | WT_CJE_ENDPOINT_EQ))
+			F_SET(cjoin, WT_CJ_SKIP_FIRST_LEFT);
+
+		if (F_ISSET(je, WT_CJE_BLOOM)) {
+			if (je->bloom == NULL) {
+				/*
+				 * Look for compatible filters to be shared,
+				 * pick compatible numbers for bit counts
+				 * and number of hashes.
+				 */
+				m = je->bloom_bit_count;
+				k = je->bloom_hash_count;
+				for (je2 = je + 1; je2 < jeend; je2++)
+					if (F_ISSET(je2, WT_CJE_BLOOM) &&
+					    je2->count == je->count) {
+						m = WT_MAX(
+						    je2->bloom_bit_count, m);
+						k = WT_MAX(
+						    je2->bloom_hash_count, k);
+					}
+				je->bloom_bit_count = m;
+				je->bloom_hash_count = k;
+				WT_ERR(__wt_bloom_create(session, NULL,
+				    NULL, je->count, m, k, &je->bloom));
+				F_SET(je, WT_CJE_OWN_BLOOM);
+				WT_ERR(__curjoin_init_bloom(session, cjoin,
+				    je, je->bloom));
+				/*
+				 * Share the Bloom filter, making all
+				 * config info consistent.
+				 */
+				for (je2 = je + 1; je2 < jeend; je2++)
+					if (F_ISSET(je2, WT_CJE_BLOOM) &&
+					    je2->count == je->count) {
+						WT_ASSERT(session,
+						    je2->bloom == NULL);
+						je2->bloom = je->bloom;
+						je2->bloom_bit_count = m;
+						je2->bloom_hash_count = k;
+					}
+			} else {
+				/*
+				 * Create a temporary filter that we'll
+				 * merge into the shared one.  The Bloom
+				 * parameters of the two filters must match.
+				 * Close the temporary filter whether or not
+				 * populating and merging it succeeded.
+				 */
+				WT_ERR(__wt_bloom_create(session, NULL,
+				    NULL, je->count, je->bloom_bit_count,
+				    je->bloom_hash_count, &bloom));
+				if ((ret = __curjoin_init_bloom(
+				    session, cjoin, je, bloom)) == 0)
+					ret = __wt_bloom_intersection(
+					    je->bloom, bloom);
+				WT_TRET(__wt_bloom_close(bloom));
+				WT_ERR(ret);
+			}
+		}
+	}
+	F_SET(cjoin, WT_CJ_INITIALIZED);
+
+err:
+	return (ret);
+}
+
+typedef struct {
+	WT_CURSOR iface;
+	WT_CURSOR_JOIN_ENTRY *entry;
+	int ismember;
+} WT_CURJOIN_EXTRACTOR;
+
+/*
+ * __curjoin_entry_in_range --
+ *	Check if a key is in the range specified by the entry.
+ *	Return WT_NOTFOUND if not.
+ */ +static int +__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + WT_ITEM *curkey, bool skip_left) +{ + WT_COLLATOR *collator; + WT_DECL_RET; + int cmp; + + collator = (entry->index != NULL) ? entry->index->collator : NULL; + if (!skip_left && entry->ends[0].cursor != NULL) { + WT_ERR(__wt_compare(session, collator, curkey, + &entry->ends[0].key, &cmp)); + if (cmp < 0 || + (cmp == 0 && + !F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_EQ)) || + (cmp > 0 && !F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_GT))) + WT_ERR(WT_NOTFOUND); + } + if (entry->ends[1].cursor != NULL) { + WT_ERR(__wt_compare(session, collator, curkey, + &entry->ends[1].key, &cmp)); + if (cmp > 0 || + (cmp == 0 && + !F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_EQ)) || + (cmp < 0 && !F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_LT))) + WT_ERR(WT_NOTFOUND); + } + +err: return (ret); +} + +/* + * __curjoin_extract_insert -- + * Handle a key produced by a custom extractor. + */ +static int +__curjoin_extract_insert(WT_CURSOR *cursor) { + WT_CURJOIN_EXTRACTOR *cextract; + WT_DECL_RET; + WT_ITEM ikey; + WT_SESSION_IMPL *session; + + cextract = (WT_CURJOIN_EXTRACTOR *)cursor; + /* + * This insert method may be called multiple times during a single + * extraction. If we already have a definitive answer to the + * membership question, exit early. + */ + if (cextract->ismember) + return (0); + + session = (WT_SESSION_IMPL *)cursor->session; + + WT_ITEM_SET(ikey, cursor->key); + /* + * We appended a padding byte to the key to avoid rewriting the last + * column. Strip that away here. + */ + WT_ASSERT(session, ikey.size > 0); + --ikey.size; + + ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, 0); + if (ret == WT_NOTFOUND) { + cextract->ismember = 0; + ret = 0; + } else + cextract->ismember = 1; + + return (ret); +} + +/* + * __curjoin_entry_member -- + * Do a membership check for a particular index that was joined, + * if not a member, returns WT_NOTFOUND. 
+ */ +static int +__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry, bool skip_left) +{ + WT_CURJOIN_EXTRACTOR extract_cursor; + WT_CURSOR *main; + WT_CURSOR_STATIC_INIT(iface, + __wt_cursor_get_key, /* get-key */ + __wt_cursor_get_value, /* get-value */ + __wt_cursor_set_key, /* set-key */ + __wt_cursor_set_value, /* set-value */ + __wt_cursor_notsup, /* compare */ + __wt_cursor_notsup, /* equals */ + __wt_cursor_notsup, /* next */ + __wt_cursor_notsup, /* prev */ + __wt_cursor_notsup, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_notsup, /* search-near */ + __curjoin_extract_insert, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* reconfigure */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_notsup); /* close */ + WT_DECL_RET; + WT_INDEX *index; + WT_SESSION *wtsession; + WT_ITEM *key, v; + + wtsession = (WT_SESSION *)session; + key = cjoin->iter->curkey; + + if (entry->bloom != NULL) { + /* + * If we don't own the Bloom filter, we must be sharing one + * in a previous entry. So the shared filter has already + * been checked and passed. + */ + if (!F_ISSET(entry, WT_CJE_OWN_BLOOM)) + return (0); + + /* + * If the item is not in the Bloom filter, we return + * immediately, otherwise, we still need to check the + * long way. 
+ */ + WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); + } + + if (entry->index != NULL) { + main = entry->main; + main->set_key(main, key); + if ((ret = main->search(main)) == 0) + ret = main->get_value(main, &v); + else if (ret == WT_NOTFOUND) + WT_ERR_MSG(session, WT_ERROR, + "main table for join is missing entry."); + main->reset(main); + WT_ERR(ret); + } else + v = *key; + + if ((index = entry->index) != NULL && index->extractor) { + extract_cursor.iface = iface; + extract_cursor.iface.session = &session->iface; + extract_cursor.iface.key_format = index->exkey_format; + extract_cursor.ismember = 0; + extract_cursor.entry = entry; + WT_ERR(index->extractor->extract(index->extractor, + &session->iface, key, &v, &extract_cursor.iface)); + if (!extract_cursor.ismember) + WT_ERR(WT_NOTFOUND); + } else + WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left)); +err: + return (ret); +} + +/* + * __curjoin_next -- + * WT_CURSOR::next for join cursors. + */ +static int +__curjoin_next(WT_CURSOR *cursor) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + WT_SESSION_IMPL *session; + bool skip_left; + size_t count; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + CURSOR_API_CALL(cursor, session, next, NULL); + + if (F_ISSET(cjoin, WT_CJ_ERROR)) { + __wt_errx(session, "join cursor encountered previous error"); + WT_ERR(WT_ERROR); + } + if (!F_ISSET(cjoin, WT_CJ_INITIALIZED)) + WT_ERR(__curjoin_init_iter(session, cjoin)); + +nextkey: + if ((ret = __curjoin_entry_iter_next(cjoin->iter, &cursor->key, + &cursor->recno)) == 0) { + F_SET(cursor, WT_CURSTD_KEY_EXT); + + /* + * We may have already established membership for the + * 'left' case for the first entry, since we're + * using that in our iteration. 
+ */ + skip_left = F_ISSET(cjoin, WT_CJ_SKIP_FIRST_LEFT); + for (count = 0; count < cjoin->entries_next; count++) { + ret = __curjoin_entry_member(session, cjoin, + &cjoin->entries[count], skip_left); + if (ret == WT_NOTFOUND) + goto nextkey; + skip_left = false; + WT_ERR(ret); + } + } + + if (0) { +err: F_SET(cjoin, WT_CJ_ERROR); + } + API_END_RET(session, ret); +} + +/* + * __curjoin_reset -- + * WT_CURSOR::reset for join cursors. + */ +static int +__curjoin_reset(WT_CURSOR *cursor) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + CURSOR_API_CALL(cursor, session, reset, NULL); + + if (F_ISSET(cjoin, WT_CJ_INITIALIZED)) + WT_ERR(__curjoin_entry_iter_reset(cjoin->iter)); + +err: API_END_RET(session, ret); +} + +/* + * __curjoin_close -- + * WT_CURSOR::close for join cursors. + */ +static int +__curjoin_close(WT_CURSOR *cursor) +{ + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_SESSION_IMPL *session; + size_t i; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + CURSOR_API_CALL(cursor, session, close, NULL); + + __wt_schema_release_table(session, cjoin->table); + /* These are owned by the table */ + cursor->internal_uri = NULL; + cursor->key_format = NULL; + if (cjoin->projection != NULL) { + __wt_free(session, cjoin->projection); + __wt_free(session, cursor->value_format); + } + + for (entry = cjoin->entries, i = 0; i < cjoin->entries_next; + entry++, i++) { + if (entry->main != NULL) + WT_TRET(entry->main->close(entry->main)); + if (entry->ends[0].cursor != NULL) + F_CLR(entry->ends[0].cursor, WT_CURSTD_JOINED); + if (entry->ends[1].cursor != NULL) + F_CLR(entry->ends[1].cursor, WT_CURSTD_JOINED); + if (F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_OWNKEY)) + __wt_free(session, entry->ends[0].key.data); + if (F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_OWNKEY)) + __wt_free(session, entry->ends[1].key.data); + if (F_ISSET(entry, WT_CJE_OWN_BLOOM)) + WT_TRET(__wt_bloom_close(entry->bloom)); + 
} + + if (cjoin->iter != NULL) + WT_TRET(__curjoin_entry_iter_close(cjoin->iter)); + __wt_free(session, cjoin->entries); + WT_TRET(__wt_cursor_close(cursor)); + +err: API_END_RET(session, ret); +} + +/* + * __wt_curjoin_open -- + * Initialize a join cursor. + * + * Join cursors are read-only. + */ +int +__wt_curjoin_open(WT_SESSION_IMPL *session, + const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) +{ + WT_CURSOR_STATIC_INIT(iface, + __curjoin_get_key, /* get-key */ + __curjoin_get_value, /* get-value */ + __wt_cursor_notsup, /* set-key */ + __wt_cursor_notsup, /* set-value */ + __wt_cursor_notsup, /* compare */ + __wt_cursor_notsup, /* equals */ + __curjoin_next, /* next */ + __wt_cursor_notsup, /* prev */ + __curjoin_reset, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_notsup, /* search-near */ + __wt_cursor_notsup, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_notsup, /* reconfigure */ + __curjoin_close); /* close */ + WT_CURSOR *cursor; + WT_CURSOR_JOIN *cjoin; + WT_DECL_ITEM(tmp); + WT_DECL_RET; + WT_TABLE *table; + size_t size; + const char *tablename, *columns; + + WT_STATIC_ASSERT(offsetof(WT_CURSOR_JOIN, iface) == 0); + + if (!WT_PREFIX_SKIP(uri, "join:")) + return (EINVAL); + tablename = uri; + if (!WT_PREFIX_SKIP(tablename, "table:")) + return (EINVAL); + + columns = strchr(tablename, '('); + if (columns == NULL) + size = strlen(tablename); + else + size = WT_PTRDIFF(columns, tablename); + WT_RET(__wt_schema_get_table(session, tablename, size, 0, &table)); + + WT_RET(__wt_calloc_one(session, &cjoin)); + cursor = &cjoin->iface; + *cursor = iface; + cursor->session = &session->iface; + cursor->internal_uri = table->name; + cursor->key_format = table->key_format; + cursor->value_format = table->value_format; + cjoin->table = table; + + /* Handle projections. 
*/ + WT_ERR(__wt_scr_alloc(session, 0, &tmp)); + if (columns != NULL) { + WT_ERR(__wt_struct_reformat(session, table, + columns, strlen(columns), NULL, 1, tmp)); + WT_ERR(__wt_strndup( + session, tmp->data, tmp->size, &cursor->value_format)); + WT_ERR(__wt_strdup(session, columns, &cjoin->projection)); + } + + if (owner != NULL) + WT_ERR(EINVAL); + + WT_ERR(__wt_cursor_init(cursor, uri, owner, cfg, cursorp)); + + if (0) { +err: WT_TRET(__curjoin_close(cursor)); + *cursorp = NULL; + } + + __wt_scr_free(session, &tmp); + return (ret); +} + +/* + * __wt_curjoin_join -- + * Add a new join to a join cursor. + */ +int +__wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_INDEX *index, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, + uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count) +{ + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_CURSOR_JOIN_ENDPOINT *endpoint; + int nonbloom; + size_t i; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + char *main_uri; + size_t namesize, newsize; + + entry = NULL; + main_uri = NULL; + nonbloom = -1; + namesize = strlen(cjoin->table->name); + for (i = 0; i < cjoin->entries_next; i++) { + if (cjoin->entries[i].index == index) { + entry = &cjoin->entries[i]; + break; + } + if (nonbloom == -1 && i > 0 && + !F_ISSET(&cjoin->entries[i], WT_CJE_BLOOM)) + nonbloom = i; + } + if (entry == NULL) { + WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated, + cjoin->entries_next + 1, &cjoin->entries)); + if (LF_ISSET(WT_CJE_BLOOM) && nonbloom != -1) { + /* + * Reorder the list so that after the first entry, + * the Bloom filtered entries come next, followed by + * the non-Bloom entries. Once the Bloom filters + * are built, determining membership via Bloom is + * faster than without Bloom, so we can answer + * membership questions more quickly, and with less + * I/O, with the Bloom entries first. 
+ */ + entry = &cjoin->entries[nonbloom]; + memmove(entry + 1, entry, + (cjoin->entries_next - nonbloom) * + sizeof(WT_CURSOR_JOIN_ENTRY)); + memset(entry, 0, sizeof(WT_CURSOR_JOIN_ENTRY)); + } + else + entry = &cjoin->entries[cjoin->entries_next]; + entry->index = index; + entry->flags = flags; + entry->count = count; + entry->bloom_bit_count = bloom_bit_count; + entry->bloom_hash_count = bloom_hash_count; + ++cjoin->entries_next; + } else { + /* Merge the join into an existing entry for this index */ + if (count != 0 && entry->count != 0 && entry->count != count) { + __wt_errx(session, "count=%" PRIu64 " does not match " + "previous count=%" PRIu64 " for this index", + count, entry->count); + WT_ERR(EINVAL); + } + if (LF_ISSET(WT_CJE_BLOOM) != F_ISSET(entry, WT_CJE_BLOOM)) { + __wt_errx(session, "join has incompatible strategy " + "values for the same index"); + WT_ERR(EINVAL); + } + /* Check flag combinations */ + if ((F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_GT) && + (range & WT_CJE_ENDPOINT_GT) != 0) || + (F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_LT) && + (range & WT_CJE_ENDPOINT_LT) != 0) || + ((F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_EQ) || + F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_EQ)) && + (range == WT_CJE_ENDPOINT_EQ))) { + __wt_errx(session, "join has overlapping ranges"); + WT_ERR(EINVAL); + } + /* All checks completed, merge any new configuration now */ + entry->count = count; + entry->bloom_bit_count = + WT_MAX(entry->bloom_bit_count, bloom_bit_count); + entry->bloom_hash_count = + WT_MAX(entry->bloom_hash_count, bloom_hash_count); + } + if (range & WT_CJE_ENDPOINT_LT) + endpoint = &entry->ends[1]; + else + endpoint = &entry->ends[0]; + endpoint->cursor = ref_cursor; + F_SET(endpoint, range); + + /* Open the main file with a projection of the indexed columns. 
*/ + if (entry->main == NULL && entry->index != NULL) { + namesize = strlen(cjoin->table->name); + newsize = namesize + entry->index->colconf.len + 1; + WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); + snprintf(main_uri, newsize, "%s%.*s", + cjoin->table->name, (int)entry->index->colconf.len, + entry->index->colconf.str); + WT_ERR(__wt_open_cursor(session, main_uri, + (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); + } + +err: if (main_uri != NULL) + __wt_free(session, main_uri); + return (ret); +} diff --git a/src/cursor/cur_table.c b/src/cursor/cur_table.c index 01d1fdd1886..2bdddadff2a 100644 --- a/src/cursor/cur_table.c +++ b/src/cursor/cur_table.c @@ -186,34 +186,16 @@ __wt_curtable_get_key(WT_CURSOR *cursor, ...) int __wt_curtable_get_value(WT_CURSOR *cursor, ...) { - WT_CURSOR *primary; - WT_CURSOR_TABLE *ctable; WT_DECL_RET; - WT_ITEM *item; WT_SESSION_IMPL *session; va_list ap; - ctable = (WT_CURSOR_TABLE *)cursor; - primary = *ctable->cg_cursors; - CURSOR_API_CALL(cursor, session, get_value, NULL); - WT_CURSOR_NEEDVALUE(primary); - va_start(ap, cursor); - if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) { - ret = __wt_schema_project_merge(session, - ctable->cg_cursors, ctable->plan, - cursor->value_format, &cursor->value); - if (ret == 0) { - item = va_arg(ap, WT_ITEM *); - item->data = cursor->value.data; - item->size = cursor->value.size; - } - } else - ret = __wt_schema_project_out(session, - ctable->cg_cursors, ctable->plan, ap); - va_end(ap); + JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); + WT_ERR(__wt_curtable_get_value_ap(cursor, ap)); -err: API_END_RET(session, ret); +err: va_end(ap); + API_END_RET(session, ret); } /* @@ -264,7 +246,7 @@ __wt_curtable_set_value(WT_CURSOR *cursor, ...) 
u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, set_value, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL); va_start(ap, cursor); if (F_ISSET(cursor, WT_CURSOR_RAW_OK | WT_CURSTD_DUMP_JSON)) { @@ -332,7 +314,7 @@ __curtable_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp) WT_DECL_RET; WT_SESSION_IMPL *session; - CURSOR_API_CALL(a, session, compare, NULL); + JOINABLE_CURSOR_API_CALL(a, session, compare, NULL); /* * Confirm both cursors refer to the same source and have keys, then @@ -362,7 +344,7 @@ __curtable_next(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); APPLY_CG(ctable, next); err: API_END_RET(session, ret); @@ -383,7 +365,7 @@ __curtable_next_random(WT_CURSOR *cursor) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); cp = ctable->cg_cursors; /* Split out the first next, it retrieves the random record. 
*/ @@ -414,7 +396,7 @@ __curtable_prev(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, prev, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL); APPLY_CG(ctable, prev); err: API_END_RET(session, ret); @@ -432,7 +414,7 @@ __curtable_reset(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, reset, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL); APPLY_CG(ctable, reset); err: API_END_RET(session, ret); @@ -450,7 +432,7 @@ __curtable_search(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, search, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL); APPLY_CG(ctable, search); err: API_END_RET(session, ret); @@ -470,7 +452,7 @@ __curtable_search_near(WT_CURSOR *cursor, int *exact) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, search_near, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL); cp = ctable->cg_cursors; primary = *cp; WT_ERR(primary->search_near(primary, exact)); @@ -721,7 +703,7 @@ __curtable_close(WT_CURSOR *cursor) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, close, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); if (ctable->cg_cursors != NULL) for (i = 0, cp = ctable->cg_cursors; @@ -844,7 +826,7 @@ __curtable_open_indices(WT_CURSOR_TABLE *ctable) */ int __wt_curtable_open(WT_SESSION_IMPL *session, - const char *uri, const char *cfg[], WT_CURSOR **cursorp) + const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) { WT_CURSOR_STATIC_INIT(iface, __wt_curtable_get_key, /* get-key */ @@ -935,7 +917,7 @@ __wt_curtable_open(WT_SESSION_IMPL *session, } WT_ERR(__wt_cursor_init( - cursor, cursor->internal_uri, NULL, cfg, cursorp)); + cursor, cursor->internal_uri, owner, cfg, cursorp)); if (F_ISSET(cursor, 
WT_CURSTD_DUMP_JSON)) WT_ERR(__wt_json_column_init(cursor, table->key_format, diff --git a/src/include/api.h b/src/include/api.h index 8679b9510a8..862d8451ada 100644 --- a/src/include/api.h +++ b/src/include/api.h @@ -116,6 +116,11 @@ API_CALL_NOCONF(s, WT_CURSOR, n, cur, \ ((bt) == NULL) ? NULL : ((WT_BTREE *)(bt))->dhandle) +#define JOINABLE_CURSOR_API_CALL(cur, s, n, bt) \ + CURSOR_API_CALL(cur, s, n, bt); \ + if (F_ISSET(cur, WT_CURSTD_JOINED)) \ + WT_ERR(__wt_curindex_joined(cur)) + #define CURSOR_UPDATE_API_CALL(cur, s, n, bt) \ (s) = (WT_SESSION_IMPL *)(cur)->session; \ TXN_API_CALL_NOCONF(s, WT_CURSOR, n, cur, \ diff --git a/src/include/config.h b/src/include/config.h index 408639ab2a9..e836abaccba 100644 --- a/src/include/config.h +++ b/src/include/config.h @@ -68,28 +68,29 @@ struct __wt_config_parser_impl { #define WT_CONFIG_ENTRY_WT_SESSION_compact 16 #define WT_CONFIG_ENTRY_WT_SESSION_create 17 #define WT_CONFIG_ENTRY_WT_SESSION_drop 18 -#define WT_CONFIG_ENTRY_WT_SESSION_log_flush 19 -#define WT_CONFIG_ENTRY_WT_SESSION_log_printf 20 -#define WT_CONFIG_ENTRY_WT_SESSION_open_cursor 21 -#define WT_CONFIG_ENTRY_WT_SESSION_reconfigure 22 -#define WT_CONFIG_ENTRY_WT_SESSION_rename 23 -#define WT_CONFIG_ENTRY_WT_SESSION_reset 24 -#define WT_CONFIG_ENTRY_WT_SESSION_rollback_transaction 25 -#define WT_CONFIG_ENTRY_WT_SESSION_salvage 26 -#define WT_CONFIG_ENTRY_WT_SESSION_snapshot 27 -#define WT_CONFIG_ENTRY_WT_SESSION_strerror 28 -#define WT_CONFIG_ENTRY_WT_SESSION_transaction_sync 29 -#define WT_CONFIG_ENTRY_WT_SESSION_truncate 30 -#define WT_CONFIG_ENTRY_WT_SESSION_upgrade 31 -#define WT_CONFIG_ENTRY_WT_SESSION_verify 32 -#define WT_CONFIG_ENTRY_colgroup_meta 33 -#define WT_CONFIG_ENTRY_file_meta 34 -#define WT_CONFIG_ENTRY_index_meta 35 -#define WT_CONFIG_ENTRY_table_meta 36 -#define WT_CONFIG_ENTRY_wiredtiger_open 37 -#define WT_CONFIG_ENTRY_wiredtiger_open_all 38 -#define WT_CONFIG_ENTRY_wiredtiger_open_basecfg 39 -#define 
WT_CONFIG_ENTRY_wiredtiger_open_usercfg 40 +#define WT_CONFIG_ENTRY_WT_SESSION_join 19 +#define WT_CONFIG_ENTRY_WT_SESSION_log_flush 20 +#define WT_CONFIG_ENTRY_WT_SESSION_log_printf 21 +#define WT_CONFIG_ENTRY_WT_SESSION_open_cursor 22 +#define WT_CONFIG_ENTRY_WT_SESSION_reconfigure 23 +#define WT_CONFIG_ENTRY_WT_SESSION_rename 24 +#define WT_CONFIG_ENTRY_WT_SESSION_reset 25 +#define WT_CONFIG_ENTRY_WT_SESSION_rollback_transaction 26 +#define WT_CONFIG_ENTRY_WT_SESSION_salvage 27 +#define WT_CONFIG_ENTRY_WT_SESSION_snapshot 28 +#define WT_CONFIG_ENTRY_WT_SESSION_strerror 29 +#define WT_CONFIG_ENTRY_WT_SESSION_transaction_sync 30 +#define WT_CONFIG_ENTRY_WT_SESSION_truncate 31 +#define WT_CONFIG_ENTRY_WT_SESSION_upgrade 32 +#define WT_CONFIG_ENTRY_WT_SESSION_verify 33 +#define WT_CONFIG_ENTRY_colgroup_meta 34 +#define WT_CONFIG_ENTRY_file_meta 35 +#define WT_CONFIG_ENTRY_index_meta 36 +#define WT_CONFIG_ENTRY_table_meta 37 +#define WT_CONFIG_ENTRY_wiredtiger_open 38 +#define WT_CONFIG_ENTRY_wiredtiger_open_all 39 +#define WT_CONFIG_ENTRY_wiredtiger_open_basecfg 40 +#define WT_CONFIG_ENTRY_wiredtiger_open_usercfg 41 /* * configuration section: END * DO NOT EDIT: automatically built by dist/flags.py. 
diff --git a/src/include/cursor.h b/src/include/cursor.h index 1cbe76216b1..61ee81c96ff 100644 --- a/src/include/cursor.h +++ b/src/include/cursor.h @@ -264,6 +264,59 @@ struct __wt_cursor_index { uint8_t *cg_needvalue; }; +struct __wt_cursor_join_iter { + WT_SESSION_IMPL *session; + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ENTRY *entry; + WT_CURSOR *cursor; + WT_ITEM *curkey; + bool advance; +}; + +struct __wt_cursor_join_endpoint { + WT_ITEM key; + uint8_t recno_buf[10]; /* holds packed recno */ + WT_CURSOR *cursor; + +#define WT_CJE_ENDPOINT_LT 0x01 /* include values < cursor */ +#define WT_CJE_ENDPOINT_EQ 0x02 /* include values == cursor */ +#define WT_CJE_ENDPOINT_GT 0x04 /* include values > cursor */ +#define WT_CJE_ENDPOINT_OWNKEY 0x08 /* must free key's data */ + uint8_t flags; /* range for this endpoint */ +}; + +struct __wt_cursor_join_entry { + WT_INDEX *index; + WT_CURSOR *main; /* raw main table cursor */ + WT_BLOOM *bloom; /* Bloom filter handle */ + uint64_t bloom_bit_count; /* bits per item in bloom */ + uint64_t bloom_hash_count; /* hash functions in bloom */ + uint64_t count; /* approx number of matches */ + +#define WT_CJE_BLOOM 0x01 /* use a bloom filter */ +#define WT_CJE_OWN_BLOOM 0x02 /* this entry owns the bloom */ + uint8_t flags; + + WT_CURSOR_JOIN_ENDPOINT ends[2]; /* reference endpoints */ +}; + +struct __wt_cursor_join { + WT_CURSOR iface; + + WT_TABLE *table; + const char *projection; + WT_CURSOR_JOIN_ITER *iter; + WT_CURSOR_JOIN_ENTRY *entries; + size_t entries_allocated; + size_t entries_next; + uint8_t recno_buf[10]; /* holds packed recno */ + +#define WT_CJ_ERROR 0x01 /* Error during initialization */ +#define WT_CJ_INITIALIZED 0x02 /* Successful initialization */ +#define WT_CJ_SKIP_FIRST_LEFT 0x04 + uint8_t flags; +}; + struct __wt_cursor_json { char *key_buf; /* JSON formatted string */ char *value_buf; /* JSON formatted string */ diff --git a/src/include/cursor.i b/src/include/cursor.i index c6ce04cab6f..7f90e43ed6c 100644 --- 
a/src/include/cursor.i +++ b/src/include/cursor.i @@ -138,6 +138,70 @@ __curfile_leave(WT_CURSOR_BTREE *cbt) return (ret); } +/* + * __wt_curindex_get_value_ap -- + * Internal implementation of WT_CURSOR->get_value for index cursors + */ +static inline int +__wt_curindex_get_value_ap(WT_CURSOR *cursor, va_list ap) +{ + WT_CURSOR_INDEX *cindex; + WT_DECL_RET; + WT_ITEM *item; + WT_SESSION_IMPL *session; + + cindex = (WT_CURSOR_INDEX *)cursor; + session = (WT_SESSION_IMPL *)cursor->session; + WT_CURSOR_NEEDVALUE(cursor); + + if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) { + ret = __wt_schema_project_merge(session, + cindex->cg_cursors, cindex->value_plan, + cursor->value_format, &cursor->value); + if (ret == 0) { + item = va_arg(ap, WT_ITEM *); + item->data = cursor->value.data; + item->size = cursor->value.size; + } + } else + ret = __wt_schema_project_out(session, + cindex->cg_cursors, cindex->value_plan, ap); +err: return (ret); +} + +/* + * __wt_curtable_get_value_ap -- + * Internal implementation of WT_CURSOR->get_value for table cursors. + */ +static inline int +__wt_curtable_get_value_ap(WT_CURSOR *cursor, va_list ap) +{ + WT_CURSOR *primary; + WT_CURSOR_TABLE *ctable; + WT_DECL_RET; + WT_ITEM *item; + WT_SESSION_IMPL *session; + + ctable = (WT_CURSOR_TABLE *)cursor; + session = (WT_SESSION_IMPL *)cursor->session; + primary = *ctable->cg_cursors; + WT_CURSOR_NEEDVALUE(primary); + + if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) { + ret = __wt_schema_project_merge(session, + ctable->cg_cursors, ctable->plan, + cursor->value_format, &cursor->value); + if (ret == 0) { + item = va_arg(ap, WT_ITEM *); + item->data = cursor->value.data; + item->size = cursor->value.size; + } + } else + ret = __wt_schema_project_out(session, + ctable->cg_cursors, ctable->plan, ap); +err: return (ret); +} + /* * __wt_cursor_dhandle_incr_use -- * Increment the in-use counter in cursor's data source. 
diff --git a/src/include/extern.h b/src/include/extern.h index bb70ac57806..dd0ad6369d9 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -83,6 +83,8 @@ extern int __wt_bloom_finalize(WT_BLOOM *bloom); extern int __wt_bloom_hash(WT_BLOOM *bloom, WT_ITEM *key, WT_BLOOM_HASH *bhash); extern int __wt_bloom_hash_get(WT_BLOOM *bloom, WT_BLOOM_HASH *bhash); extern int __wt_bloom_get(WT_BLOOM *bloom, WT_ITEM *key); +extern int __wt_bloom_inmem_get(WT_BLOOM *bloom, WT_ITEM *key); +extern int __wt_bloom_intersection(WT_BLOOM *bloom, WT_BLOOM *other); extern int __wt_bloom_close(WT_BLOOM *bloom); extern int __wt_bloom_drop(WT_BLOOM *bloom, const char *config); extern int __wt_compact(WT_SESSION_IMPL *session, const char *cfg[]); @@ -273,7 +275,10 @@ extern int __wt_curdump_create(WT_CURSOR *child, WT_CURSOR *owner, WT_CURSOR **c extern int __wt_curfile_update_check(WT_CURSOR *cursor); extern int __wt_curfile_create(WT_SESSION_IMPL *session, WT_CURSOR *owner, const char *cfg[], bool bulk, bool bitmap, WT_CURSOR **cursorp); extern int __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); +extern int __wt_curindex_joined(WT_CURSOR *cursor); extern int __wt_curindex_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); +extern int __wt_curjoin_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); +extern int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *index, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count); extern int __wt_json_alloc_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, WT_CURSOR_JSON *json, bool iskey, va_list ap); extern void __wt_json_close(WT_SESSION_IMPL *session, WT_CURSOR *cursor); extern size_t __wt_json_unpack_char(char ch, u_char *buf, 
size_t bufsz, bool force_unicode); @@ -315,7 +320,7 @@ extern int __wt_curtable_get_value(WT_CURSOR *cursor, ...); extern void __wt_curtable_set_key(WT_CURSOR *cursor, ...); extern void __wt_curtable_set_value(WT_CURSOR *cursor, ...); extern int __wt_table_range_truncate(WT_CURSOR_TABLE *start, WT_CURSOR_TABLE *stop); -extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp); +extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); extern int __wt_evict_file(WT_SESSION_IMPL *session, int syncop); extern void __wt_evict_list_clear_page(WT_SESSION_IMPL *session, WT_REF *ref); extern int __wt_evict_server_wake(WT_SESSION_IMPL *session); @@ -531,6 +536,8 @@ extern int __wt_struct_confchk(WT_SESSION_IMPL *session, WT_CONFIG_ITEM *v); extern int __wt_struct_size(WT_SESSION_IMPL *session, size_t *sizep, const char *fmt, ...); extern int __wt_struct_pack(WT_SESSION_IMPL *session, void *buffer, size_t size, const char *fmt, ...); extern int __wt_struct_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, ...); +extern int __wt_struct_unpack_size(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, size_t *resultp); +extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, void **reallocp); extern int __wt_ovfl_discard_add(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL *cell); extern void __wt_ovfl_discard_free(WT_SESSION_IMPL *session, WT_PAGE *page); extern int __wt_ovfl_reuse_search(WT_SESSION_IMPL *session, WT_PAGE *page, uint8_t **addrp, size_t *addr_sizep, const void *value, size_t value_size); diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index b7ebb8fbc14..2c4b477f39c 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -574,11 +574,12 @@ struct __wt_cursor { #define 
WT_CURSTD_KEY_EXT 0x0020 /* Key points out of the tree. */ #define WT_CURSTD_KEY_INT 0x0040 /* Key points into the tree. */ #define WT_CURSTD_KEY_SET (WT_CURSTD_KEY_EXT | WT_CURSTD_KEY_INT) -#define WT_CURSTD_OPEN 0x0080 -#define WT_CURSTD_OVERWRITE 0x0100 -#define WT_CURSTD_RAW 0x0200 -#define WT_CURSTD_VALUE_EXT 0x0400 /* Value points out of the tree. */ -#define WT_CURSTD_VALUE_INT 0x0800 /* Value points into the tree. */ +#define WT_CURSTD_JOINED 0x0080 +#define WT_CURSTD_OPEN 0x0100 +#define WT_CURSTD_OVERWRITE 0x0200 +#define WT_CURSTD_RAW 0x0400 +#define WT_CURSTD_VALUE_EXT 0x0800 /* Value points out of the tree. */ +#define WT_CURSTD_VALUE_INT 0x1000 /* Value points into the tree. */ #define WT_CURSTD_VALUE_SET (WT_CURSTD_VALUE_EXT | WT_CURSTD_VALUE_INT) uint32_t flags; #endif @@ -1235,6 +1236,61 @@ struct __wt_session { int __F(drop)(WT_SESSION *session, const char *name, const char *config); + /*! + * Join a join cursor with a reference cursor. + * + * @snippet ex_schema.c Join cursors + * + * @param session the session handle + * @param join_cursor a cursor that was opened using a + * \c "join:" URI. It may not have been used for any operations + * other than other join calls. + * @param ref_cursor either an index cursor having the same base table + * as the join_cursor, or a table cursor open on the same base table. + * The ref_cursor must be positioned. + * + * The ref_cursor limits the results seen by iterating the + * join_cursor to table items referred to by the key in this + * index. The set of keys referred to is modified by the compare + * config option. + * + * Multiple join calls builds up a set of ref_cursors, and the + * results seen by iteration are the intersection of the cursor + * ranges participating in the join. + * + * After the join call completes, the ref_cursor cursor may not be + * used for any purpose other than get_key and get_value. Any other + * cursor method (e.g. next, prev,close) will fail. 
When the + * join_cursor is closed, the ref_cursor is made available for + * general use again. The application should close ref_cursor when + * finished with it, although not before the join_cursor is closed. + * + * @configstart{WT_SESSION.join, see dist/api_data.py} + * @config{bloom_bit_count, the number of bits used per item for the + * bloom filter., an integer between 2 and 1000; default \c 16.} + * @config{bloom_hash_count, the number of hash values per item for the + * bloom filter., an integer between 2 and 100; default \c 8.} + * @config{compare, modifies the set of items to be returned so that the + * index key satisfies the given comparison relative to the key set in + * this cursor., a string\, chosen from the following options: \c "eq"\, + * \c "ge"\, \c "gt"\, \c "le"\, \c "lt"; default \c "eq".} + * @config{count, set an approximate count of the elements that would be + * included in the join. This is used in sizing the bloom filter\, and + * also influences evaluation order for cursors in the join. When the + * count is equal for multiple bloom filters in a composition of joins\, + * the bloom filter may be shared., an integer; default \c .} + * @config{strategy, when set to bloom\, a bloom filter is created and + * populated for this index. This has an up front cost but may reduce + * the number of accesses to the main table when iterating the joined + * cursor. The bloom setting requires that count be set., a string\, + * chosen from the following options: \c "bloom"\, \c "default"; default + * empty.} + * @configend + * @errors + */ + int __F(join)(WT_SESSION *session, WT_CURSOR *join_cursor, + WT_CURSOR *ref_cursor, const char *config); + /*! * Flush the log. 
* diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h index 3f4e0ada7f1..9dcc70042cc 100644 --- a/src/include/wt_internal.h +++ b/src/include/wt_internal.h @@ -136,6 +136,14 @@ struct __wt_cursor_dump; typedef struct __wt_cursor_dump WT_CURSOR_DUMP; struct __wt_cursor_index; typedef struct __wt_cursor_index WT_CURSOR_INDEX; +struct __wt_cursor_join; + typedef struct __wt_cursor_join WT_CURSOR_JOIN; +struct __wt_cursor_join_endpoint; + typedef struct __wt_cursor_join_endpoint WT_CURSOR_JOIN_ENDPOINT; +struct __wt_cursor_join_entry; + typedef struct __wt_cursor_join_entry WT_CURSOR_JOIN_ENTRY; +struct __wt_cursor_join_iter; + typedef struct __wt_cursor_join_iter WT_CURSOR_JOIN_ITER; struct __wt_cursor_json; typedef struct __wt_cursor_json WT_CURSOR_JSON; struct __wt_cursor_log; diff --git a/src/packing/pack_impl.c b/src/packing/pack_impl.c index 3a4428eae15..99fa0d54869 100644 --- a/src/packing/pack_impl.c +++ b/src/packing/pack_impl.c @@ -105,3 +105,108 @@ __wt_struct_unpack(WT_SESSION_IMPL *session, return (ret); } + +/* + * __wt_struct_unpack_size -- + * Determine the packed size of a buffer matching the format. + */ +int +__wt_struct_unpack_size(WT_SESSION_IMPL *session, + const void *buffer, size_t size, const char *fmt, size_t *resultp) +{ + WT_DECL_PACK_VALUE(pv); + WT_DECL_RET; + WT_PACK pack; + const uint8_t *p, *end; + + p = buffer; + end = p + size; + + WT_RET(__pack_init(session, &pack, fmt)); + while ((ret = __pack_next(&pack, &pv)) == 0) + WT_RET(__unpack_read(session, &pv, &p, (size_t)(end - p))); + + /* Be paranoid - __pack_write should never overflow. */ + WT_ASSERT(session, p <= end); + + if (ret != WT_NOTFOUND) + return (ret); + + *resultp = (p - (uint8_t *)buffer); + return (0); +} + +/* + * __wt_struct_repack -- + * Return the subset of the packed buffer that represents part of + * the format. If the result is not contiguous in the existing + * buffer, a buffer is reallocated and filled. 
+ */ +int +__wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, + const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, + void **reallocp) +{ + WT_DECL_PACK_VALUE(pvin); + WT_DECL_PACK_VALUE(pvout); + WT_DECL_RET; + WT_PACK packin, packout; + const uint8_t *before, *end, *p; + uint8_t *newbuf, *pout; + size_t len; + const void *start; + + start = newbuf = NULL; + p = inbuf->data; + end = p + inbuf->size; + + /* + * Handle this non-contiguous case: 'U' -> 'u' at the end of the buf. + * The former case has the size embedded before the item, the latter + * does not. + */ + if ((len = strlen(outfmt)) > 1 && outfmt[len - 1] == 'u' && + strlen(infmt) > len && infmt[len - 1] == 'U') { + WT_ERR(__wt_realloc(session, NULL, inbuf->size, reallocp)); + pout = *reallocp; + } else + pout = NULL; + + WT_ERR(__pack_init(session, &packout, outfmt)); + WT_ERR(__pack_init(session, &packin, infmt)); + + /* Outfmt should complete before infmt */ + while ((ret = __pack_next(&packout, &pvout)) == 0) { + WT_ERR(__pack_next(&packin, &pvin)); + before = p; + WT_ERR(__unpack_read(session, &pvin, &p, (size_t)(end - p))); + if (pvout.type != pvin.type) { + if (pvout.type == 'u' && pvin.type == 'U') { + /* Skip the prefixed size, we don't need it */ + WT_ERR(__wt_struct_unpack_size(session, before, + (size_t)(end - before), "I", &len)); + before += len; + } else + WT_ERR(ENOTSUP); + } + if (pout != NULL) { + memcpy(pout, before, p - before); + pout += p - before; + } else if (start == NULL) + start = before; + } + WT_ERR_NOTFOUND_OK(ret); + + /* Be paranoid - __pack_write should never overflow. 
*/ + WT_ASSERT(session, p <= end); + + if (pout != NULL) { + outbuf->data = *reallocp; + outbuf->size = (pout - (uint8_t *)*reallocp); + } else { + outbuf->data = start; + outbuf->size = (p - (uint8_t *)start); + } + +err: return (ret); +} diff --git a/src/session/session_api.c b/src/session/session_api.c index a766829afad..02e8570b649 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -244,7 +244,8 @@ __wt_open_cursor(WT_SESSION_IMPL *session, */ case 't': if (WT_PREFIX_MATCH(uri, "table:")) - WT_RET(__wt_curtable_open(session, uri, cfg, cursorp)); + WT_RET(__wt_curtable_open( + session, uri, owner, cfg, cursorp)); break; case 'c': if (WT_PREFIX_MATCH(uri, "colgroup:")) { @@ -265,6 +266,11 @@ __wt_open_cursor(WT_SESSION_IMPL *session, WT_RET(__wt_curindex_open( session, uri, owner, cfg, cursorp)); break; + case 'j': + if (WT_PREFIX_MATCH(uri, "join:")) + WT_RET(__wt_curjoin_open( + session, uri, owner, cfg, cursorp)); + break; case 'l': if (WT_PREFIX_MATCH(uri, "lsm:")) WT_RET(__wt_clsm_open( @@ -616,6 +622,123 @@ err: /* Note: drop operations cannot be unrolled (yet?). */ API_END_RET_NOTFOUND_MAP(session, ret); } +/* + * __session_join -- + * WT_SESSION->join method. 
+ */ +static int +__session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, + WT_CURSOR *ref_cursor, const char *config) +{ + WT_CONFIG_ITEM cval; + WT_DECL_RET; + WT_SESSION_IMPL *session; + WT_CURSOR_INDEX *cindex; + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_TABLE *ctable; + WT_INDEX *index; + WT_TABLE *table; + uint32_t flags, range; + uint64_t count; + uint64_t bloom_bit_count, bloom_hash_count; + + count = 0; + session = (WT_SESSION_IMPL *)wt_session; + SESSION_API_CALL(session, join, config, cfg); + table = NULL; + + if (!WT_PREFIX_MATCH(join_cursor->uri, "join:")) { + __wt_errx(session, "not a join cursor"); + WT_ERR(EINVAL); + } + + if (WT_PREFIX_MATCH(ref_cursor->uri, "index:")) { + cindex = (WT_CURSOR_INDEX *)ref_cursor; + index = cindex->index; + table = cindex->table; + WT_CURSOR_CHECKKEY(ref_cursor); + } else if (WT_PREFIX_MATCH(ref_cursor->uri, "table:")) { + index = NULL; + ctable = (WT_CURSOR_TABLE *)ref_cursor; + table = ctable->table; + WT_CURSOR_CHECKKEY(ctable->cg_cursors[0]); + } else { + __wt_errx(session, "not an index or table cursor"); + WT_ERR(EINVAL); + } + + cjoin = (WT_CURSOR_JOIN *)join_cursor; + if (cjoin->table != table) { + __wt_errx(session, "table for join cursor does not match " + "table for index"); + WT_ERR(EINVAL); + } + if (F_ISSET(ref_cursor, WT_CURSTD_JOINED)) { + __wt_errx(session, "index cursor already used in a join"); + WT_ERR(EINVAL); + } + + /* "ge" is the default */ + range = WT_CJE_ENDPOINT_GT | WT_CJE_ENDPOINT_EQ; + flags = 0; + WT_ERR(__wt_config_gets(session, cfg, "compare", &cval)); + if (cval.len != 0) { + if (WT_STRING_MATCH("gt", cval.str, cval.len)) + range = WT_CJE_ENDPOINT_GT; + else if (WT_STRING_MATCH("lt", cval.str, cval.len)) + range = WT_CJE_ENDPOINT_LT; + else if (WT_STRING_MATCH("le", cval.str, cval.len)) + range = WT_CJE_ENDPOINT_LT | WT_CJE_ENDPOINT_EQ; + else if (WT_STRING_MATCH("eq", cval.str, cval.len)) + range = WT_CJE_ENDPOINT_EQ; + else if (!WT_STRING_MATCH("ge", cval.str, cval.len)) + 
WT_ERR(EINVAL); + } + WT_ERR(__wt_config_gets(session, cfg, "count", &cval)); + if (cval.len != 0) + count = cval.val; + + WT_ERR(__wt_config_gets(session, cfg, "strategy", &cval)); + if (cval.len != 0) { + if (WT_STRING_MATCH("bloom", cval.str, cval.len)) + LF_SET(WT_CJE_BLOOM); + else if (!WT_STRING_MATCH("default", cval.str, cval.len)) + WT_ERR(EINVAL); + } + WT_ERR(__wt_config_gets(session, cfg, "bloom_bit_count", &cval)); + bloom_bit_count = (uint64_t)cval.val; + WT_ERR(__wt_config_gets(session, cfg, "bloom_hash_count", &cval)); + bloom_hash_count = (uint64_t)cval.val; + if (LF_ISSET(WT_CJE_BLOOM)) { + if (count == 0) { + __wt_errx(session, "count must be nonzero when " + "strategy=bloom"); + WT_ERR(EINVAL); + } + if (cjoin->entries_next == 0) { + __wt_errx(session, "the first joined cursor cannot " + "specify strategy=bloom"); + WT_ERR(EINVAL); + } + } + WT_ERR(__wt_curjoin_join(session, cjoin, index, ref_cursor, flags, + range, count, bloom_bit_count, bloom_hash_count)); + /* + * There's an implied ownership ordering that isn't + * known when the cursors are created: the join cursor + * must be closed before any of the indices. Enforce + * that here by reordering. + */ + if (TAILQ_FIRST(&session->cursors) != join_cursor) { + TAILQ_REMOVE(&session->cursors, join_cursor, q); + TAILQ_INSERT_HEAD(&session->cursors, join_cursor, q); + } + /* Disable the reference cursor for regular operations */ + F_SET(ref_cursor, WT_CURSTD_JOINED); + +err: API_END_RET_NOTFOUND_MAP(session, ret); +} + /* * __session_salvage -- * WT_SESSION->salvage method. @@ -1162,6 +1285,7 @@ __open_session(WT_CONNECTION_IMPL *conn, __session_create, __session_compact, __session_drop, + __session_join, __session_log_flush, __session_log_printf, __session_rename, -- cgit v1.2.1 From 3507971e82da3c4a26956d2626fd56a194b0abfd Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 3 Nov 2015 13:31:59 -0500 Subject: WT-1315. 
Modified csv_extractor arguments to allow either string or int columns to be extracted. This is needed for comprehensive join cursor testing.
---
 ext/extractors/csv/csv_extractor.c | 59 +++++++++++++++++++++++++++++++++---------
 test/suite/test_schema05.py        |  7 ++++---
 2 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/ext/extractors/csv/csv_extractor.c b/ext/extractors/csv/csv_extractor.c
index 34b8d7c7c64..2c01271fd02 100644
--- a/ext/extractors/csv/csv_extractor.c
+++ b/ext/extractors/csv/csv_extractor.c
@@ -49,7 +49,8 @@
 typedef struct {
 	WT_EXTRACTOR extractor;		/* Must come first */
 	WT_EXTENSION_API *wt_api;	/* Extension API */
-	int field_num;			/* Field to extract */
+	int field;			/* Field to extract */
+	int format_isnum;		/* Field contents are numeric */
 } CSV_EXTRACTOR;
 
 /*
@@ -61,15 +62,15 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
     const WT_ITEM *key, const WT_ITEM *value, WT_CURSOR *result_cursor)
 {
 	char *copy, *p, *pend, *valstr;
-	const CSV_EXTRACTOR *cvs_extractor;
-	int i, ret;
+	const CSV_EXTRACTOR *csv_extractor;
+	int i, ret, val;
 	size_t len;
 	WT_EXTENSION_API *wtapi;
 
 	(void)key;				/* Unused parameters */
 
-	cvs_extractor = (const CSV_EXTRACTOR *)extractor;
-	wtapi = cvs_extractor->wt_api;
+	csv_extractor = (const CSV_EXTRACTOR *)extractor;
+	wtapi = csv_extractor->wt_api;
 
 	/* Unpack the value. */
 	if ((ret = wtapi->struct_unpack(wtapi,
@@ -78,11 +79,11 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
 
 	p = valstr;
 	pend = strchr(p, ',');
-	for (i = 0; i < cvs_extractor->field_num && pend != NULL; i++) {
+	for (i = 0; i < csv_extractor->field && pend != NULL; i++) {
 		p = pend + 1;
 		pend = strchr(p, ',');
 	}
-	if (i == cvs_extractor->field_num) {
+	if (i == csv_extractor->field) {
 		if (pend == NULL)
 			pend = p + strlen(p);
 		/*
@@ -95,7 +96,15 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
 			return (errno);
 		strncpy(copy, p, len);
 		copy[len] = '\0';
-		result_cursor->set_key(result_cursor, copy);
+		if (csv_extractor->format_isnum) {
+			if ((val = atoi(copy)) < 0) {
+				/* Don't leak the field copy on error. */
+				free(copy);
+				return (EINVAL);
+			}
+			result_cursor->set_key(result_cursor, val);
+		} else
+			result_cursor->set_key(result_cursor, copy);
 		ret = result_cursor->insert(result_cursor);
 		free(copy);
 		if (ret != 0)
@@ -107,7 +116,7 @@ csv_extract(WT_EXTRACTOR *extractor, WT_SESSION *session,
 /*
  * csv_customize --
  *	The customize function creates a customized extractor,
- *	needed to save the field number.
+ *	needed to save the field number and format.
  */
 static int
 csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session,
@@ -115,20 +124,46 @@ csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session,
 {
 	const CSV_EXTRACTOR *orig;
 	CSV_EXTRACTOR *csv_extractor;
+	WT_CONFIG_ITEM field, format;
+	WT_CONFIG_PARSER *parser;
+	int format_isnum, format_ok, ret;
 	long field_num;
 
-	(void)session;				/* Unused parameters */
 	(void)uri;				/* Unused parameters */
 
 	orig = (const CSV_EXTRACTOR *)extractor;
-	field_num = strtol(appcfg->str, NULL, 10);
+	if ((ret = wiredtiger_config_parser_open(session, appcfg->str,
+	    appcfg->len, &parser)) != 0)
+		return (ret);
+	if ((ret = parser->get(parser, "field", &field)) != 0 ||
+	    (ret = parser->get(parser, "format", &format)) != 0) {
+		(void)parser->close(parser);
+		if (ret == WT_NOTFOUND)
+			return (EINVAL);
+		return (ret);
+	}
+
+	/*
+	 * Extract everything needed from the configuration items, then
+	 * close the parser so none of the returns below can leak it.
+	 */
+	field_num = strtol(field.str, NULL, 10);
+	format_ok = format.len == 1 &&
+	    (format.str[0] == 'S' || format.str[0] == 'i');
+	format_isnum = format_ok && format.str[0] == 'i';
+	if ((ret = parser->close(parser)) != 0)
+		return (ret);
+
 	if (field_num < 0 || field_num > INT_MAX)
 		return (EINVAL);
+	if (!format_ok)
+		return (EINVAL);
 	if ((csv_extractor = calloc(1, sizeof(CSV_EXTRACTOR))) == NULL)
 		return (errno);
 
 	*csv_extractor = *orig;
-	csv_extractor->field_num = (int)field_num;
+	csv_extractor->field = (int)field_num;
+	csv_extractor->format_isnum = format_isnum;
 	*customp = (WT_EXTRACTOR *)csv_extractor;
 	return (0);
 }
diff --git a/test/suite/test_schema05.py b/test/suite/test_schema05.py
index 2a7bc042c80..b0562f7983c 100644
--- a/test/suite/test_schema05.py
+++ b/test/suite/test_schema05.py
@@ -89,9 +89,10 @@ class test_schema05(wttest.WiredTigerTestCase):
         # Create self.nindices index files, each with a column from the CSV
         for i in range(0, self.nindices):
             si = str(i)
-            self.session.create("index:schema05:x" + si,
-                                "key_format=S,columns=(key),"
-                                "extractor=csv,app_metadata=" + si)
+            self.session.create('index:schema05:x' + si,
+                                'key_format=S,columns=(key),'
+                                'extractor=csv,app_metadata={"format" : "S",' +
+                                '"field" : "' + si + '"}')
 
     def drop_indices(self):
         for i in range(0, self.nindices):
-- 
cgit v1.2.1


From 5fead322675139d198986886f011cd204e7d5950
Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 3 Nov 2015 13:32:22 -0500 Subject: WT-1315. Added tests for join cursors: basic tests, negative tests, bloom filters (shared and not shared), extractors, joining on indices and main table, testing all operations, a variety of index key formats including composite formats, indexing on 'u' format, projections, joining up to 5 indices. --- test/suite/test_join01.py | 332 ++++++++++++++++++++++++++++++++++++++++++++++ test/suite/test_join02.py | 290 ++++++++++++++++++++++++++++++++++++++++ test/suite/test_join03.py | 158 ++++++++++++++++++++++ 3 files changed, 780 insertions(+) create mode 100644 test/suite/test_join01.py create mode 100644 test/suite/test_join02.py create mode 100644 test/suite/test_join03.py diff --git a/test/suite/test_join01.py b/test/suite/test_join01.py new file mode 100644 index 00000000000..58a135f3bcd --- /dev/null +++ b/test/suite/test_join01.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2015 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import wiredtiger, wttest +from wtscenario import check_scenarios, multiply_scenarios, number_scenarios + +# test_join01.py +# Join operations +# Basic tests for join +class test_join01(wttest.WiredTigerTestCase): + table_name1 = 'test_join01' + nentries = 100 + + scenarios = [ + ('table', dict(ref='table')), + ('index', dict(ref='index')) + ] + + def gen_key(self, i): + return [ i + 1 ] + + def gen_values(self, i): + s = str(i) + rs = s[::-1] + sort3 = (self.nentries * (i % 3)) + i # multiples of 3 sort first + return [s, rs, sort3] + + # Common function for testing iteration of join cursors + def iter_common(self, jc, do_proj): + # See comments in join_common() + expect = [73, 82, 62, 83, 92] + while jc.next() == 0: + [k] = jc.get_keys() + i = k - 1 + if do_proj: # our projection test simply reverses the values + [v2,v1,v0] = jc.get_values() + else: + [v0,v1,v2] = jc.get_values() + self.assertEquals(self.gen_values(i), [v0,v1,v2]) + if len(expect) == 0 or i != expect[0]: + self.tty(' result ' + str(i) + ' is not in: ' + str(expect)) + self.assertTrue(i == expect[0]) + expect.remove(i) + self.assertEquals(0, len(expect)) + + # Common function for testing the most basic functionality + # of joins + def join_common(self, joincfg0, joincfg1, do_proj): + #self.tty('join_common(' + joincfg0 + ',' + joincfg1 + ',' + + # str(do_proj) + ')') + self.session.create('table:join01', 'key_format=r' + + ',value_format=SSi,columns=(k,v0,v1,v2)') + self.session.create('index:join01:index0','columns=(v0)') + 
self.session.create('index:join01:index1','columns=(v1)') + self.session.create('index:join01:index2','columns=(v2)') + + c = self.session.open_cursor('table:join01', None, None) + for i in range(0, self.nentries): + c.set_key(*self.gen_key(i)) + c.set_value(*self.gen_values(i)) + c.insert() + c.close() + + if do_proj: + proj_suffix = '(v2,v1,v0)' # Reversed values + else: + proj_suffix = '' # Default projection (v0,v1,v2) + + # We join on index2 first, not using bloom indices. + # This defines the order that items are returned. + # index2 sorts multiples of 3 first (see gen_values()) + # and by using 'gt' and key 99, we'll skip multiples of 3, + # and examine primary keys 2,5,8,...,95,98,1,4,7,...,94,97. + jc = self.session.open_cursor('join:table:join01' + proj_suffix, + None, None) + c2 = self.session.open_cursor('index:join01:index2', None, None) + c2.set_key(99) # skips all entries w/ primary key divisible by three + self.assertEquals(0, c2.search()) + self.session.join(jc, c2, 'compare=gt') + + # Then select all the numbers 0-99 whose string representation + # sorts >= '60'. + if self.ref == 'index': + c0 = self.session.open_cursor('index:join01:index0', None, None) + c0.set_key('60') + else: + c0 = self.session.open_cursor('table:join01', None, None) + c0.set_key(60) + self.assertEquals(0, c0.search()) + self.session.join(jc, c0, 'compare=ge' + joincfg0) + + # Then select all numbers whose reverse string representation + # is in '21' < x < '41'. + c1a = self.session.open_cursor('index:join01:index1', None, None) + c1a.set_key('21') + self.assertEquals(0, c1a.search()) + self.session.join(jc, c1a, 'compare=gt' + joincfg1) + + c1b = self.session.open_cursor('index:join01:index1', None, None) + c1b.set_key('41') + self.assertEquals(0, c1b.search()) + self.session.join(jc, c1b, 'compare=lt' + joincfg1) + + # Numbers that satisfy these 3 conditions (with ordering implied by c2): + # [73, 82, 62, 83, 92].
+ # + # After iterating, we should be able to reset and iterate again. + self.iter_common(jc, do_proj) + jc.reset() + self.iter_common(jc, do_proj) + jc.reset() + self.iter_common(jc, do_proj) + + jc.close() + c2.close() + c1a.close() + c1b.close() + c0.close() + self.session.drop('table:join01') + + # Test joins with basic functionality + def test_join(self): + bloomcfg1000 = ',strategy=bloom,count=1000' + bloomcfg10000 = ',strategy=bloom,count=10000' + for cfga in [ '', bloomcfg1000, bloomcfg10000 ]: + for cfgb in [ '', bloomcfg1000, bloomcfg10000 ]: + for do_proj in [ False, True ]: + #self.tty('cfga=' + cfga + + # ', cfgb=' + cfgb + + # ', doproj=' + str(do_proj)) + self.join_common(cfga, cfgb, do_proj) + + def test_join_errors(self): + self.session.create('table:join01', 'key_format=r,value_format=SS' + ',columns=(k,v0,v1)') + self.session.create('table:join01B', 'key_format=r,value_format=SS' + ',columns=(k,v0,v1)') + self.session.create('index:join01:index0','columns=(v0)') + self.session.create('index:join01:index1','columns=(v1)') + self.session.create('index:join01B:index0','columns=(v0)') + jc = self.session.open_cursor('join:table:join01', None, None) + tc = self.session.open_cursor('table:join01', None, None) + fc = self.session.open_cursor('file:join01.wt', None, None) + ic0 = self.session.open_cursor('index:join01:index0', None, None) + ic0again = self.session.open_cursor('index:join01:index0', None, None) + ic1 = self.session.open_cursor('index:join01:index1', None, None) + icB = self.session.open_cursor('index:join01B:index0', None, None) + tcB = self.session.open_cursor('table:join01B', None, None) + + tc.set_key(1) + tc.set_value('val1', 'val1') + tc.insert() + tcB.set_key(1) + tcB.set_value('val1', 'val1') + tcB.insert() + fc.next() + + # Joining using a non join-cursor + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(tc, ic0, 'compare=ge'), + '/not a join cursor/') + # Joining a file cursor, which is neither an index nor a table cursor +
self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, fc, 'compare=ge'), + '/not an index or table cursor/') + # Joining a non positioned cursor + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0, 'compare=ge'), + '/requires key be set/') + + # minimally position the cursors now + ic0.next() + ic0again.next() + icB.next() + + # Joining non matching index + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, icB, 'compare=ge'), + '/table for join cursor does not match/') + + # The cursor must be positioned + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic1, 'compare=ge'), + '/requires key be set/') + ic1.next() + + # The first cursor joined cannot be bloom + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic1, + 'compare=ge,strategy=bloom,count=1000'), + '/first joined cursor cannot specify strategy=bloom/') + + # This succeeds. + self.session.join(jc, ic1, 'compare=ge'), + + # With bloom filters, a count is required + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0, 'compare=ge,strategy=bloom'), + '/count must be nonzero/') + + # This succeeds. 
+ self.session.join(jc, ic0, 'compare=ge,strategy=bloom,count=1000'), + + bloom_config = ',strategy=bloom,count=1000' + # Cannot use the same index cursor + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0, + 'compare=le' + bloom_config), + '/index cursor already used in a join/') + + # When joining with the same index, need compatible compares + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0again, 'compare=ge' + bloom_config), + '/join has overlapping ranges/') + + # Another incompatible compare + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0again, 'compare=gt' + bloom_config), + '/join has overlapping ranges/') + + # Compare is compatible, but bloom args need to match + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0again, 'compare=le'), + '/join has incompatible strategy/') + + # Counts need to match for bloom filters + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: self.session.join(jc, ic0again, 'compare=le,strategy=bloom,' + 'count=100'), '/count.* does not match previous count/') + + # This succeeds + self.session.join(jc, ic0again, 'compare=le,strategy=bloom,count=1000') + + # Need to do initial next() before getting key/values + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: jc.get_keys(), + '/join cursor must be advanced with next/') + + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: jc.get_values(), + '/join cursor must be advanced with next/') + + # Operations on the joined cursor are frozen until the join is closed. + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: ic0.next(), + '/index cursor is being used in a join/') + + # Operations on the joined cursor are frozen until the join is closed. 
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: ic0.prev(), + '/index cursor is being used in a join/') + + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda: ic0.reset(), + '/index cursor is being used in a join/') + + # Only a small number of operations allowed on a join cursor + self.assertRaises(wiredtiger.WiredTigerError, + lambda: jc.search()) + + self.assertRaises(wiredtiger.WiredTigerError, + lambda: jc.prev()) + + self.assertEquals(jc.next(), 0) + self.assertEquals(jc.next(), wiredtiger.WT_NOTFOUND) + + # Only after the join cursor is closed can we use the index cursor + # normally + jc.close() + self.assertEquals(ic0.next(), wiredtiger.WT_NOTFOUND) + self.assertEquals(ic0.prev(), 0) + + # common code for making sure that cursors can be + # implicitly closed, no matter the order they are created + def cursor_close_common(self, joinfirst): + self.session.create('table:join01', 'key_format=r' + + ',value_format=SS,columns=(k,v0,v1)') + self.session.create('index:join01:index0','columns=(v0)') + self.session.create('index:join01:index1','columns=(v1)') + c = self.session.open_cursor('table:join01', None, None) + for i in range(0, self.nentries): + c.set_key(*self.gen_key(i)) + c.set_value(*self.gen_values(i)) + c.insert() + c.close() + + if joinfirst: + jc = self.session.open_cursor('join:table:join01', None, None) + c0 = self.session.open_cursor('index:join01:index0', None, None) + c1 = self.session.open_cursor('index:join01:index1', None, None) + c0.next() # index cursors must be positioned + c1.next() + if not joinfirst: + jc = self.session.open_cursor('join:table:join01', None, None) + self.session.join(jc, c0, 'compare=ge') + self.session.join(jc, c1, 'compare=ge') + self.session.close() + self.session = None + + def test_cursor_close1(self): + self.cursor_close_common(True) + + def test_cursor_close2(self): + self.cursor_close_common(False) + +if __name__ == '__main__': + wttest.run() diff --git 
a/test/suite/test_join02.py b/test/suite/test_join02.py new file mode 100644 index 00000000000..16e1fff56df --- /dev/null +++ b/test/suite/test_join02.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2015 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
+ +import wiredtiger, wttest, suite_random +from wtscenario import check_scenarios, multiply_scenarios, number_scenarios + +# test_join02.py +# Join operations +# Join several indices together, trying all comparison combinations +class test_join02(wttest.WiredTigerTestCase): + table_name1 = 'test_join02' + nentries = 1000 + + keyscen = [ + ('key-r', dict(keyformat='r')), + ('key-S', dict(keyformat='S')), + ('key-i', dict(keyformat='i')), + ('key-iS', dict(keyformat='iS')) + ] + + bloomscen = [ + ('bloom', dict(usebloom=True)), + ('nobloom', dict(usebloom=False)) + ] + + scenarios = number_scenarios(multiply_scenarios('.', keyscen, bloomscen)) + + # Start our range from 1, since WT record numbers start at 1, + # it makes things work out nicer. + def range(self): + return range(1, self.nentries + 1) + + def gen_key(self, i): + if self.keyformat == 'S': + return [ 'key{:06}'.format(i) ] # zero pad so it sorts expectedly + elif self.keyformat == 'iS': + return [ i, 'key{:06}'.format(i) ] + else: + return [ i ] + + def gen_values(self, i): + s = str(i) + x = 'x' * i + rs = s[::-1] + f = int(s[0:1]) + return [i, s, x, rs, f] + + def reinit_joinconfig(self): + self.rand = suite_random.suite_random(self.seed) + self.seed += 1 + + def get_joinconfig(self): + # When we're running the bloom scenario, make it so the + # bloom filters are often shared. Make the number of + # hashes and number of bits per item so they don't always + # match up; WT should allow it. 
+ if self.usebloom: + c = 10000 if (self.rand.rand32() % 3) != 0 else 100000 + k = 8 if (self.rand.rand32() % 10) != 0 else 10 + b = 16 if (self.rand.rand32() % 11) != 0 else 12 + return \ + ',strategy=bloom,count=' + str(c) + \ + ',bloom_bit_count=' + str(b) + \ + ',bloom_hash_count=' + str(k) + else: + return '' + + def do_join(self, jc, curleft, curright, choice, mbr): + c0 = choice[0] + if c0 == None: + return mbr + # The first join cannot use a bloom filter + if jc.first_join: + joinconfig = '' + jc.first_join = False + else: + joinconfig = self.get_joinconfig() + if c0 != None: + #self.tty('join(jc, ' + curleft.name + ' ' + c0 + + # ' ' + str(curleft.low) + ')') + curleft.reset() + curleft.set_key(*curleft.low) + self.assertEquals(0, curleft.search()) + self.session.join(jc, curleft, 'compare=' + c0 + joinconfig) + if c0 == 'eq': + mbr = mbr.intersection(curleft.eqmembers) + elif c0 == 'ge': + mbr = mbr.intersection( + set(curleft.eqmembers.union(curleft.gtmembers))) + elif c0 == 'gt': + mbr = mbr.intersection(curleft.gtmembers) + c1 = choice[1] if len(choice) > 1 else None + if c1 != None: + #self.tty('join(jc, ' + curright.name + ' ' + c1 + + # ' ' + str(curright.high) + ')') + curright.reset() + curright.set_key(*curright.high) + self.assertEquals(0, curright.search()) + self.session.join(jc, curright, 'compare=' + c1 + joinconfig) + if c1 == 'le': + mbr = mbr.intersection( + set(curright.eqmembers.union(curright.ltmembers))) + elif c1 == 'lt': + mbr = mbr.intersection(curright.ltmembers) + return mbr + + def iterate(self, jc, mbr): + #self.tty('iteration expects ' + str(len(mbr)) + + # ' entries: ' + str(mbr)) + while jc.next() == 0: + keys = jc.get_keys() + [v0,v1,v2,v3,v4] = jc.get_values() + k0 = keys[0] + k1 = keys[1] if len(keys) > 1 else None + if self.keyformat == 'S': + i = int(str(k0[3:])) + elif self.keyformat == 'iS': + i = k0 + self.assertEquals(i, int(str(k1[3:]))) + else: + i = k0 + #self.tty(' iteration got key: ' + str(k0) + ',' + str(k1)) 
+ #self.tty(' iteration got values: ' + str([v0,v1,v2,v3,v4])) + #self.tty(' iteration expects values: ' + str(self.gen_values(i))) + self.assertEquals(self.gen_values(i), [v0,v1,v2,v3,v4]) + if not i in mbr: + self.tty(' result ' + str(i) + ' is not in: ' + str(mbr)) + self.assertTrue(i in mbr) + mbr.remove(i) + self.assertEquals(0, len(mbr)) + + def mkmbr(self, expr): + return frozenset([x for x in self.range() if expr(x)]) + + def test_basic_join(self): + self.seed = 1 + if self.keyformat == 'iS': + keycols = 'k0,k1' + else: + keycols = 'k' + self.session.create('table:join02', 'key_format=' + self.keyformat + + ',value_format=iSuSi,columns=(' + keycols + + ',v0,v1,v2,v3,v4)') + self.session.create('index:join02:index0','columns=(v0)') + self.session.create('index:join02:index1','columns=(v1)') + self.session.create('index:join02:index2','columns=(v2)') + self.session.create('index:join02:index3','columns=(v3)') + self.session.create('index:join02:index4','columns=(v4)') + c = self.session.open_cursor('table:join02', None, None) + for i in self.range(): + c.set_key(*self.gen_key(i)) + c.set_value(*self.gen_values(i)) + c.insert() + c.close() + + # Use the primary table in one of the joins. + c0a = self.session.open_cursor('table:join02', None, None) + c0b = self.session.open_cursor('table:join02', None, None) + c1a = self.session.open_cursor('index:join02:index1', None, None) + c1b = self.session.open_cursor('index:join02:index1', None, None) + c2a = self.session.open_cursor('index:join02:index2', None, None) + c2b = self.session.open_cursor('index:join02:index2', None, None) + c3a = self.session.open_cursor('index:join02:index3', None, None) + c3b = self.session.open_cursor('index:join02:index3', None, None) + c4a = self.session.open_cursor('index:join02:index4', None, None) + + # Attach extra properties to each cursor. 
For cursors that + # may appear on the 'left' side of a range CA < x < CB, + # we give a low value of the range, and calculate the members + # of the set we expect to see for a 'gt' comparison, as well + # as the 'eq' comparison. For cursors that appear on the + # 'right side of the range, we give a high value of the range, + # and calculate membership sets for 'lt' and 'eq'. + # + # We've defined the low/high values so that there's a lot of + # overlap between the values when we're doing ranges. + c0a.name = 'c0a' + c0b.name = 'c0b' + if self.keyformat == 'i' or self.keyformat == 'r': + c0a.low = [ 205 ] + c0b.high = [ 990 ] + elif self.keyformat == 'S': + c0a.low = [ 'key000205' ] + c0b.high = [ 'key000990' ] + elif self.keyformat == 'iS': + c0a.low = [ 205, 'key000205' ] + c0b.high = [ 990, 'key000990' ] + c0a.gtmembers = self.mkmbr(lambda x: x > 205) + c0a.eqmembers = self.mkmbr(lambda x: x == 205) + c0b.ltmembers = self.mkmbr(lambda x: x < 990) + c0b.eqmembers = self.mkmbr(lambda x: x == 990) + + c1a.low = [ '150' ] + c1a.gtmembers = self.mkmbr(lambda x: str(x) > '150') + c1a.eqmembers = self.mkmbr(lambda x: str(x) == '150') + c1a.name = 'c1a' + c1b.high = [ '733' ] + c1b.ltmembers = self.mkmbr(lambda x: str(x) < '733') + c1b.eqmembers = self.mkmbr(lambda x: str(x) == '733') + c1b.name = 'c1b' + + c2a.low = [ 'x' * 321 ] + c2a.gtmembers = self.mkmbr(lambda x: x > 321) + c2a.eqmembers = self.mkmbr(lambda x: x == 321) + c2a.name = 'c2a' + c2b.high = [ 'x' * 765 ] + c2b.ltmembers = self.mkmbr(lambda x: x < 765) + c2b.eqmembers = self.mkmbr(lambda x: x == 765) + c2b.name = 'c2b' + + c3a.low = [ '432' ] + c3a.gtmembers = self.mkmbr(lambda x: str(x)[::-1] > '432') + c3a.eqmembers = self.mkmbr(lambda x: str(x)[::-1] == '432') + c3a.name = 'c3a' + c3b.high = [ '876' ] + c3b.ltmembers = self.mkmbr(lambda x: str(x)[::-1] < '876') + c3b.eqmembers = self.mkmbr(lambda x: str(x)[::-1] == '876') + c3b.name = 'c3b' + + c4a.low = [ 4 ] + c4a.gtmembers = self.mkmbr(lambda x: 
str(x)[0:1] > '4') + c4a.eqmembers = self.mkmbr(lambda x: str(x)[0:1] == '4') + c4a.name = 'c4a' + + choices = [[None], ['eq'], ['ge'], ['gt'], [None, 'le'], [None, 'lt'], + ['ge', 'le' ], ['ge', 'lt' ], ['gt', 'le' ], ['gt', 'lt' ]] + smallchoices = [[None], ['eq'], ['ge'], ['gt', 'le' ]] + for i0 in smallchoices: + for i1 in choices: + for i2 in smallchoices: + for i3 in smallchoices: + for i4 in [[None], ['eq'], ['ge'], ['gt']]: + if i0[0] == None and i1[0] == None and \ + i2[0] == None and i3[0] == None and \ + i4[0] == None: + continue + self.reinit_joinconfig() + #self.tty('Begin test: ' + + # ','.join([str(i0),str(i1),str(i2), + # str(i3),str(i4)])) + jc = self.session.open_cursor('join:table:join02', + None, None) + jc.first_join = True + mbr = set(self.range()) + + # It shouldn't matter the order of the joins + mbr = self.do_join(jc, c3a, c3b, i3, mbr) + mbr = self.do_join(jc, c2a, c2b, i2, mbr) + mbr = self.do_join(jc, c4a, None, i4, mbr) + mbr = self.do_join(jc, c1a, c1b, i1, mbr) + mbr = self.do_join(jc, c0a, c0b, i0, mbr) + self.iterate(jc, mbr) + jc.close() + c0a.close() + c0b.close() + c1a.close() + c1b.close() + c2a.close() + c2b.close() + c3a.close() + c3b.close() + c4a.close() + +if __name__ == '__main__': + wttest.run() diff --git a/test/suite/test_join03.py b/test/suite/test_join03.py new file mode 100644 index 00000000000..552e3b41748 --- /dev/null +++ b/test/suite/test_join03.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2015 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. 
+# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import os +import wiredtiger, wttest, run +from wtscenario import check_scenarios, multiply_scenarios, number_scenarios + +# test_join03.py +# Join operations +# Joins with a custom extractor +class test_join03(wttest.WiredTigerTestCase): + table_name1 = 'test_join03' + nentries = 100 + + # Return the wiredtiger_open extension argument for a shared library. + def extensionArg(self, exts): + extfiles = [] + for ext in exts: + (dirname, name, libname) = ext + if name != None and name != 'none': + testdir = os.path.dirname(__file__) + extdir = os.path.join(run.wt_builddir, 'ext', dirname) + extfile = os.path.join( + extdir, name, '.libs', 'libwiredtiger_' + libname + '.so') + if not os.path.exists(extfile): + self.skipTest('extension "' + extfile + '" not built') + if not extfile in extfiles: + extfiles.append(extfile) + if len(extfiles) == 0: + return '' + else: + return ',extensions=["' + '","'.join(extfiles) + '"]' + + # Override WiredTigerTestCase, we have extensions. 
+ def setUpConnectionOpen(self, dir): + extarg = self.extensionArg([('extractors', 'csv', 'csv_extractor')]) + connarg = 'create,error_prefix="{0}: ",{1}'.format( + self.shortid(), extarg) + conn = wiredtiger.wiredtiger_open(dir, connarg) + self.pr(`conn`) + return conn + + def gen_key(self, i): + return [ i + 1 ] + + def gen_values(self, i): + s = str(i) + rs = s[::-1].lstrip('0') + return [ s + ',' + rs ] + + # Common function for testing iteration of join cursors + def iter_common(self, jc): + mbr = set([62, 63, 72, 73, 82, 83, 92, 93]) + while jc.next() == 0: + [k] = jc.get_keys() + i = k - 1 + [v] = jc.get_values() + self.assertEquals(self.gen_values(i), [v]) + if not i in mbr: + self.tty(' result ' + str(i) + ' is not in: ' + str(mbr)) + self.assertTrue(i in mbr) + mbr.remove(i) + self.assertEquals(0, len(mbr)) + + # Common function for testing the most basic functionality + # of joins + def join(self, csvformat, args0, args1): + self.session.create('table:join03', 'key_format=r' + + ',value_format=S,columns=(k,v)') + fmt = csvformat[0] + self.session.create('index:join03:index0','key_format=' + fmt + ',' + + 'extractor=csv,app_metadata={"format" : "' + + fmt + '","field" : "0"}') + fmt = csvformat[1] + self.session.create('index:join03:index1','key_format=' + fmt + ',' + + 'extractor=csv,app_metadata={"format" : "' + + fmt + '","field" : "1"}') + + c = self.session.open_cursor('table:join03', None, None) + for i in range(0, self.nentries): + c.set_key(*self.gen_key(i)) + c.set_value(*self.gen_values(i)) + c.insert() + c.close() + + jc = self.session.open_cursor('join:table:join03', None, None) + + # All the numbers 0-99 whose string representation + # sort >= '60' and whose reverse string representation + # is in '20' < x < '40'. 
That is: [62, 63, 72, 73, 82, 83, 92, 93] + c0 = self.session.open_cursor('index:join03:index0', None, None) + if csvformat[0] == 'S': + c0.set_key('60') + else: + c0.set_key(60) + self.assertEquals(0, c0.search()) + self.session.join(jc, c0, 'compare=ge' + args0) + + c1a = self.session.open_cursor('index:join03:index1', None, None) + if csvformat[1] == 'S': + c1a.set_key('21') + else: + c1a.set_key(21) + self.assertEquals(0, c1a.search()) + self.session.join(jc, c1a, 'compare=gt' + args1) + + c1b = self.session.open_cursor('index:join03:index1', None, None) + if csvformat[1] == 'S': + c1b.set_key('41') + else: + c1b.set_key(41) + self.assertEquals(0, c1b.search()) + self.session.join(jc, c1b, 'compare=lt' + args1) + + # Iterate, and make sure that reset allows us to iterate again. + self.iter_common(jc) + + jc.close() + c1a.close() + c1b.close() + c0.close() + self.session.drop('table:join03') + + # Test joins using CSV fields that are interpreted as different types + # to make sure all the extractor plumbing used in joins is working. + def test_join(self): + for extraargs in [ '', ',strategy=bloom,count=1000' ]: + for csvformat in [ 'SS', 'ii', 'Si', 'iS' ]: + self.join(csvformat, '', extraargs) + + +if __name__ == '__main__': + wttest.run() -- cgit v1.2.1 From 2770eb0749ec395c7937154798ef20e30ad58fcb Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 3 Nov 2015 13:36:06 -0500 Subject: WT-1315. Added java support for join cursors. 
--- lang/java/java_doc.i | 1 + 1 file changed, 1 insertion(+) diff --git a/lang/java/java_doc.i b/lang/java/java_doc.i index 75c14dbfe8f..17317ab875b 100644 --- a/lang/java/java_doc.i +++ b/lang/java/java_doc.i @@ -33,6 +33,7 @@ COPYDOC(__wt_session, WT_SESSION, open_cursor) COPYDOC(__wt_session, WT_SESSION, create) COPYDOC(__wt_session, WT_SESSION, compact) COPYDOC(__wt_session, WT_SESSION, drop) +COPYDOC(__wt_session, WT_SESSION, join) COPYDOC(__wt_session, WT_SESSION, log_flush) COPYDOC(__wt_session, WT_SESSION, log_printf) COPYDOC(__wt_session, WT_SESSION, rename) -- cgit v1.2.1 From d46c3bf7b1bed11da473d351847b915bda1bb83f Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 3 Nov 2015 13:36:26 -0500 Subject: WT-1315. Added C and java examples for join cursors. --- examples/c/ex_schema.c | 39 +++++++++++++++++++- .../java/com/wiredtiger/examples/ex_schema.java | 43 ++++++++++++++++++++-- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/examples/c/ex_schema.c b/examples/c/ex_schema.c index 8b74500acd3..dabb568129e 100644 --- a/examples/c/ex_schema.c +++ b/examples/c/ex_schema.c @@ -49,12 +49,16 @@ typedef struct { static POP_RECORD pop_data[] = { { "AU", 1900, 4000000 }, + { "AU", 1950, 8267337 }, { "AU", 2000, 19053186 }, { "CAN", 1900, 5500000 }, + { "CAN", 1950, 14011422 }, { "CAN", 2000, 31099561 }, { "UK", 1900, 369000000 }, + { "UK", 1950, 50127000 }, { "UK", 2000, 59522468 }, { "USA", 1900, 76212168 }, + { "USA", 1950, 150697361 }, { "USA", 2000, 301279593 }, { "", 0, 0 } }; @@ -65,7 +69,7 @@ main(void) { POP_RECORD *p; WT_CONNECTION *conn; - WT_CURSOR *cursor; + WT_CURSOR *cursor, *cursor2, *join_cursor; WT_SESSION *session; const char *country; uint64_t recno, population; @@ -321,6 +325,39 @@ main(void) /*! [Access only the index] */ ret = cursor->close(cursor); + /*! [Join cursors] */ + /* Open cursors needed by the join. 
*/ + ret = session->open_cursor(session, + "join:table:poptable", NULL, NULL, &join_cursor); + ret = session->open_cursor(session, + "index:poptable:country", NULL, NULL, &cursor); + ret = session->open_cursor(session, + "index:poptable:immutable_year", NULL, NULL, &cursor2); + + /* select values WHERE country == "AU" AND year > 1900 */ + cursor->set_key(cursor, "AU\0\0\0"); + ret = cursor->search(cursor); + ret = session->join(session, join_cursor, cursor, + "compare=eq,count=10"); + cursor2->set_key(cursor2, (uint16_t)1900); + ret = cursor2->search(cursor2); + ret = session->join(session, join_cursor, cursor2, + "compare=gt,count=10,strategy=bloom"); + + /* List the values that are joined */ + while ((ret = join_cursor->next(join_cursor)) == 0) { + ret = join_cursor->get_key(join_cursor, &recno); + ret = join_cursor->get_value(join_cursor, &country, &year, + &population); + printf("ID %" PRIu64, recno); + printf(": country %s, year %u, population %" PRIu64 "\n", + country, year, population); + } + /*! 
[Join cursors] */ + ret = join_cursor->close(join_cursor); + ret = cursor2->close(cursor2); + ret = cursor->close(cursor); + ret = conn->close(conn, NULL); return (ret); diff --git a/examples/java/com/wiredtiger/examples/ex_schema.java b/examples/java/com/wiredtiger/examples/ex_schema.java index 5b849ecf430..ba15db62a14 100644 --- a/examples/java/com/wiredtiger/examples/ex_schema.java +++ b/examples/java/com/wiredtiger/examples/ex_schema.java @@ -57,12 +57,16 @@ public class ex_schema { popData = new ArrayList(); popData.add(new PopRecord("AU", (short)1900, 4000000 )); + popData.add(new PopRecord("AU", (short)1950, 8267337 )); popData.add(new PopRecord("AU", (short)2000, 19053186 )); popData.add(new PopRecord("CAN", (short)1900, 5500000 )); + popData.add(new PopRecord("CAN", (short)1950, 14011422 )); popData.add(new PopRecord("CAN", (short)2000, 31099561 )); popData.add(new PopRecord("UK", (short)1900, 369000000 )); + popData.add(new PopRecord("UK", (short)1950, 50127000 )); popData.add(new PopRecord("UK", (short)2000, 59522468 )); popData.add(new PopRecord("USA", (short)1900, 76212168 )); + popData.add(new PopRecord("USA", (short)1950, 150697361 )); popData.add(new PopRecord("USA", (short)2000, 301279593 )); }; /*! [schema declaration] */ @@ -72,7 +76,7 @@ public class ex_schema { throws WiredTigerException { Connection conn; - Cursor cursor; + Cursor cursor, cursor2, join_cursor; Session session; String country; long recno, population; @@ -206,7 +210,7 @@ public class ex_schema { * for a particular country. */ cursor = session.open_cursor("colgroup:poptable:main", null, null); - cursor.putKeyLong(2); + cursor.putKeyRecord(2); if ((ret = cursor.search()) == 0) { country = cursor.getValueString(); year = cursor.getValueShort(); @@ -223,7 +227,7 @@ public class ex_schema { * population of a particular country. 
*/ cursor = session.open_cursor("colgroup:poptable:population", null, null); - cursor.putKeyLong(2); + cursor.putKeyRecord(2); if ((ret = cursor.search()) == 0) { population = cursor.getValueLong(); System.out.println("ID 2: population " + population); @@ -335,6 +339,39 @@ public class ex_schema { /*! [Access only the index] */ ret = cursor.close(); + /*! [Join cursors] */ + /* Open cursors needed by the join. */ + join_cursor = session.open_cursor( + "join:table:poptable", null, null); + cursor = session.open_cursor( + "index:poptable:country", null, null); + cursor2 = session.open_cursor( + "index:poptable:immutable_year", null, null); + + /* select values WHERE country == "AU" AND year > 1900 */ + cursor.putKeyString("AU"); + ret = cursor.search(); + session.join(join_cursor, cursor, "compare=eq,count=10"); + cursor2.putKeyShort((short)1900); + ret = cursor2.search(); + session.join(join_cursor, cursor2, + "compare=gt,count=10,strategy=bloom"); + + /* List the values that are joined */ + while ((ret = join_cursor.next()) == 0) { + recno = join_cursor.getKeyRecord(); + country = join_cursor.getValueString(); + year = join_cursor.getValueShort(); + population = join_cursor.getValueLong(); + System.out.print("ID " + recno); + System.out.println( ": country " + country + ", year " + year + + ", population " + population); + } + /*! [Join cursors] */ + ret = join_cursor.close(); + ret = cursor2.close(); + ret = cursor.close(); + ret = conn.close(null); return (ret); -- cgit v1.2.1 From 10f881b0116109ec30c2b6b4b52d2bc1d216d2e3 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 3 Nov 2015 13:36:45 -0500 Subject: WT-1315. Added doc for join cursors, including java doc. 
--- src/docs/cursor-join.dox | 19 +++++++++++++++++++ src/docs/cursors.dox | 1 + src/docs/data-sources.dox | 4 ++++ src/docs/programming.dox | 1 + src/docs/spell.ok | 3 +++ 5 files changed, 28 insertions(+) create mode 100644 src/docs/cursor-join.dox diff --git a/src/docs/cursor-join.dox b/src/docs/cursor-join.dox new file mode 100644 index 00000000000..51da6b174bf --- /dev/null +++ b/src/docs/cursor-join.dox @@ -0,0 +1,19 @@ +/*! @m_page{{c,java},cursor_join,Join cursors} + +Join cursors provide a way to iterate over a subset of a table, where the subset is specified by relationships with reference cursors. + +A join cursor is created with WT_SESSION::open_cursor using a \c +"join:table:" URI prefix. Then reference cursors are positioned to +keys on indices and joined to the join cursor using WT_SESSION::join calls. +The result is a join cursor that can be iterated to satisfy the join +equation. + +Here is an example using join cursors: + +@snippet ex_schema.c Join cursors + +Joins support various comparison operators: \c "eq", \c "gt", \c "ge", \c "lt", \c "le". Ranges with lower and upper bounds can also be specified, by joining two cursors on the same index, for example, one with \c "compare=ge" and another \c "compare=lt". In addition to joining indices, the main table can be joined so that a range of primary keys can be specified. + +All the joins should be done on the join cursor before WT_CURSOR::next is called. Calling WT_CURSOR::next on a join cursor for the first time populates any bloom filters and performs other initialization. The join cursor's key is the primary key (the key for the main table), and its value is the entire set of values of the main table. A join cursor can be created with a projection by appending \c "(col1,col2,...)" to the URI if a different set of values is needed. 
+ +*/ diff --git a/src/docs/cursors.dox b/src/docs/cursors.dox index c9455e2976c..b6271951f91 100644 --- a/src/docs/cursors.dox +++ b/src/docs/cursors.dox @@ -19,6 +19,7 @@ See the following for more details: - @subpage data_sources - @ref metadata - @ref cursor_log +- @ref cursor_join @section cursor_projections Projections diff --git a/src/docs/data-sources.dox b/src/docs/data-sources.dox index 0c446b2bf78..d09d1cbc1b8 100644 --- a/src/docs/data-sources.dox +++ b/src/docs/data-sources.dox @@ -13,6 +13,10 @@ The following are the builtin basic cursor types: index cursor, key=index key\, value=table value(s) with optional projection of columns} +@row{<tt>join:table:\<table name\>\[\<projection\>\]</tt>, + join cursor, + key=table key\, value=table value(s) with optional projection + of columns}
Some administrative tasks can be accomplished using the following special cursor types that give access to data managed by WiredTiger: diff --git a/src/docs/programming.dox b/src/docs/programming.dox index d99c34d1da2..f005f6d3e2d 100644 --- a/src/docs/programming.dox +++ b/src/docs/programming.dox @@ -43,6 +43,7 @@ each of which is ordered by one or more columns. - @ref transaction_named_snapshots - @subpage shared_cache - @subpage statistics +- @subpage cursor_join - @subpage cursor_log - @subpage_single upgrade diff --git a/src/docs/spell.ok b/src/docs/spell.ok index b887f0ceee2..86af82d8fd2 100644 --- a/src/docs/spell.ok +++ b/src/docs/spell.ok @@ -197,6 +197,7 @@ endinternal english env eof +eq erlang errno exe @@ -220,6 +221,7 @@ freelist fsync gcc gdbm +ge getKey getValue getopt @@ -260,6 +262,7 @@ keyvalue kvs lang lastname +le len leveldb li -- cgit v1.2.1 From 083970f4933b2695ca0cfc86c86318d66cca4e05 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Wed, 4 Nov 2015 10:30:42 -0500 Subject: WT-1315. Sorted s_string.ok, fixed s_style error. --- dist/s_string.ok | 2 +- src/include/wiredtiger.in | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dist/s_string.ok b/dist/s_string.ok index 7ff24a5080c..817893508aa 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -723,8 +723,8 @@ mem memalign membar memcpy -memmove memget +memmove memset memsize metaconf diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index b2af4cce70c..c0d4dd72577 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -1247,7 +1247,7 @@ struct __wt_session { * other than other join calls. * @param ref_cursor either an index cursor having the same base table * as the join_cursor, or a table cursor open on the same base table. - * The ref_cursor must be positioned. + * The ref_cursor must be positioned. 
* * The ref_cursor limits the results seen by iterating the * join_cursor to table items referred to by the key in this -- cgit v1.2.1 From 86cf45711dde7ec8a2de42be5f930d57002d6bd8 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Wed, 4 Nov 2015 11:22:11 -0500 Subject: WT-1315. Fixed compile warning. --- src/cursor/cur_join.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index fdc741d0dbd..abf0e4ed6d7 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -607,7 +607,7 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURSOR_JOIN_ENTRY *entry, bool skip_left) { WT_CURJOIN_EXTRACTOR extract_cursor; - WT_CURSOR *main; + WT_CURSOR *c; WT_CURSOR_STATIC_INIT(iface, __wt_cursor_get_key, /* get-key */ __wt_cursor_get_value, /* get-value */ @@ -651,14 +651,14 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } if (entry->index != NULL) { - main = entry->main; - main->set_key(main, key); - if ((ret = main->search(main)) == 0) - ret = main->get_value(main, &v); + c = entry->main; + c->set_key(c, key); + if ((ret = c->search(c)) == 0) + ret = c->get_value(c, &v); else if (ret == WT_NOTFOUND) WT_ERR_MSG(session, WT_ERROR, "main table for join is missing entry."); - main->reset(main); + c->reset(c); WT_ERR(ret); } else v = *key; -- cgit v1.2.1 From 6efd6f7e9016fbac97cb4f676a3c01c327669c9c Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Wed, 4 Nov 2015 12:00:33 -0500 Subject: WT-1315. Compiler warnings and clean up. 
--- src/cursor/cur_join.c | 40 +++++++++++++--------------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index abf0e4ed6d7..2e27f6b3a45 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -24,8 +24,7 @@ __curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, session, WT_SESSION_open_cursor), "raw", NULL }; const char *def_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), NULL }; - const char *uri; - const char **config; + const char *uri, **config; char *uribuf; WT_CURSOR_JOIN_ITER *iter; size_t size; @@ -265,14 +264,12 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; const char *mainkey_str, *p; - const void *buf; int cmp, mainkey_len; size_t size; u_int i; void *allocbuf; c = NULL; - buf = NULL; allocbuf = NULL; if (entry->index != NULL) { @@ -421,7 +418,6 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) { WT_BLOOM *bloom; WT_DECL_RET; - WT_CURSOR *to_dup; WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2; uint64_t k, m; @@ -433,17 +429,11 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) je = &cjoin->entries[0]; WT_ERR(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter)); - if (je->ends[0].cursor != NULL) - to_dup = je->ends[0].cursor; - else - to_dup = je->ends[1].cursor; jeend = &cjoin->entries[cjoin->entries_next]; for (je = cjoin->entries; je < jeend; je++) { - WT_ERR(__curjoin_endpoint_init_key(session, je, - &je->ends[0])); - WT_ERR(__curjoin_endpoint_init_key(session, je, - &je->ends[1])); + WT_ERR(__curjoin_endpoint_init_key(session, je, &je->ends[0])); + WT_ERR(__curjoin_endpoint_init_key(session, je, &je->ends[1])); /* * The first entry is iterated as the 'outermost' cursor. 
@@ -515,16 +505,10 @@ err: return (ret); } -typedef struct { - WT_CURSOR iface; - WT_CURSOR_JOIN_ENTRY *entry; - int ismember; -} WT_CURJOIN_EXTRACTOR; - /* * __curjoin_entry_in_range -- - * Check if a key is in the range specified by the entry. - * Return WT_NOTFOUND if not. + * Check if a key is in the range specified by the entry, returning + * WT_NOTFOUND if not. */ static int __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, @@ -557,6 +541,12 @@ __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, err: return (ret); } +typedef struct { + WT_CURSOR iface; + WT_CURSOR_JOIN_ENTRY *entry; + int ismember; +} WT_CURJOIN_EXTRACTOR; + /* * __curjoin_extract_insert -- * Handle a key produced by a custom extractor. @@ -588,10 +578,9 @@ __curjoin_extract_insert(WT_CURSOR *cursor) { --ikey.size; ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, 0); - if (ret == WT_NOTFOUND) { - cextract->ismember = 0; + if (ret == WT_NOTFOUND) ret = 0; - } else + else cextract->ismember = 1; return (ret); @@ -627,10 +616,8 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, __wt_cursor_notsup); /* close */ WT_DECL_RET; WT_INDEX *index; - WT_SESSION *wtsession; WT_ITEM *key, v; - wtsession = (WT_SESSION *)session; key = cjoin->iter->curkey; if (entry->bloom != NULL) { @@ -649,7 +636,6 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, */ WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); } - if (entry->index != NULL) { c = entry->main; c->set_key(c, key); -- cgit v1.2.1 From 4a7f74260e83934e5d338430e8539d601aac3b2e Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Wed, 4 Nov 2015 13:13:59 -0500 Subject: WT-1315. Remove dependency from extension to WT library, it's not portable. 
--- ext/extractors/csv/csv_extractor.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ext/extractors/csv/csv_extractor.c b/ext/extractors/csv/csv_extractor.c index 2c01271fd02..8d50cc7ec5d 100644 --- a/ext/extractors/csv/csv_extractor.c +++ b/ext/extractors/csv/csv_extractor.c @@ -123,6 +123,7 @@ csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session, CSV_EXTRACTOR *csv_extractor; WT_CONFIG_ITEM field, format; WT_CONFIG_PARSER *parser; + WT_EXTENSION_API *wtapi; int ret; long field_num; @@ -130,7 +131,8 @@ csv_customize(WT_EXTRACTOR *extractor, WT_SESSION *session, (void)uri; /* Unused parameters */ orig = (const CSV_EXTRACTOR *)extractor; - if ((ret = wiredtiger_config_parser_open(session, appcfg->str, + wtapi = orig->wt_api; + if ((ret = wtapi->config_parser_open(wtapi, session, appcfg->str, appcfg->len, &parser)) != 0) return (ret); if ((ret = parser->get(parser, "field", &field)) != 0 || -- cgit v1.2.1 From 778c16f19dc1c8612974b72c1a41e43a7d83f15b Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 10 Nov 2015 10:43:48 -0500 Subject: WT-1315. Changed names of macros and inline functions to be more clear and consistent. --- src/cursor/cur_index.c | 2 +- src/cursor/cur_join.c | 83 ++++++++++++++++++++++++----------------------- src/cursor/cur_table.c | 11 +++---- src/include/api.h | 8 +++++ src/include/cursor.h | 18 +++++----- src/include/cursor.i | 8 ++--- src/session/session_api.c | 14 ++++---- 7 files changed, 76 insertions(+), 68 deletions(-) diff --git a/src/cursor/cur_index.c b/src/cursor/cur_index.c index 5819bb04cf2..a909eaece99 100644 --- a/src/cursor/cur_index.c +++ b/src/cursor/cur_index.c @@ -35,7 +35,7 @@ __curindex_get_value(WT_CURSOR *cursor, ...) 
va_start(ap, cursor); JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); - WT_ERR(__wt_curindex_get_value_ap(cursor, ap)); + WT_ERR(__wt_curindex_get_valuev(cursor, ap)); err: va_end(ap); API_END_RET(session, ret); diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 2e27f6b3a45..f2f482b4b3a 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -201,7 +201,7 @@ __curjoin_get_key(WT_CURSOR *cursor, ...) va_start(ap, cursor); CURSOR_API_CALL(cursor, session, get_key, NULL); - if (!F_ISSET(cjoin, WT_CJ_INITIALIZED) || + if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) || !__curjoin_entry_iter_ready(cjoin->iter)) { __wt_errx(session, "join cursor must be advanced with next()"); WT_ERR(EINVAL); @@ -231,15 +231,15 @@ __curjoin_get_value(WT_CURSOR *cursor, ...) va_start(ap, cursor); CURSOR_API_CALL(cursor, session, get_value, NULL); - if (!F_ISSET(cjoin, WT_CJ_INITIALIZED) || + if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) || !__curjoin_entry_iter_ready(iter)) { __wt_errx(session, "join cursor must be advanced with next()"); WT_ERR(EINVAL); } if (iter->entry->index != NULL) - WT_ERR(__wt_curindex_get_value_ap(iter->cursor, ap)); + WT_ERR(__wt_curindex_get_valuev(iter->cursor, ap)); else - WT_ERR(__wt_curtable_get_value_ap(iter->cursor, ap)); + WT_ERR(__wt_curtable_get_valuev(iter->cursor, ap)); err: va_end(ap); API_END_RET(session, ret); @@ -304,7 +304,7 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_ERR(__wt_cursor_dup_position(entry->ends[0].cursor, c)); skip_left = (entry->ends[0].cursor == NULL) || - entry->ends[0].flags == (WT_CJE_ENDPOINT_GT | WT_CJE_ENDPOINT_EQ); + entry->ends[0].flags == (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ); collator = (entry->index == NULL) ? 
NULL : entry->index->collator; while (ret == 0) { c->get_key(c, &curkey); @@ -327,11 +327,11 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_ERR(__wt_compare(session, collator, &curkey, &entry->ends[0].key, &cmp)); if (cmp < 0 || (cmp == 0 && - !F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_EQ))) + !F_ISSET(&entry->ends[0], WT_CURJOIN_END_EQ))) goto advance; if (cmp > 0) { if (F_ISSET(&entry->ends[0], - WT_CJE_ENDPOINT_GT)) + WT_CURJOIN_END_GT)) skip_left = true; else break; @@ -341,7 +341,7 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_ERR(__wt_compare(session, collator, &curkey, &entry->ends[1].key, &cmp)); if (cmp > 0 || (cmp == 0 && - !F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_EQ))) + !F_ISSET(&entry->ends[1], WT_CURJOIN_END_EQ))) break; } if (entry->index != NULL) @@ -387,7 +387,7 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session, &cindex->child->key, &endpoint->key, &allocbuf)); if (allocbuf != NULL) - F_SET(endpoint, WT_CJE_ENDPOINT_OWNKEY); + F_SET(endpoint, WT_CURJOIN_END_OWNKEY); } else endpoint->key = cindex->child->key; } else { @@ -442,10 +442,10 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) * the btree is ordered. 
*/ if (je == cjoin->entries && je->ends[0].flags == - (WT_CJE_ENDPOINT_GT | WT_CJE_ENDPOINT_EQ)) - F_SET(cjoin, WT_CJ_SKIP_FIRST_LEFT); + (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ)) + F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); - if (F_ISSET(je, WT_CJE_BLOOM)) { + if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) { if (je->bloom == NULL) { /* * Look for compatible filters to be shared, @@ -455,7 +455,8 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) m = je->bloom_bit_count; k = je->bloom_hash_count; for (je2 = je + 1; je2 < jeend; je2++) - if (F_ISSET(je2, WT_CJE_BLOOM) && + if (F_ISSET(je2, + WT_CURJOIN_ENTRY_BLOOM) && je2->count == je->count) { m = WT_MAX( je2->bloom_bit_count, m); @@ -466,7 +467,7 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) je->bloom_hash_count = k; WT_ERR(__wt_bloom_create(session, NULL, NULL, je->count, m, k, &je->bloom)); - F_SET(je, WT_CJE_OWN_BLOOM); + F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM); WT_ERR(__curjoin_init_bloom(session, cjoin, je, je->bloom)); /* @@ -474,7 +475,8 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) * config info consistent. 
*/ for (je2 = je + 1; je2 < jeend; je2++) - if (F_ISSET(je2, WT_CJE_BLOOM) && + if (F_ISSET(je2, + WT_CURJOIN_ENTRY_BLOOM) && je2->count == je->count) { WT_ASSERT(session, je2->bloom == NULL); @@ -499,7 +501,7 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) } } } - F_SET(cjoin, WT_CJ_INITIALIZED); + F_SET(cjoin, WT_CURJOIN_INITIALIZED); err: return (ret); @@ -524,8 +526,8 @@ __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, &entry->ends[0].key, &cmp)); if (cmp < 0 || (cmp == 0 && - !F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_EQ)) || - (cmp > 0 && !F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_GT))) + !F_ISSET(&entry->ends[0], WT_CURJOIN_END_EQ)) || + (cmp > 0 && !F_ISSET(&entry->ends[0], WT_CURJOIN_END_GT))) WT_ERR(WT_NOTFOUND); } if (entry->ends[1].cursor != NULL) { @@ -533,8 +535,8 @@ __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, &entry->ends[1].key, &cmp)); if (cmp > 0 || (cmp == 0 && - !F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_EQ)) || - (cmp < 0 && !F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_LT))) + !F_ISSET(&entry->ends[1], WT_CURJOIN_END_EQ)) || + (cmp < 0 && !F_ISSET(&entry->ends[1], WT_CURJOIN_END_LT))) WT_ERR(WT_NOTFOUND); } @@ -626,7 +628,7 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, * in a previous entry. So the shared filter has already * been checked and passed. 
*/ - if (!F_ISSET(entry, WT_CJE_OWN_BLOOM)) + if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) return (0); /* @@ -682,11 +684,11 @@ __curjoin_next(WT_CURSOR *cursor) CURSOR_API_CALL(cursor, session, next, NULL); - if (F_ISSET(cjoin, WT_CJ_ERROR)) { + if (F_ISSET(cjoin, WT_CURJOIN_ERROR)) { __wt_errx(session, "join cursor encountered previous error"); WT_ERR(WT_ERROR); } - if (!F_ISSET(cjoin, WT_CJ_INITIALIZED)) + if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) WT_ERR(__curjoin_init_iter(session, cjoin)); nextkey: @@ -699,7 +701,7 @@ nextkey: * 'left' case for the first entry, since we're * using that in our iteration. */ - skip_left = F_ISSET(cjoin, WT_CJ_SKIP_FIRST_LEFT); + skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); for (count = 0; count < cjoin->entries_next; count++) { ret = __curjoin_entry_member(session, cjoin, &cjoin->entries[count], skip_left); @@ -711,7 +713,7 @@ nextkey: } if (0) { -err: F_SET(cjoin, WT_CJ_ERROR); +err: F_SET(cjoin, WT_CURJOIN_ERROR); } API_END_RET(session, ret); } @@ -731,7 +733,7 @@ __curjoin_reset(WT_CURSOR *cursor) CURSOR_API_CALL(cursor, session, reset, NULL); - if (F_ISSET(cjoin, WT_CJ_INITIALIZED)) + if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) WT_ERR(__curjoin_entry_iter_reset(cjoin->iter)); err: API_END_RET(session, ret); @@ -771,11 +773,11 @@ __curjoin_close(WT_CURSOR *cursor) F_CLR(entry->ends[0].cursor, WT_CURSTD_JOINED); if (entry->ends[1].cursor != NULL) F_CLR(entry->ends[1].cursor, WT_CURSTD_JOINED); - if (F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_OWNKEY)) + if (F_ISSET(&entry->ends[0], WT_CURJOIN_END_OWNKEY)) __wt_free(session, entry->ends[0].key.data); - if (F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_OWNKEY)) + if (F_ISSET(&entry->ends[1], WT_CURJOIN_END_OWNKEY)) __wt_free(session, entry->ends[1].key.data); - if (F_ISSET(entry, WT_CJE_OWN_BLOOM)) + if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) WT_TRET(__wt_bloom_close(entry->bloom)); } @@ -899,13 +901,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN 
*cjoin, break; } if (nonbloom == -1 && i > 0 && - !F_ISSET(&cjoin->entries[i], WT_CJE_BLOOM)) + !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) nonbloom = i; } if (entry == NULL) { WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated, cjoin->entries_next + 1, &cjoin->entries)); - if (LF_ISSET(WT_CJE_BLOOM) && nonbloom != -1) { + if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && nonbloom != -1) { /* * Reorder the list so that after the first entry, * the Bloom filtered entries come next, followed by @@ -937,19 +939,20 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, count, entry->count); WT_ERR(EINVAL); } - if (LF_ISSET(WT_CJE_BLOOM) != F_ISSET(entry, WT_CJE_BLOOM)) { + if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) != + F_ISSET(entry, WT_CURJOIN_ENTRY_BLOOM)) { __wt_errx(session, "join has incompatible strategy " "values for the same index"); WT_ERR(EINVAL); } /* Check flag combinations */ - if ((F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_GT) && - (range & WT_CJE_ENDPOINT_GT) != 0) || - (F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_LT) && - (range & WT_CJE_ENDPOINT_LT) != 0) || - ((F_ISSET(&entry->ends[0], WT_CJE_ENDPOINT_EQ) || - F_ISSET(&entry->ends[1], WT_CJE_ENDPOINT_EQ)) && - (range == WT_CJE_ENDPOINT_EQ))) { + if ((F_ISSET(&entry->ends[0], WT_CURJOIN_END_GT) && + (range & WT_CURJOIN_END_GT) != 0) || + (F_ISSET(&entry->ends[1], WT_CURJOIN_END_LT) && + (range & WT_CURJOIN_END_LT) != 0) || + ((F_ISSET(&entry->ends[0], WT_CURJOIN_END_EQ) || + F_ISSET(&entry->ends[1], WT_CURJOIN_END_EQ)) && + (range == WT_CURJOIN_END_EQ))) { __wt_errx(session, "join has overlapping ranges"); WT_ERR(EINVAL); } @@ -960,7 +963,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, entry->bloom_hash_count = WT_MAX(entry->bloom_hash_count, bloom_hash_count); } - if (range & WT_CJE_ENDPOINT_LT) + if (range & WT_CURJOIN_END_LT) endpoint = &entry->ends[1]; else endpoint = &entry->ends[0]; diff --git a/src/cursor/cur_table.c b/src/cursor/cur_table.c index 
d940f52c387..7d227e76c7b 100644 --- a/src/cursor/cur_table.c +++ b/src/cursor/cur_table.c @@ -192,7 +192,7 @@ __wt_curtable_get_value(WT_CURSOR *cursor, ...) va_start(ap, cursor); JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); - WT_ERR(__wt_curtable_get_value_ap(cursor, ap)); + WT_ERR(__wt_curtable_get_valuev(cursor, ap)); err: va_end(ap); API_END_RET(session, ret); @@ -483,8 +483,7 @@ __curtable_insert(WT_CURSOR *cursor) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL); - JOINABLE_CURSOR_CALL_CHECK(cursor); + JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL); WT_ERR(__curtable_open_indices(ctable)); /* @@ -542,8 +541,7 @@ __curtable_update(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_UPDATE_API_CALL(cursor, session, update, NULL); - JOINABLE_CURSOR_CALL_CHECK(cursor); + JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, update, NULL); WT_ERR(__curtable_open_indices(ctable)); /* @@ -594,8 +592,7 @@ __curtable_remove(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_REMOVE_API_CALL(cursor, session, NULL); - JOINABLE_CURSOR_CALL_CHECK(cursor); + JOINABLE_CURSOR_REMOVE_API_CALL(cursor, session, NULL); WT_ERR(__curtable_open_indices(ctable)); /* Find the old record so it can be removed from indices */ diff --git a/src/include/api.h b/src/include/api.h index 826d5f37a8c..4821b450f9e 100644 --- a/src/include/api.h +++ b/src/include/api.h @@ -129,6 +129,10 @@ TXN_API_CALL_NOCONF(s, WT_CURSOR, remove, cur, \ ((bt) == NULL) ? 
NULL : ((WT_BTREE *)(bt))->dhandle); +#define JOINABLE_CURSOR_REMOVE_API_CALL(cur, s, bt) \ + CURSOR_REMOVE_API_CALL(cur, s, bt); \ + JOINABLE_CURSOR_CALL_CHECK(cur) + #define CURSOR_UPDATE_API_CALL(cur, s, n, bt) \ (s) = (WT_SESSION_IMPL *)(cur)->session; \ TXN_API_CALL_NOCONF(s, WT_CURSOR, n, cur, \ @@ -136,6 +140,10 @@ if (F_ISSET(S2C(s), WT_CONN_IN_MEMORY) && __wt_cache_full(s)) \ WT_ERR(WT_CACHE_FULL); +#define JOINABLE_CURSOR_UPDATE_API_CALL(cur, s, n, bt) \ + CURSOR_UPDATE_API_CALL(cur, s, n, bt); \ + JOINABLE_CURSOR_CALL_CHECK(cur) + #define CURSOR_UPDATE_API_END(s, ret) \ TXN_API_END(s, ret) diff --git a/src/include/cursor.h b/src/include/cursor.h index 61ee81c96ff..d0825d307c1 100644 --- a/src/include/cursor.h +++ b/src/include/cursor.h @@ -278,10 +278,10 @@ struct __wt_cursor_join_endpoint { uint8_t recno_buf[10]; /* holds packed recno */ WT_CURSOR *cursor; -#define WT_CJE_ENDPOINT_LT 0x01 /* include values < cursor */ -#define WT_CJE_ENDPOINT_EQ 0x02 /* include values == cursor */ -#define WT_CJE_ENDPOINT_GT 0x04 /* include values > cursor */ -#define WT_CJE_ENDPOINT_OWNKEY 0x08 /* must free key's data */ +#define WT_CURJOIN_END_LT 0x01 /* include values < cursor */ +#define WT_CURJOIN_END_EQ 0x02 /* include values == cursor */ +#define WT_CURJOIN_END_GT 0x04 /* include values > cursor */ +#define WT_CURJOIN_END_OWNKEY 0x08 /* must free key's data */ uint8_t flags; /* range for this endpoint */ }; @@ -293,8 +293,8 @@ struct __wt_cursor_join_entry { uint64_t bloom_hash_count; /* hash functions in bloom */ uint64_t count; /* approx number of matches */ -#define WT_CJE_BLOOM 0x01 /* use a bloom filter */ -#define WT_CJE_OWN_BLOOM 0x02 /* this entry owns the bloom */ +#define WT_CURJOIN_ENTRY_BLOOM 0x01 /* use a bloom filter */ +#define WT_CURJOIN_ENTRY_OWN_BLOOM 0x02 /* this entry owns the bloom */ uint8_t flags; WT_CURSOR_JOIN_ENDPOINT ends[2]; /* reference endpoints */ @@ -311,9 +311,9 @@ struct __wt_cursor_join { size_t entries_next; uint8_t 
recno_buf[10]; /* holds packed recno */ -#define WT_CJ_ERROR 0x01 /* Error during initialization */ -#define WT_CJ_INITIALIZED 0x02 /* Successful initialization */ -#define WT_CJ_SKIP_FIRST_LEFT 0x04 +#define WT_CURJOIN_ERROR 0x01 /* Error in initialization */ +#define WT_CURJOIN_INITIALIZED 0x02 /* Successful initialization */ +#define WT_CURJOIN_SKIP_FIRST_LEFT 0x04 /* First check not needed */ uint8_t flags; }; diff --git a/src/include/cursor.i b/src/include/cursor.i index 7f90e43ed6c..9dd280534b4 100644 --- a/src/include/cursor.i +++ b/src/include/cursor.i @@ -139,11 +139,11 @@ __curfile_leave(WT_CURSOR_BTREE *cbt) } /* - * __wt_curindex_get_value_ap -- + * __wt_curindex_get_valuev -- * Internal implementation of WT_CURSOR->get_value for index cursors */ static inline int -__wt_curindex_get_value_ap(WT_CURSOR *cursor, va_list ap) +__wt_curindex_get_valuev(WT_CURSOR *cursor, va_list ap) { WT_CURSOR_INDEX *cindex; WT_DECL_RET; @@ -170,11 +170,11 @@ err: return (ret); } /* - * __wt_curtable_get_value_ap -- + * __wt_curtable_get_valuev -- * Internal implementation of WT_CURSOR->get_value for table cursors. 
*/ static inline int -__wt_curtable_get_value_ap(WT_CURSOR *cursor, va_list ap) +__wt_curtable_get_valuev(WT_CURSOR *cursor, va_list ap) { WT_CURSOR *primary; WT_CURSOR_TABLE *ctable; diff --git a/src/session/session_api.c b/src/session/session_api.c index 4e5ee8b238c..bfa7a6d67e3 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -676,18 +676,18 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, } /* "ge" is the default */ - range = WT_CJE_ENDPOINT_GT | WT_CJE_ENDPOINT_EQ; + range = WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ; flags = 0; WT_ERR(__wt_config_gets(session, cfg, "compare", &cval)); if (cval.len != 0) { if (WT_STRING_MATCH("gt", cval.str, cval.len)) - range = WT_CJE_ENDPOINT_GT; + range = WT_CURJOIN_END_GT; else if (WT_STRING_MATCH("lt", cval.str, cval.len)) - range = WT_CJE_ENDPOINT_LT; + range = WT_CURJOIN_END_LT; else if (WT_STRING_MATCH("le", cval.str, cval.len)) - range = WT_CJE_ENDPOINT_LT | WT_CJE_ENDPOINT_EQ; + range = WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ; else if (WT_STRING_MATCH("eq", cval.str, cval.len)) - range = WT_CJE_ENDPOINT_EQ; + range = WT_CURJOIN_END_EQ; else if (!WT_STRING_MATCH("ge", cval.str, cval.len)) WT_ERR(EINVAL); } @@ -698,7 +698,7 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, WT_ERR(__wt_config_gets(session, cfg, "strategy", &cval)); if (cval.len != 0) { if (WT_STRING_MATCH("bloom", cval.str, cval.len)) - LF_SET(WT_CJE_BLOOM); + LF_SET(WT_CURJOIN_ENTRY_BLOOM); else if (!WT_STRING_MATCH("default", cval.str, cval.len)) WT_ERR(EINVAL); } @@ -706,7 +706,7 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, bloom_bit_count = (uint64_t)cval.val; WT_ERR(__wt_config_gets(session, cfg, "bloom_hash_count", &cval)); bloom_hash_count = (uint64_t)cval.val; - if (LF_ISSET(WT_CJE_BLOOM)) { + if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM)) { if (count == 0) { __wt_errx(session, "count must be nonzero when " "strategy=bloom"); -- cgit v1.2.1 From 
962f79a091bcc062ce46eceb430ce714281ab416 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Wed, 11 Nov 2015 08:50:30 -0500 Subject: WT-1315. Change the 'ends' array to be dynamically sized, needed for disjunctions of equality joins. --- dist/s_string.ok | 1 + src/cursor/cur_join.c | 209 ++++++++++++++++++++++++++++++-------------------- src/include/cursor.h | 11 ++- 3 files changed, 133 insertions(+), 88 deletions(-) diff --git a/dist/s_string.ok b/dist/s_string.ok index 990f1475395..61f72ebbfd1 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -513,6 +513,7 @@ dhandle dhandles dir dirlist +disjunction dlclose dlh dll diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index f2f482b4b3a..a178c5aab74 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -31,10 +31,7 @@ __curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, iter = NULL; uribuf = NULL; - if (entry->ends[0].cursor != NULL) - to_dup = entry->ends[0].cursor; - else - to_dup = entry->ends[1].cursor; + to_dup = entry->ends[0].cursor; uri = to_dup->uri; if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW)) @@ -140,18 +137,12 @@ err: return (ret); static int __curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) { - WT_CURSOR *to_dup; - WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; if (iter->advance) { WT_ERR(iter->cursor->reset(iter->cursor)); - entry = &iter->cjoin->entries[0]; - if (entry->ends[0].cursor != NULL) - to_dup = entry->ends[0].cursor; - else - to_dup = entry->ends[1].cursor; - WT_ERR(__wt_cursor_dup_position(to_dup, iter->cursor)); + WT_ERR(__wt_cursor_dup_position( + iter->cjoin->entries[0].ends[0].cursor, iter->cursor)); iter->advance = false; } @@ -256,21 +247,21 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_COLLATOR *collator; WT_CURSOR *c; WT_CURSOR_INDEX *cindex; + WT_CURSOR_JOIN_ENDPOINT *end, *endmax; WT_DECL_RET; WT_ITEM curkey, curvalue, *k; WT_TABLE *maintable; - bool skip_left; char *uri; const char *raw_cfg[] = { 
WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; const char *mainkey_str, *p; - int cmp, mainkey_len; - size_t size; - u_int i; + int cmp, mainkey_len, skip; + size_t i, size; void *allocbuf; c = NULL; allocbuf = NULL; + skip = 0; if (entry->index != NULL) { /* @@ -300,11 +291,15 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, snprintf(uri, size, "%s()", cjoin->table->name); } WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, raw_cfg, &c)); - if (entry->ends[0].cursor != NULL) - WT_ERR(__wt_cursor_dup_position(entry->ends[0].cursor, c)); - skip_left = (entry->ends[0].cursor == NULL) || - entry->ends[0].flags == (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ); + /* Initially position the cursor if necessary. */ + endmax = &entry->ends[entry->ends_next]; + if ((end = &entry->ends[0]) < endmax && + F_ISSET(end, WT_CURJOIN_END_GE)) { + WT_ERR(__wt_cursor_dup_position(end->cursor, c)); + if (end->flags == WT_CURJOIN_END_GE) + skip = 1; + } collator = (entry->index == NULL) ? 
NULL : entry->index->collator; while (ret == 0) { c->get_key(c, &curkey); @@ -323,27 +318,25 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } else curkey = cindex->child->key; } - if (!skip_left) { + for (end = &entry->ends[skip]; end < endmax; end++) { WT_ERR(__wt_compare(session, collator, &curkey, - &entry->ends[0].key, &cmp)); - if (cmp < 0 || (cmp == 0 && - !F_ISSET(&entry->ends[0], WT_CURJOIN_END_EQ))) - goto advance; - if (cmp > 0) { - if (F_ISSET(&entry->ends[0], - WT_CURJOIN_END_GT)) - skip_left = true; - else - break; + &end->key, &cmp)); + if (!F_ISSET(end, WT_CURJOIN_END_LT)) { + if (cmp < 0 || (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ))) + goto advance; + if (cmp > 0) { + if (F_ISSET(end, WT_CURJOIN_END_GT)) + skip = 1; + else + goto done; + } + } else { + if (cmp > 0 || (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ))) + goto done; } } - if (entry->ends[1].cursor != NULL) { - WT_ERR(__wt_compare(session, collator, &curkey, - &entry->ends[1].key, &cmp)); - if (cmp > 0 || (cmp == 0 && - !F_ISSET(&entry->ends[1], WT_CURJOIN_END_EQ))) - break; - } if (entry->index != NULL) c->get_value(c, &curvalue); else @@ -353,6 +346,7 @@ advance: if ((ret = c->next(c)) == WT_NOTFOUND) break; } +done: WT_ERR_NOTFOUND_OK(ret); err: if (c != NULL) @@ -387,7 +381,7 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session, &cindex->child->key, &endpoint->key, &allocbuf)); if (allocbuf != NULL) - F_SET(endpoint, WT_CURJOIN_END_OWNKEY); + F_SET(endpoint, WT_CURJOIN_END_OWN_KEY); } else endpoint->key = cindex->child->key; } else { @@ -419,6 +413,7 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) WT_BLOOM *bloom; WT_DECL_RET; WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2; + WT_CURSOR_JOIN_ENDPOINT *end; uint64_t k, m; if (cjoin->entries_next == 0) { @@ -432,8 +427,9 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) jeend = &cjoin->entries[cjoin->entries_next]; for (je = cjoin->entries; je < jeend; je++) { - 
WT_ERR(__curjoin_endpoint_init_key(session, je, &je->ends[0])); - WT_ERR(__curjoin_endpoint_init_key(session, je, &je->ends[1])); + for (end = &je->ends[0]; end < &je->ends[je->ends_next]; + end++) + WT_ERR(__curjoin_endpoint_init_key(session, je, end)); /* * The first entry is iterated as the 'outermost' cursor. @@ -517,29 +513,29 @@ __curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, WT_ITEM *curkey, bool skip_left) { WT_COLLATOR *collator; + WT_CURSOR_JOIN_ENDPOINT *end, *endmax; WT_DECL_RET; int cmp; collator = (entry->index != NULL) ? entry->index->collator : NULL; - if (!skip_left && entry->ends[0].cursor != NULL) { - WT_ERR(__wt_compare(session, collator, curkey, - &entry->ends[0].key, &cmp)); - if (cmp < 0 || - (cmp == 0 && - !F_ISSET(&entry->ends[0], WT_CURJOIN_END_EQ)) || - (cmp > 0 && !F_ISSET(&entry->ends[0], WT_CURJOIN_END_GT))) - WT_ERR(WT_NOTFOUND); - } - if (entry->ends[1].cursor != NULL) { - WT_ERR(__wt_compare(session, collator, curkey, - &entry->ends[1].key, &cmp)); - if (cmp > 0 || - (cmp == 0 && - !F_ISSET(&entry->ends[1], WT_CURJOIN_END_EQ)) || - (cmp < 0 && !F_ISSET(&entry->ends[1], WT_CURJOIN_END_LT))) - WT_ERR(WT_NOTFOUND); + endmax = &entry->ends[entry->ends_next]; + for (end = &entry->ends[skip_left ? 
1 : 0]; end < endmax; end++) { + WT_ERR(__wt_compare(session, collator, curkey, &end->key, + &cmp)); + if (!F_ISSET(end, WT_CURJOIN_END_LT)) { + if (cmp < 0 || + (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ)) || + (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT))) + WT_ERR(WT_NOTFOUND); + } else { + if (cmp > 0 || + (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ)) || + (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT))) + WT_ERR(WT_NOTFOUND); + } } - err: return (ret); } @@ -579,7 +575,7 @@ __curjoin_extract_insert(WT_CURSOR *cursor) { WT_ASSERT(session, ikey.size > 0); --ikey.size; - ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, 0); + ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); if (ret == WT_NOTFOUND) ret = 0; else @@ -747,6 +743,7 @@ static int __curjoin_close(WT_CURSOR *cursor) { WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ENDPOINT *end; WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; WT_SESSION_IMPL *session; @@ -769,16 +766,14 @@ __curjoin_close(WT_CURSOR *cursor) entry++, i++) { if (entry->main != NULL) WT_TRET(entry->main->close(entry->main)); - if (entry->ends[0].cursor != NULL) - F_CLR(entry->ends[0].cursor, WT_CURSTD_JOINED); - if (entry->ends[1].cursor != NULL) - F_CLR(entry->ends[1].cursor, WT_CURSTD_JOINED); - if (F_ISSET(&entry->ends[0], WT_CURJOIN_END_OWNKEY)) - __wt_free(session, entry->ends[0].key.data); - if (F_ISSET(&entry->ends[1], WT_CURJOIN_END_OWNKEY)) - __wt_free(session, entry->ends[1].key.data); if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) WT_TRET(__wt_bloom_close(entry->bloom)); + for (end = &entry->ends[0]; + end < &entry->ends[entry->ends_next]; end++) { + F_CLR(end->cursor, WT_CURSTD_JOINED); + if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY)) + __wt_free(session, end->key.data); + } } if (cjoin->iter != NULL) @@ -883,15 +878,17 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, { WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; - WT_CURSOR_JOIN_ENDPOINT *endpoint; + WT_CURSOR_JOIN_ENDPOINT *end, 
*newend; int nonbloom; size_t i; + ssize_t ins; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; char *main_uri; size_t namesize, newsize; entry = NULL; + ins = -1; main_uri = NULL; nonbloom = -1; namesize = strlen(cjoin->table->name); @@ -945,16 +942,53 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, "values for the same index"); WT_ERR(EINVAL); } - /* Check flag combinations */ - if ((F_ISSET(&entry->ends[0], WT_CURJOIN_END_GT) && - (range & WT_CURJOIN_END_GT) != 0) || - (F_ISSET(&entry->ends[1], WT_CURJOIN_END_LT) && - (range & WT_CURJOIN_END_LT) != 0) || - ((F_ISSET(&entry->ends[0], WT_CURJOIN_END_EQ) || - F_ISSET(&entry->ends[1], WT_CURJOIN_END_EQ)) && - (range == WT_CURJOIN_END_EQ))) { - __wt_errx(session, "join has overlapping ranges"); - WT_ERR(EINVAL); + /* + * Check against other comparisons (we call them endpoints) + * already set up for this index. + * We allow either: + * - one or more "eq" (with disjunction) + * - exactly one "eq" (with conjunction) + * - exactly one of "gt" or "ge" (conjunction or disjunction) + * - exactly one of "lt" or "le" (conjunction or disjunction) + * - one of "gt"/"ge" along with one of "lt"/"le" + * (currently restricted to conjunction). + * + * Some other combinations, although expressible either do + * not make sense (X == 3 AND X == 5) or are reducible (X < + * 7 AND X < 9). Other specific cases of (X < 7 OR X > 15) + * or (X == 4 OR X > 15) make sense but we don't handle yet. 
+ */ + for (i = 0; i < entry->ends_next; i++) { + end = &entry->ends[i]; + if ((F_ISSET(end, WT_CURJOIN_END_GT) && + (range & WT_CURJOIN_END_GE) != 0) || + (F_ISSET(end, WT_CURJOIN_END_LT) && + (range & WT_CURJOIN_END_LE) != 0) || + (end->flags == WT_CURJOIN_END_EQ && + (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT)) + != 0)) { + __wt_errx(session, + "join has overlapping ranges"); + WT_ERR(EINVAL); + } + if (range == WT_CURJOIN_END_EQ && + end->flags == WT_CURJOIN_END_EQ && + !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) { + __wt_errx(session, + "compare=eq can only be combined " + "using operation=or"); + WT_ERR(EINVAL); + } + + /* + * Sort "gt"/"ge" to the front, followed by any number + * of "eq", and finally "lt"/"le". + */ + if (ins == -1 && + ((range & WT_CURJOIN_END_GT) != 0 || + (range == WT_CURJOIN_END_EQ && + !F_ISSET(end, WT_CURJOIN_END_GT)))) + ins = i; } /* All checks completed, merge any new configuration now */ entry->count = count; @@ -963,12 +997,17 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, entry->bloom_hash_count = WT_MAX(entry->bloom_hash_count, bloom_hash_count); } - if (range & WT_CURJOIN_END_LT) - endpoint = &entry->ends[1]; - else - endpoint = &entry->ends[0]; - endpoint->cursor = ref_cursor; - F_SET(endpoint, range); + WT_ERR(__wt_realloc_def(session, &entry->ends_allocated, + entry->ends_next + 1, &entry->ends)); + if (ins == -1) + ins = entry->ends_next; + newend = &entry->ends[ins]; + memmove(newend + 1, newend, + (entry->ends_next - ins) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); + memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); + entry->ends_next++; + newend->cursor = ref_cursor; + F_SET(newend, range); /* Open the main file with a projection of the indexed columns. 
*/ if (entry->main == NULL && entry->index != NULL) { diff --git a/src/include/cursor.h b/src/include/cursor.h index d0825d307c1..a65a95e91f5 100644 --- a/src/include/cursor.h +++ b/src/include/cursor.h @@ -281,7 +281,9 @@ struct __wt_cursor_join_endpoint { #define WT_CURJOIN_END_LT 0x01 /* include values < cursor */ #define WT_CURJOIN_END_EQ 0x02 /* include values == cursor */ #define WT_CURJOIN_END_GT 0x04 /* include values > cursor */ -#define WT_CURJOIN_END_OWNKEY 0x08 /* must free key's data */ +#define WT_CURJOIN_END_GE (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ) +#define WT_CURJOIN_END_LE (WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ) +#define WT_CURJOIN_END_OWN_KEY 0x08 /* must free key's data */ uint8_t flags; /* range for this endpoint */ }; @@ -294,10 +296,13 @@ struct __wt_cursor_join_entry { uint64_t count; /* approx number of matches */ #define WT_CURJOIN_ENTRY_BLOOM 0x01 /* use a bloom filter */ -#define WT_CURJOIN_ENTRY_OWN_BLOOM 0x02 /* this entry owns the bloom */ +#define WT_CURJOIN_ENTRY_DISJUNCTION 0x02 /* endpoints are or-ed */ +#define WT_CURJOIN_ENTRY_OWN_BLOOM 0x04 /* this entry owns the bloom */ uint8_t flags; - WT_CURSOR_JOIN_ENDPOINT ends[2]; /* reference endpoints */ + WT_CURSOR_JOIN_ENDPOINT *ends; /* reference endpoints */ + size_t ends_allocated; + size_t ends_next; }; struct __wt_cursor_join { -- cgit v1.2.1 From 1d26b39e4f65f5c2076cbdd9cfa21f237e0d0947 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Thu, 12 Nov 2015 10:35:03 -0500 Subject: WT-1315. Fixed error in check for overlapping ranges. 
--- src/cursor/cur_join.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index a178c5aab74..7527a657db4 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -879,6 +879,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; WT_CURSOR_JOIN_ENDPOINT *end, *newend; + bool range_eq; int nonbloom; size_t i; ssize_t ins; @@ -960,10 +961,11 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, */ for (i = 0; i < entry->ends_next; i++) { end = &entry->ends[i]; + range_eq = (range == WT_CURJOIN_END_EQ); if ((F_ISSET(end, WT_CURJOIN_END_GT) && - (range & WT_CURJOIN_END_GE) != 0) || + ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) || (F_ISSET(end, WT_CURJOIN_END_LT) && - (range & WT_CURJOIN_END_LE) != 0) || + ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) || (end->flags == WT_CURJOIN_END_EQ && (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT)) != 0)) { -- cgit v1.2.1 From 67f6acb52291598e4ecfd2e22fa97cad8433115f Mon Sep 17 00:00:00 2001 From: Susan LoVerso Date: Mon, 16 Nov 2015 13:12:25 -0500 Subject: WT-2218 Add truncate stats --- dist/stat_data.py | 2 + src/btree/bt_cursor.c | 1 + src/cursor/cur_table.c | 1 + src/include/stat.h | 2 + src/include/wiredtiger.in | 218 ++++++++++++++++++++++--------------------- src/schema/schema_truncate.c | 3 + src/session/session_api.c | 1 + src/support/stat.c | 7 ++ 8 files changed, 128 insertions(+), 107 deletions(-) diff --git a/dist/stat_data.py b/dist/stat_data.py index 76fdf185137..4b186625e7d 100644 --- a/dist/stat_data.py +++ b/dist/stat_data.py @@ -349,6 +349,7 @@ connection_stats = [ CursorStat('cursor_restart', 'cursor restarted searches'), CursorStat('cursor_search', 'cursor search calls'), CursorStat('cursor_search_near', 'cursor search near calls'), + CursorStat('cursor_truncate', 'truncate calls'), CursorStat('cursor_update', 'cursor update calls'), 
########################################## @@ -390,6 +391,7 @@ dsrc_stats = [ CursorStat('cursor_restart', 'restarted searches'), CursorStat('cursor_search', 'search calls'), CursorStat('cursor_search_near', 'search near calls'), + CursorStat('cursor_truncate', 'truncate calls'), CursorStat('cursor_update', 'update calls'), CursorStat('cursor_update_bytes', 'cursor-update value bytes updated'), diff --git a/src/btree/bt_cursor.c b/src/btree/bt_cursor.c index 3290fd6374c..69512f45933 100644 --- a/src/btree/bt_cursor.c +++ b/src/btree/bt_cursor.c @@ -1093,6 +1093,7 @@ __wt_btcur_range_truncate(WT_CURSOR_BTREE *start, WT_CURSOR_BTREE *stop) cbt = (start != NULL) ? start : stop; session = (WT_SESSION_IMPL *)cbt->iface.session; btree = cbt->btree; + WT_STAT_FAST_DATA_INCR(session, cursor_truncate); /* * We always delete in a forward direction because it's faster, assert diff --git a/src/cursor/cur_table.c b/src/cursor/cur_table.c index c5a00649334..b78e12c2648 100644 --- a/src/cursor/cur_table.c +++ b/src/cursor/cur_table.c @@ -659,6 +659,7 @@ __wt_table_range_truncate(WT_CURSOR_TABLE *start, WT_CURSOR_TABLE *stop) /* Open any indices. */ WT_RET(__curtable_open_indices(ctable)); WT_RET(__wt_scr_alloc(session, 128, &key)); + WT_STAT_FAST_DATA_INCR(session, cursor_truncate); /* * Step through the cursor range, removing the index entries. 
diff --git a/src/include/stat.h b/src/include/stat.h index 1ebe253e5db..0a79a6f6140 100644 --- a/src/include/stat.h +++ b/src/include/stat.h @@ -297,6 +297,7 @@ struct __wt_connection_stats { int64_t cursor_restart; int64_t cursor_search; int64_t cursor_search_near; + int64_t cursor_truncate; int64_t cursor_update; int64_t dh_conn_handle_count; int64_t dh_session_handles; @@ -461,6 +462,7 @@ struct __wt_dsrc_stats { int64_t cursor_restart; int64_t cursor_search; int64_t cursor_search_near; + int64_t cursor_truncate; int64_t cursor_update; int64_t cursor_update_bytes; int64_t lsm_checkpoint_throttle; diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index c50f9dbe117..ad716fa5e6a 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -3763,174 +3763,176 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_CONN_CURSOR_SEARCH 1068 /*! cursor: cursor search near calls */ #define WT_STAT_CONN_CURSOR_SEARCH_NEAR 1069 +/*! cursor: truncate calls */ +#define WT_STAT_CONN_CURSOR_TRUNCATE 1070 /*! cursor: cursor update calls */ -#define WT_STAT_CONN_CURSOR_UPDATE 1070 +#define WT_STAT_CONN_CURSOR_UPDATE 1071 /*! data-handle: connection data handles currently active */ -#define WT_STAT_CONN_DH_CONN_HANDLE_COUNT 1071 +#define WT_STAT_CONN_DH_CONN_HANDLE_COUNT 1072 /*! data-handle: session dhandles swept */ -#define WT_STAT_CONN_DH_SESSION_HANDLES 1072 +#define WT_STAT_CONN_DH_SESSION_HANDLES 1073 /*! data-handle: session sweep attempts */ -#define WT_STAT_CONN_DH_SESSION_SWEEPS 1073 +#define WT_STAT_CONN_DH_SESSION_SWEEPS 1074 /*! data-handle: connection sweep dhandles closed */ -#define WT_STAT_CONN_DH_SWEEP_CLOSE 1074 +#define WT_STAT_CONN_DH_SWEEP_CLOSE 1075 /*! data-handle: connection sweep candidate became referenced */ -#define WT_STAT_CONN_DH_SWEEP_REF 1075 +#define WT_STAT_CONN_DH_SWEEP_REF 1076 /*! 
data-handle: connection sweep dhandles removed from hash list */ -#define WT_STAT_CONN_DH_SWEEP_REMOVE 1076 +#define WT_STAT_CONN_DH_SWEEP_REMOVE 1077 /*! data-handle: connection sweep time-of-death sets */ -#define WT_STAT_CONN_DH_SWEEP_TOD 1077 +#define WT_STAT_CONN_DH_SWEEP_TOD 1078 /*! data-handle: connection sweeps */ -#define WT_STAT_CONN_DH_SWEEPS 1078 +#define WT_STAT_CONN_DH_SWEEPS 1079 /*! connection: files currently open */ -#define WT_STAT_CONN_FILE_OPEN 1079 +#define WT_STAT_CONN_FILE_OPEN 1080 /*! log: total log buffer size */ -#define WT_STAT_CONN_LOG_BUFFER_SIZE 1080 +#define WT_STAT_CONN_LOG_BUFFER_SIZE 1081 /*! log: log bytes of payload data */ -#define WT_STAT_CONN_LOG_BYTES_PAYLOAD 1081 +#define WT_STAT_CONN_LOG_BYTES_PAYLOAD 1082 /*! log: log bytes written */ -#define WT_STAT_CONN_LOG_BYTES_WRITTEN 1082 +#define WT_STAT_CONN_LOG_BYTES_WRITTEN 1083 /*! log: yields waiting for previous log file close */ -#define WT_STAT_CONN_LOG_CLOSE_YIELDS 1083 +#define WT_STAT_CONN_LOG_CLOSE_YIELDS 1084 /*! log: total size of compressed records */ -#define WT_STAT_CONN_LOG_COMPRESS_LEN 1084 +#define WT_STAT_CONN_LOG_COMPRESS_LEN 1085 /*! log: total in-memory size of compressed records */ -#define WT_STAT_CONN_LOG_COMPRESS_MEM 1085 +#define WT_STAT_CONN_LOG_COMPRESS_MEM 1086 /*! log: log records too small to compress */ -#define WT_STAT_CONN_LOG_COMPRESS_SMALL 1086 +#define WT_STAT_CONN_LOG_COMPRESS_SMALL 1087 /*! log: log records not compressed */ -#define WT_STAT_CONN_LOG_COMPRESS_WRITE_FAILS 1087 +#define WT_STAT_CONN_LOG_COMPRESS_WRITE_FAILS 1088 /*! log: log records compressed */ -#define WT_STAT_CONN_LOG_COMPRESS_WRITES 1088 +#define WT_STAT_CONN_LOG_COMPRESS_WRITES 1089 /*! log: log flush operations */ -#define WT_STAT_CONN_LOG_FLUSH 1089 +#define WT_STAT_CONN_LOG_FLUSH 1090 /*! log: maximum log file size */ -#define WT_STAT_CONN_LOG_MAX_FILESIZE 1090 +#define WT_STAT_CONN_LOG_MAX_FILESIZE 1091 /*! 
log: pre-allocated log files prepared */ -#define WT_STAT_CONN_LOG_PREALLOC_FILES 1091 +#define WT_STAT_CONN_LOG_PREALLOC_FILES 1092 /*! log: number of pre-allocated log files to create */ -#define WT_STAT_CONN_LOG_PREALLOC_MAX 1092 +#define WT_STAT_CONN_LOG_PREALLOC_MAX 1093 /*! log: pre-allocated log files not ready and missed */ -#define WT_STAT_CONN_LOG_PREALLOC_MISSED 1093 +#define WT_STAT_CONN_LOG_PREALLOC_MISSED 1094 /*! log: pre-allocated log files used */ -#define WT_STAT_CONN_LOG_PREALLOC_USED 1094 +#define WT_STAT_CONN_LOG_PREALLOC_USED 1095 /*! log: log release advances write LSN */ -#define WT_STAT_CONN_LOG_RELEASE_WRITE_LSN 1095 +#define WT_STAT_CONN_LOG_RELEASE_WRITE_LSN 1096 /*! log: records processed by log scan */ -#define WT_STAT_CONN_LOG_SCAN_RECORDS 1096 +#define WT_STAT_CONN_LOG_SCAN_RECORDS 1097 /*! log: log scan records requiring two reads */ -#define WT_STAT_CONN_LOG_SCAN_REREADS 1097 +#define WT_STAT_CONN_LOG_SCAN_REREADS 1098 /*! log: log scan operations */ -#define WT_STAT_CONN_LOG_SCANS 1098 +#define WT_STAT_CONN_LOG_SCANS 1099 /*! log: consolidated slot closures */ -#define WT_STAT_CONN_LOG_SLOT_CLOSES 1099 +#define WT_STAT_CONN_LOG_SLOT_CLOSES 1100 /*! log: written slots coalesced */ -#define WT_STAT_CONN_LOG_SLOT_COALESCED 1100 +#define WT_STAT_CONN_LOG_SLOT_COALESCED 1101 /*! log: logging bytes consolidated */ -#define WT_STAT_CONN_LOG_SLOT_CONSOLIDATED 1101 +#define WT_STAT_CONN_LOG_SLOT_CONSOLIDATED 1102 /*! log: consolidated slot joins */ -#define WT_STAT_CONN_LOG_SLOT_JOINS 1102 +#define WT_STAT_CONN_LOG_SLOT_JOINS 1103 /*! log: consolidated slot join races */ -#define WT_STAT_CONN_LOG_SLOT_RACES 1103 +#define WT_STAT_CONN_LOG_SLOT_RACES 1104 /*! log: busy returns attempting to switch slots */ -#define WT_STAT_CONN_LOG_SLOT_SWITCH_BUSY 1104 +#define WT_STAT_CONN_LOG_SLOT_SWITCH_BUSY 1105 /*! 
log: consolidated slot join transitions */ -#define WT_STAT_CONN_LOG_SLOT_TRANSITIONS 1105 +#define WT_STAT_CONN_LOG_SLOT_TRANSITIONS 1106 /*! log: consolidated slot unbuffered writes */ -#define WT_STAT_CONN_LOG_SLOT_UNBUFFERED 1106 +#define WT_STAT_CONN_LOG_SLOT_UNBUFFERED 1107 /*! log: log sync operations */ -#define WT_STAT_CONN_LOG_SYNC 1107 +#define WT_STAT_CONN_LOG_SYNC 1108 /*! log: log sync_dir operations */ -#define WT_STAT_CONN_LOG_SYNC_DIR 1108 +#define WT_STAT_CONN_LOG_SYNC_DIR 1109 /*! log: log server thread advances write LSN */ -#define WT_STAT_CONN_LOG_WRITE_LSN 1109 +#define WT_STAT_CONN_LOG_WRITE_LSN 1110 /*! log: log write operations */ -#define WT_STAT_CONN_LOG_WRITES 1110 +#define WT_STAT_CONN_LOG_WRITES 1111 /*! log: log files manually zero-filled */ -#define WT_STAT_CONN_LOG_ZERO_FILLS 1111 +#define WT_STAT_CONN_LOG_ZERO_FILLS 1112 /*! LSM: sleep for LSM checkpoint throttle */ -#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1112 +#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1113 /*! LSM: sleep for LSM merge throttle */ -#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1113 +#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1114 /*! LSM: rows merged in an LSM tree */ -#define WT_STAT_CONN_LSM_ROWS_MERGED 1114 +#define WT_STAT_CONN_LSM_ROWS_MERGED 1115 /*! LSM: application work units currently queued */ -#define WT_STAT_CONN_LSM_WORK_QUEUE_APP 1115 +#define WT_STAT_CONN_LSM_WORK_QUEUE_APP 1116 /*! LSM: merge work units currently queued */ -#define WT_STAT_CONN_LSM_WORK_QUEUE_MANAGER 1116 +#define WT_STAT_CONN_LSM_WORK_QUEUE_MANAGER 1117 /*! LSM: tree queue hit maximum */ -#define WT_STAT_CONN_LSM_WORK_QUEUE_MAX 1117 +#define WT_STAT_CONN_LSM_WORK_QUEUE_MAX 1118 /*! LSM: switch work units currently queued */ -#define WT_STAT_CONN_LSM_WORK_QUEUE_SWITCH 1118 +#define WT_STAT_CONN_LSM_WORK_QUEUE_SWITCH 1119 /*! LSM: tree maintenance operations scheduled */ -#define WT_STAT_CONN_LSM_WORK_UNITS_CREATED 1119 +#define WT_STAT_CONN_LSM_WORK_UNITS_CREATED 1120 /*! 
LSM: tree maintenance operations discarded */ -#define WT_STAT_CONN_LSM_WORK_UNITS_DISCARDED 1120 +#define WT_STAT_CONN_LSM_WORK_UNITS_DISCARDED 1121 /*! LSM: tree maintenance operations executed */ -#define WT_STAT_CONN_LSM_WORK_UNITS_DONE 1121 +#define WT_STAT_CONN_LSM_WORK_UNITS_DONE 1122 /*! connection: memory allocations */ -#define WT_STAT_CONN_MEMORY_ALLOCATION 1122 +#define WT_STAT_CONN_MEMORY_ALLOCATION 1123 /*! connection: memory frees */ -#define WT_STAT_CONN_MEMORY_FREE 1123 +#define WT_STAT_CONN_MEMORY_FREE 1124 /*! connection: memory re-allocations */ -#define WT_STAT_CONN_MEMORY_GROW 1124 +#define WT_STAT_CONN_MEMORY_GROW 1125 /*! thread-yield: page acquire busy blocked */ -#define WT_STAT_CONN_PAGE_BUSY_BLOCKED 1125 +#define WT_STAT_CONN_PAGE_BUSY_BLOCKED 1126 /*! thread-yield: page acquire eviction blocked */ -#define WT_STAT_CONN_PAGE_FORCIBLE_EVICT_BLOCKED 1126 +#define WT_STAT_CONN_PAGE_FORCIBLE_EVICT_BLOCKED 1127 /*! thread-yield: page acquire locked blocked */ -#define WT_STAT_CONN_PAGE_LOCKED_BLOCKED 1127 +#define WT_STAT_CONN_PAGE_LOCKED_BLOCKED 1128 /*! thread-yield: page acquire read blocked */ -#define WT_STAT_CONN_PAGE_READ_BLOCKED 1128 +#define WT_STAT_CONN_PAGE_READ_BLOCKED 1129 /*! thread-yield: page acquire time sleeping (usecs) */ -#define WT_STAT_CONN_PAGE_SLEEP 1129 +#define WT_STAT_CONN_PAGE_SLEEP 1130 /*! connection: total read I/Os */ -#define WT_STAT_CONN_READ_IO 1130 +#define WT_STAT_CONN_READ_IO 1131 /*! reconciliation: page reconciliation calls */ -#define WT_STAT_CONN_REC_PAGES 1131 +#define WT_STAT_CONN_REC_PAGES 1132 /*! reconciliation: page reconciliation calls for eviction */ -#define WT_STAT_CONN_REC_PAGES_EVICTION 1132 +#define WT_STAT_CONN_REC_PAGES_EVICTION 1133 /*! reconciliation: split bytes currently awaiting free */ -#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1133 +#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1134 /*! 
reconciliation: split objects currently awaiting free */ -#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1134 +#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1135 /*! connection: pthread mutex shared lock read-lock calls */ -#define WT_STAT_CONN_RWLOCK_READ 1135 +#define WT_STAT_CONN_RWLOCK_READ 1136 /*! connection: pthread mutex shared lock write-lock calls */ -#define WT_STAT_CONN_RWLOCK_WRITE 1136 +#define WT_STAT_CONN_RWLOCK_WRITE 1137 /*! session: open cursor count */ -#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1137 +#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1138 /*! session: open session count */ -#define WT_STAT_CONN_SESSION_OPEN 1138 +#define WT_STAT_CONN_SESSION_OPEN 1139 /*! transaction: transaction begins */ -#define WT_STAT_CONN_TXN_BEGIN 1139 +#define WT_STAT_CONN_TXN_BEGIN 1140 /*! transaction: transaction checkpoints */ -#define WT_STAT_CONN_TXN_CHECKPOINT 1140 +#define WT_STAT_CONN_TXN_CHECKPOINT 1141 /*! transaction: transaction checkpoint generation */ -#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1141 +#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1142 /*! transaction: transaction checkpoint currently running */ -#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1142 +#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1143 /*! transaction: transaction checkpoint max time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1143 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1144 /*! transaction: transaction checkpoint min time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1144 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1145 /*! transaction: transaction checkpoint most recent time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1145 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1146 /*! transaction: transaction checkpoint total time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1146 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1147 /*! 
transaction: transactions committed */ -#define WT_STAT_CONN_TXN_COMMIT 1147 +#define WT_STAT_CONN_TXN_COMMIT 1148 /*! transaction: transaction failures due to cache overflow */ -#define WT_STAT_CONN_TXN_FAIL_CACHE 1148 +#define WT_STAT_CONN_TXN_FAIL_CACHE 1149 /*! transaction: transaction range of IDs currently pinned by a checkpoint */ -#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1149 +#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1150 /*! transaction: transaction range of IDs currently pinned */ -#define WT_STAT_CONN_TXN_PINNED_RANGE 1150 +#define WT_STAT_CONN_TXN_PINNED_RANGE 1151 /*! transaction: transactions rolled back */ -#define WT_STAT_CONN_TXN_ROLLBACK 1151 +#define WT_STAT_CONN_TXN_ROLLBACK 1152 /*! transaction: transaction sync calls */ -#define WT_STAT_CONN_TXN_SYNC 1152 +#define WT_STAT_CONN_TXN_SYNC 1153 /*! connection: total write I/Os */ -#define WT_STAT_CONN_WRITE_IO 1153 +#define WT_STAT_CONN_WRITE_IO 1154 /*! * @} @@ -4084,54 +4086,56 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_DSRC_CURSOR_SEARCH 2071 /*! cursor: search near calls */ #define WT_STAT_DSRC_CURSOR_SEARCH_NEAR 2072 +/*! cursor: truncate calls */ +#define WT_STAT_DSRC_CURSOR_TRUNCATE 2073 /*! cursor: update calls */ -#define WT_STAT_DSRC_CURSOR_UPDATE 2073 +#define WT_STAT_DSRC_CURSOR_UPDATE 2074 /*! cursor: cursor-update value bytes updated */ -#define WT_STAT_DSRC_CURSOR_UPDATE_BYTES 2074 +#define WT_STAT_DSRC_CURSOR_UPDATE_BYTES 2075 /*! LSM: sleep for LSM checkpoint throttle */ -#define WT_STAT_DSRC_LSM_CHECKPOINT_THROTTLE 2075 +#define WT_STAT_DSRC_LSM_CHECKPOINT_THROTTLE 2076 /*! LSM: chunks in the LSM tree */ -#define WT_STAT_DSRC_LSM_CHUNK_COUNT 2076 +#define WT_STAT_DSRC_LSM_CHUNK_COUNT 2077 /*! LSM: highest merge generation in the LSM tree */ -#define WT_STAT_DSRC_LSM_GENERATION_MAX 2077 +#define WT_STAT_DSRC_LSM_GENERATION_MAX 2078 /*! 
LSM: queries that could have benefited from a Bloom filter that did * not exist */ -#define WT_STAT_DSRC_LSM_LOOKUP_NO_BLOOM 2078 +#define WT_STAT_DSRC_LSM_LOOKUP_NO_BLOOM 2079 /*! LSM: sleep for LSM merge throttle */ -#define WT_STAT_DSRC_LSM_MERGE_THROTTLE 2079 +#define WT_STAT_DSRC_LSM_MERGE_THROTTLE 2080 /*! reconciliation: dictionary matches */ -#define WT_STAT_DSRC_REC_DICTIONARY 2080 +#define WT_STAT_DSRC_REC_DICTIONARY 2081 /*! reconciliation: internal page multi-block writes */ -#define WT_STAT_DSRC_REC_MULTIBLOCK_INTERNAL 2081 +#define WT_STAT_DSRC_REC_MULTIBLOCK_INTERNAL 2082 /*! reconciliation: leaf page multi-block writes */ -#define WT_STAT_DSRC_REC_MULTIBLOCK_LEAF 2082 +#define WT_STAT_DSRC_REC_MULTIBLOCK_LEAF 2083 /*! reconciliation: maximum blocks required for a page */ -#define WT_STAT_DSRC_REC_MULTIBLOCK_MAX 2083 +#define WT_STAT_DSRC_REC_MULTIBLOCK_MAX 2084 /*! reconciliation: internal-page overflow keys */ -#define WT_STAT_DSRC_REC_OVERFLOW_KEY_INTERNAL 2084 +#define WT_STAT_DSRC_REC_OVERFLOW_KEY_INTERNAL 2085 /*! reconciliation: leaf-page overflow keys */ -#define WT_STAT_DSRC_REC_OVERFLOW_KEY_LEAF 2085 +#define WT_STAT_DSRC_REC_OVERFLOW_KEY_LEAF 2086 /*! reconciliation: overflow values written */ -#define WT_STAT_DSRC_REC_OVERFLOW_VALUE 2086 +#define WT_STAT_DSRC_REC_OVERFLOW_VALUE 2087 /*! reconciliation: pages deleted */ -#define WT_STAT_DSRC_REC_PAGE_DELETE 2087 +#define WT_STAT_DSRC_REC_PAGE_DELETE 2088 /*! reconciliation: page checksum matches */ -#define WT_STAT_DSRC_REC_PAGE_MATCH 2088 +#define WT_STAT_DSRC_REC_PAGE_MATCH 2089 /*! reconciliation: page reconciliation calls */ -#define WT_STAT_DSRC_REC_PAGES 2089 +#define WT_STAT_DSRC_REC_PAGES 2090 /*! reconciliation: page reconciliation calls for eviction */ -#define WT_STAT_DSRC_REC_PAGES_EVICTION 2090 +#define WT_STAT_DSRC_REC_PAGES_EVICTION 2091 /*! 
reconciliation: leaf page key bytes discarded using prefix compression */ -#define WT_STAT_DSRC_REC_PREFIX_COMPRESSION 2091 +#define WT_STAT_DSRC_REC_PREFIX_COMPRESSION 2092 /*! reconciliation: internal page key bytes discarded using suffix * compression */ -#define WT_STAT_DSRC_REC_SUFFIX_COMPRESSION 2092 +#define WT_STAT_DSRC_REC_SUFFIX_COMPRESSION 2093 /*! session: object compaction */ -#define WT_STAT_DSRC_SESSION_COMPACT 2093 +#define WT_STAT_DSRC_SESSION_COMPACT 2094 /*! session: open cursor count */ -#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2094 +#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2095 /*! transaction: update conflicts */ -#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2095 +#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2096 /*! @} */ /* * Statistics section: END diff --git a/src/schema/schema_truncate.c b/src/schema/schema_truncate.c index 03a991a9aba..c39bba4753c 100644 --- a/src/schema/schema_truncate.c +++ b/src/schema/schema_truncate.c @@ -26,6 +26,7 @@ __truncate_file(WT_SESSION_IMPL *session, const char *uri) /* Open and lock the file. */ WT_RET(__wt_session_get_btree( session, uri, NULL, NULL, WT_DHANDLE_EXCLUSIVE)); + WT_STAT_FAST_DATA_INCR(session, cursor_truncate); /* Get the allocation size. */ allocsize = S2BT(session)->allocsize; @@ -56,6 +57,7 @@ __truncate_table(WT_SESSION_IMPL *session, const char *uri, const char *cfg[]) u_int i; WT_RET(__wt_schema_get_table(session, uri, strlen(uri), false, &table)); + WT_STAT_FAST_DATA_INCR(session, cursor_truncate); /* Truncate the column groups. 
*/ for (i = 0; i < WT_COLGROUPS(table); i++) @@ -90,6 +92,7 @@ __truncate_dsrc(WT_SESSION_IMPL *session, const char *uri) while ((ret = cursor->next(cursor)) == 0) WT_ERR(cursor->remove(cursor)); WT_ERR_NOTFOUND_OK(ret); + WT_STAT_FAST_DATA_INCR(session, cursor_truncate); err: WT_TRET(cursor->close(cursor)); return (ret); diff --git a/src/session/session_api.c b/src/session/session_api.c index ed0e016dcb2..f1599320675 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -657,6 +657,7 @@ __session_truncate(WT_SESSION *wt_session, session = (WT_SESSION_IMPL *)wt_session; SESSION_TXN_API_CALL(session, truncate, config, cfg); + WT_STAT_FAST_CONN_INCR(session, cursor_truncate); /* * If the URI is specified, we don't need a start/stop, if start/stop diff --git a/src/support/stat.c b/src/support/stat.c index 9e817fad512..6c18621c42d 100644 --- a/src/support/stat.c +++ b/src/support/stat.c @@ -76,6 +76,7 @@ static const char * const __stats_dsrc_desc[] = { "cursor: restarted searches", "cursor: search calls", "cursor: search near calls", + "cursor: truncate calls", "cursor: update calls", "cursor: cursor-update value bytes updated", "LSM: sleep for LSM checkpoint throttle", @@ -194,6 +195,7 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats) stats->cursor_restart = 0; stats->cursor_search = 0; stats->cursor_search_near = 0; + stats->cursor_truncate = 0; stats->cursor_update = 0; stats->bloom_false_positive = 0; stats->bloom_hit = 0; @@ -311,6 +313,7 @@ __wt_stat_dsrc_aggregate_single( to->cursor_restart += from->cursor_restart; to->cursor_search += from->cursor_search; to->cursor_search_near += from->cursor_search_near; + to->cursor_truncate += from->cursor_truncate; to->cursor_update += from->cursor_update; to->bloom_false_positive += from->bloom_false_positive; to->bloom_hit += from->bloom_hit; @@ -442,6 +445,7 @@ __wt_stat_dsrc_aggregate( to->cursor_restart += WT_STAT_READ(from, cursor_restart); to->cursor_search += WT_STAT_READ(from, 
cursor_search); to->cursor_search_near += WT_STAT_READ(from, cursor_search_near); + to->cursor_truncate += WT_STAT_READ(from, cursor_truncate); to->cursor_update += WT_STAT_READ(from, cursor_update); to->bloom_false_positive += WT_STAT_READ(from, bloom_false_positive); to->bloom_hit += WT_STAT_READ(from, bloom_hit); @@ -554,6 +558,7 @@ static const char * const __stats_connection_desc[] = { "cursor: cursor restarted searches", "cursor: cursor search calls", "cursor: cursor search near calls", + "cursor: truncate calls", "cursor: cursor update calls", "data-handle: connection data handles currently active", "data-handle: session dhandles swept", @@ -745,6 +750,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) stats->cursor_search = 0; stats->cursor_search_near = 0; stats->cursor_update = 0; + stats->cursor_truncate = 0; /* not clearing dh_conn_handle_count */ stats->dh_sweep_ref = 0; stats->dh_sweep_close = 0; @@ -930,6 +936,7 @@ __wt_stat_connection_aggregate( to->cursor_search += WT_STAT_READ(from, cursor_search); to->cursor_search_near += WT_STAT_READ(from, cursor_search_near); to->cursor_update += WT_STAT_READ(from, cursor_update); + to->cursor_truncate += WT_STAT_READ(from, cursor_truncate); to->dh_conn_handle_count += WT_STAT_READ(from, dh_conn_handle_count); to->dh_sweep_ref += WT_STAT_READ(from, dh_sweep_ref); to->dh_sweep_close += WT_STAT_READ(from, dh_sweep_close); -- cgit v1.2.1 From a22cde620d799e8019a72c427ba5b2b8e2e39489 Mon Sep 17 00:00:00 2001 From: Don Anderson Date: Tue, 17 Nov 2015 10:34:11 -0500 Subject: WT-1315. Support statistics on join cursors. Statistics for these are a bit different than for a data source or a connection: 1) The statistics are simple (currently only 3), but range over sets of indices, so traversing the stats is a two level operation. Added a private callback API to advance to the next index when the stats have been exhausted. 
2) cursors are used as single threaded operations, so arrays of stats to avoid thread conflicts are not needed. Minor change: the stats_desc callback changed to return an int, and an arg was added to provide some context. In the join cursor case, the returned desc is not a static string; it needs to be created and managed. --- dist/stat.py | 44 +++++++++----- dist/stat_data.py | 15 +++++ src/conn/conn_stat.c | 17 +++--- src/cursor/cur_join.c | 19 +++++- src/cursor/cur_stat.c | 146 +++++++++++++++++++++++++++++++++++++++------- src/include/cursor.h | 18 +++++- src/include/extern.h | 13 +++-- src/include/stat.h | 10 ++++ src/include/wiredtiger.in | 13 +++++ src/include/wt_internal.h | 4 ++ src/lsm/lsm_stat.c | 6 +- src/schema/schema_stat.c | 8 +-- src/session/session_api.c | 38 ++++++++---- src/support/stat.c | 62 ++++++++++++++++++-- test/suite/test_join01.py | 68 ++++++++++++++++++++- 15 files changed, 405 insertions(+), 76 deletions(-) diff --git a/dist/stat.py b/dist/stat.py index c9684665a53..d62fda3fcb9 100644 --- a/dist/stat.py +++ b/dist/stat.py @@ -5,7 +5,7 @@ import re, string, sys, textwrap from dist import compare_srcfile # Read the source files. -from stat_data import groups, dsrc_stats, connection_stats +from stat_data import groups, dsrc_stats, connection_stats, join_stats def print_struct(title, name, base, stats): '''Print the structures for the stat.h file.''' @@ -35,9 +35,17 @@ for line in open('../src/include/stat.h', 'r'): print_struct( 'connections', 'connection', 1000, connection_stats) print_struct('data sources', 'dsrc', 2000, dsrc_stats) + print_struct('join cursors', 'join', 3000, join_stats) f.close() compare_srcfile(tmp_file, '../src/include/stat.h') +def print_defines_one(capname, base, stats): + for v, l in enumerate(stats, base): + f.write('/*! 
%s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70))) + f.write('#define\tWT_STAT_' + capname + '_' + l.name.upper() + "\t" * + max(1, 6 - int((len('WT_STAT_' + capname + '_' + l.name)) / 8)) + + str(v) + '\n') + def print_defines(): '''Print the #defines for the wiredtiger.in file.''' f.write(''' @@ -51,11 +59,7 @@ def print_defines(): * @{ */ ''') - for v, l in enumerate(connection_stats, 1000): - f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70))) - f.write('#define\tWT_STAT_CONN_' + l.name.upper() + "\t" * - max(1, 6 - int((len('WT_STAT_CONN_' + l.name)) / 8)) + - str(v) + '\n') + print_defines_one('CONN', 1000, connection_stats) f.write(''' /*! * @} @@ -64,11 +68,16 @@ def print_defines(): * @{ */ ''') - for v, l in enumerate(dsrc_stats, 2000): - f.write('/*! %s */\n' % '\n * '.join(textwrap.wrap(l.desc, 70))) - f.write('#define\tWT_STAT_DSRC_' + l.name.upper() + "\t" * - max(1, 6 - int((len('WT_STAT_DSRC_' + l.name)) / 8)) + - str(v) + '\n') + print_defines_one('DSRC', 2000, dsrc_stats) + f.write(''' +/*! + * @} + * @name Statistics for join cursors + * @anchor statistics_join + * @{ + */ +''') + print_defines_one('JOIN', 3000, join_stats) f.write('/*! @} */\n') # Update the #defines in the wiredtiger.in file. 
@@ -98,10 +107,12 @@ def print_func(name, handle, list): f.write('};\n') f.write(''' -const char * -__wt_stat_''' + name + '''_desc(int slot) +int +__wt_stat_''' + name + '''_desc(WT_CURSOR_STAT *cst, int slot, const char **p) { -\treturn (__stats_''' + name + '''_desc[slot]); +\tWT_UNUSED(cst); +\t*p = __stats_''' + name + '''_desc[slot]; +\treturn (0); } ''') @@ -113,7 +124,8 @@ __wt_stat_''' + name + '_init_single(WT_' + name.upper() + '''_STATS *stats) } ''') - f.write(''' + if handle != None: + f.write(''' void __wt_stat_''' + name + '_init(' + handle + ''' *handle) { @@ -205,6 +217,7 @@ f.write('#include "wt_internal.h"\n') print_func('dsrc', 'WT_DATA_HANDLE', dsrc_stats) print_func('connection', 'WT_CONNECTION_IMPL', connection_stats) +print_func('join', None, join_stats) f.close() compare_srcfile(tmp_file, '../src/support/stat.c') @@ -224,6 +237,7 @@ for l in sorted(dsrc_stats): scale_info += ' \'' + l.desc + '\',\n' if 'no_clear' in l.flags: clear_info += ' \'' + l.desc + '\',\n' +# No join statistics can be captured in wtstats scale_info += ']\n' clear_info += ']\n' prefix_info = 'prefix_list = [\n' diff --git a/dist/stat_data.py b/dist/stat_data.py index 76fdf185137..e3abec0f204 100644 --- a/dist/stat_data.py +++ b/dist/stat_data.py @@ -67,6 +67,10 @@ class DhandleStat(Stat): prefix = 'data-handle' def __init__(self, name, desc, flags=''): Stat.__init__(self, name, DhandleStat.prefix, desc, flags) +class JoinStat(Stat): + prefix = '' # prefix is inserted dynamically + def __init__(self, name, desc, flags=''): + Stat.__init__(self, name, JoinStat.prefix, desc, flags) class LogStat(Stat): prefix = 'log' def __init__(self, name, desc, flags=''): @@ -533,3 +537,14 @@ dsrc_stats = [ ] dsrc_stats = sorted(dsrc_stats, key=attrgetter('name')) + +########################################## +# Cursor Join statistics +########################################## +join_stats = [ + JoinStat('accesses', 'accesses'), + JoinStat('actual_count', 'actual count of items'), + 
JoinStat('bloom_false_positive', 'bloom filter false positives'), +] + +join_stats = sorted(join_stats, key=attrgetter('name')) diff --git a/src/conn/conn_stat.c b/src/conn/conn_stat.c index ec3a630581a..f08c2b7996c 100644 --- a/src/conn/conn_stat.c +++ b/src/conn/conn_stat.c @@ -154,7 +154,7 @@ __statlog_dump(WT_SESSION_IMPL *session, const char *name, bool conn_stats) WT_DECL_RET; int64_t *stats; int i; - const char *uri; + const char *desc, *uri; const char *cfg[] = { WT_CONFIG_BASE(session, WT_SESSION_open_cursor), NULL }; @@ -175,16 +175,19 @@ __statlog_dump(WT_SESSION_IMPL *session, const char *name, bool conn_stats) * If we don't find an underlying object, silently ignore it, the object * may exist only intermittently. */ - switch (ret = __wt_curstat_open(session, uri, cfg, &cursor)) { + switch (ret = __wt_curstat_open(session, uri, NULL, cfg, &cursor)) { case 0: cst = (WT_CURSOR_STAT *)cursor; - for (stats = cst->stats, i = 0; i < cst->stats_count; ++i) + for (stats = cst->stats, i = 0; i < cst->stats_count; ++i) { + if (conn_stats) + WT_ERR(__wt_stat_connection_desc(cst, i, + &desc)); + else + WT_ERR(__wt_stat_dsrc_desc(cst, i, &desc)); WT_ERR(__wt_fprintf(conn->stat_fp, "%s %" PRId64 " %s %s\n", - conn->stat_stamp, stats[i], - name, conn_stats ? 
- __wt_stat_connection_desc(i) : - __wt_stat_dsrc_desc(i))); + conn->stat_stamp, stats[i], name, desc)); + } WT_ERR(cursor->close(cursor)); break; case EBUSY: diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 7527a657db4..4913a06764a 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -98,6 +98,7 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, WT_CURSOR *firstcg_cur; WT_CURSOR_JOIN *cjoin; WT_DECL_RET; + WT_SESSION_IMPL *session; uint64_t r; if (iter->advance) @@ -105,6 +106,7 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, else iter->advance = true; + session = iter->session; cjoin = iter->cjoin; /* @@ -117,7 +119,7 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0]; if (WT_CURSOR_RECNO(&cjoin->iface)) { r = *(uint64_t *)firstcg_cur->key.data; - WT_ERR(__curjoin_pack_recno(iter->session, r, cjoin->recno_buf, + WT_ERR(__curjoin_pack_recno(session, r, cjoin->recno_buf, sizeof(cjoin->recno_buf), primkey)); *rp = r; } else { @@ -125,6 +127,8 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, *rp = 0; } iter->curkey = primkey; + iter->entry->stats.actual_count++; + iter->entry->stats.accesses++; err: return (ret); } @@ -144,6 +148,7 @@ __curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) WT_ERR(__wt_cursor_dup_position( iter->cjoin->entries[0].ends[0].cursor, iter->cursor)); iter->advance = false; + iter->entry->stats.actual_count = 0; } err: return (ret); @@ -342,6 +347,7 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, else c->get_key(c, &curvalue); WT_ERR(__wt_bloom_insert(bloom, &curvalue)); + entry->stats.actual_count++; advance: if ((ret = c->next(c)) == WT_NOTFOUND) break; @@ -427,6 +433,7 @@ __curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) jeend = &cjoin->entries[cjoin->entries_next]; for (je = cjoin->entries; je < jeend; je++) { 
+ __wt_stat_join_init_single(&je->stats); for (end = &je->ends[0]; end < &je->ends[je->ends_next]; end++) WT_ERR(__curjoin_endpoint_init_key(session, je, end)); @@ -615,8 +622,11 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_DECL_RET; WT_INDEX *index; WT_ITEM *key, v; + bool bloom_found; key = cjoin->iter->curkey; + entry->stats.accesses++; + bloom_found = false; if (entry->bloom != NULL) { /* @@ -633,6 +643,7 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, * long way. */ WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); + bloom_found = true; } if (entry->index != NULL) { c = entry->main; @@ -659,7 +670,11 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_ERR(WT_NOTFOUND); } else WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left)); -err: + + if (0) { +err: if (ret == WT_NOTFOUND && bloom_found) + entry->stats.bloom_false_positive++; + } return (ret); } diff --git a/src/cursor/cur_stat.c b/src/cursor/cur_stat.c index 81d028c165a..65d2dc81406 100644 --- a/src/cursor/cur_stat.c +++ b/src/cursor/cur_stat.c @@ -103,7 +103,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...) va_list ap; size_t size; uint64_t *v; - const char **p; + const char *desc, **p; cst = (WT_CURSOR_STAT *)cursor; va_start(ap, cursor); @@ -111,15 +111,13 @@ __curstat_get_value(WT_CURSOR *cursor, ...) 
WT_CURSOR_NEEDVALUE(cursor); + WT_ERR(cst->stats_desc(cst, WT_STAT_KEY_OFFSET(cst), &desc)); if (F_ISSET(cursor, WT_CURSTD_RAW)) { WT_ERR(__wt_struct_size(session, &size, cursor->value_format, - cst->stats_desc(WT_STAT_KEY_OFFSET(cst)), - cst->pv.data, cst->v)); + desc, cst->pv.data, cst->v)); WT_ERR(__wt_buf_initsize(session, &cursor->value, size)); WT_ERR(__wt_struct_pack(session, cursor->value.mem, size, - cursor->value_format, - cst->stats_desc(WT_STAT_KEY_OFFSET(cst)), - cst->pv.data, cst->v)); + cursor->value_format, desc, cst->pv.data, cst->v)); item = va_arg(ap, WT_ITEM *); item->data = cursor->value.data; @@ -130,7 +128,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...) * pointer support isn't documented, but it's a cheap test. */ if ((p = va_arg(ap, const char **)) != NULL) - *p = cst->stats_desc(WT_STAT_KEY_OFFSET(cst)); + *p = desc; if ((p = va_arg(ap, const char **)) != NULL) *p = cst->pv.data; if ((v = va_arg(ap, uint64_t *)) != NULL) @@ -201,7 +199,9 @@ __curstat_next(WT_CURSOR *cursor) /* Initialize on demand. 
*/ if (cst->notinitialized) { WT_ERR(__wt_curstat_init( - session, cursor->internal_uri, cst->cfg, cst)); + session, cursor->internal_uri, NULL, cst->cfg, cst)); + if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, true, true)); cst->notinitialized = false; } @@ -211,15 +211,19 @@ __curstat_next(WT_CURSOR *cursor) cst->key = WT_STAT_KEY_MIN(cst); } else if (cst->key < WT_STAT_KEY_MAX(cst)) ++cst->key; - else { - F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + else if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, true, false)); + else WT_ERR(WT_NOTFOUND); - } + cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)]; WT_ERR(__curstat_print_value(session, cst->v, &cst->pv)); F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); -err: API_END_RET(session, ret); + if (0) { +err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + } + API_END_RET(session, ret); } /* @@ -239,7 +243,9 @@ __curstat_prev(WT_CURSOR *cursor) /* Initialize on demand. 
*/ if (cst->notinitialized) { WT_ERR(__wt_curstat_init( - session, cursor->internal_uri, cst->cfg, cst)); + session, cursor->internal_uri, NULL, cst->cfg, cst)); + if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, false, true)); cst->notinitialized = false; } @@ -249,16 +255,19 @@ __curstat_prev(WT_CURSOR *cursor) cst->key = WT_STAT_KEY_MAX(cst); } else if (cst->key > WT_STAT_KEY_MIN(cst)) --cst->key; - else { - F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); + else if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, false, false)); + else WT_ERR(WT_NOTFOUND); - } cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)]; WT_ERR(__curstat_print_value(session, cst->v, &cst->pv)); F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); -err: API_END_RET(session, ret); + if (0) { +err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + } + API_END_RET(session, ret); } /* @@ -301,7 +310,7 @@ __curstat_search(WT_CURSOR *cursor) /* Initialize on demand. */ if (cst->notinitialized) { WT_ERR(__wt_curstat_init( - session, cursor->internal_uri, cst->cfg, cst)); + session, cursor->internal_uri, NULL, cst->cfg, cst)); cst->notinitialized = false; } @@ -332,6 +341,7 @@ __curstat_close(WT_CURSOR *cursor) __curstat_free_config(session, cst); __wt_buf_free(session, &cst->pv); + __wt_free(session, cst->desc_buf); WT_ERR(__wt_cursor_close(cursor)); @@ -425,13 +435,103 @@ __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst) cst->stats_desc = __wt_stat_dsrc_desc; } +/* + * __curstat_join_next_set -- + * Advance to another index used in a join to give another set of + * statistics. + */ +static int +__curstat_join_next_set(WT_SESSION_IMPL *session, WT_CURSOR_STAT *cst, + bool forw, bool init) +{ + WT_CURSOR_JOIN *cjoin; + WT_JOIN_STATS_GROUP *join_group; + ssize_t pos; + + WT_ASSERT(session, WT_STREQ(cst->iface.uri, "statistics:join")); + join_group = &cst->u.join_stats_group; + cjoin = join_group->join_cursor; + if (init) + pos = forw ? 
0 : cjoin->entries_next - 1; + else + pos = join_group->join_cursor_entry + (forw ? 1 : -1); + if (pos < 0 || (size_t)pos >= cjoin->entries_next) + return (WT_NOTFOUND); + + join_group->join_cursor_entry = pos; + if (cjoin->entries[pos].index == NULL) { + WT_ASSERT(session, WT_PREFIX_MATCH(cjoin->iface.uri, "join:")); + join_group->desc_prefix = cjoin->iface.uri + 5; + } else + join_group->desc_prefix = cjoin->entries[pos].index->name; + join_group->join_stats = cjoin->entries[pos].stats; + if (!init) + cst->key = forw ? WT_STAT_KEY_MIN(cst) : WT_STAT_KEY_MAX(cst); + return (0); +} + +/* + * __curstat_join_desc -- + * Assemble the description field based on current index and statistic. + */ +static int +__curstat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **resultp) +{ + size_t len; + const char *static_desc; + WT_JOIN_STATS_GROUP *sgrp; + WT_SESSION_IMPL *session; + + sgrp = &cst->u.join_stats_group; + session = (WT_SESSION_IMPL *)sgrp->join_cursor->iface.session; + WT_RET(__wt_stat_join_desc(cst, slot, &static_desc)); + len = strlen("join: ") + strlen(sgrp->desc_prefix) + + strlen(static_desc) + 1; + WT_RET(__wt_realloc(session, NULL, len, &cst->desc_buf)); + snprintf(cst->desc_buf, len, "join: %s%s", sgrp->desc_prefix, + static_desc); + *resultp = cst->desc_buf; + return (0); +} + +/* + * __curstat_join_init -- + * Initialize the statistics for a joined cursor. 
+ */ +static int +__curstat_join_init(WT_SESSION_IMPL *session, + WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + + WT_UNUSED(cfg); + + if (curjoin == NULL && cst->u.join_stats_group.join_cursor != NULL) + curjoin = &cst->u.join_stats_group.join_cursor->iface; + if (curjoin == NULL || !WT_PREFIX_MATCH(curjoin->uri, "join:")) + WT_ERR_MSG(session, EINVAL, + "join cursor must be used with statistics:join"); + cjoin = (WT_CURSOR_JOIN *)curjoin; + memset(&cst->u.join_stats_group, 0, sizeof(WT_JOIN_STATS_GROUP)); + cst->u.join_stats_group.join_cursor = cjoin; + + cst->stats = (int64_t *)&cst->u.join_stats_group.join_stats; + cst->stats_base = WT_JOIN_STATS_BASE; + cst->stats_count = sizeof(WT_JOIN_STATS) / sizeof(int64_t); + cst->stats_desc = __curstat_join_desc; + cst->next_set = __curstat_join_next_set; + +err: return (ret); +} + /* * __wt_curstat_init -- * Initialize a statistics cursor. */ int __wt_curstat_init(WT_SESSION_IMPL *session, - const char *uri, const char *cfg[], WT_CURSOR_STAT *cst) + const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst) { const char *dsrc_uri; @@ -442,6 +542,10 @@ __wt_curstat_init(WT_SESSION_IMPL *session, dsrc_uri = uri + strlen("statistics:"); + if (WT_STREQ(dsrc_uri, "join")) + return ( + __curstat_join_init(session, curjoin, cfg, cst)); + if (WT_PREFIX_MATCH(dsrc_uri, "colgroup:")) return ( __wt_curstat_colgroup_init(session, dsrc_uri, cfg, cst)); @@ -467,7 +571,7 @@ __wt_curstat_init(WT_SESSION_IMPL *session, */ int __wt_curstat_open(WT_SESSION_IMPL *session, - const char *uri, const char *cfg[], WT_CURSOR **cursorp) + const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp) { WT_CONNECTION_IMPL *conn; WT_CURSOR_STATIC_INIT(iface, @@ -581,7 +685,7 @@ __wt_curstat_open(WT_SESSION_IMPL *session, * objects like tables, we need to a valid set of statistics when before * the open returns. 
*/ - WT_ERR(__wt_curstat_init(session, uri, cst->cfg, cst)); + WT_ERR(__wt_curstat_init(session, uri, other, cst->cfg, cst)); cst->notinitialized = false; /* The cursor isn't yet positioned. */ diff --git a/src/include/cursor.h b/src/include/cursor.h index a65a95e91f5..98f520b9fd8 100644 --- a/src/include/cursor.h +++ b/src/include/cursor.h @@ -300,9 +300,11 @@ struct __wt_cursor_join_entry { #define WT_CURJOIN_ENTRY_OWN_BLOOM 0x04 /* this entry owns the bloom */ uint8_t flags; - WT_CURSOR_JOIN_ENDPOINT *ends; /* reference endpoints */ + WT_CURSOR_JOIN_ENDPOINT *ends; /* reference endpoints */ size_t ends_allocated; size_t ends_next; + + WT_JOIN_STATS stats; /* Join statistics */ }; struct __wt_cursor_join { @@ -356,6 +358,13 @@ struct __wt_cursor_metadata { uint32_t flags; }; +struct __wt_join_stats_group { + const char *desc_prefix; /* Prefix appears before description */ + WT_CURSOR_JOIN *join_cursor; + size_t join_cursor_entry; /* Position in entries */ + WT_JOIN_STATS join_stats; +}; + struct __wt_cursor_stat { WT_CURSOR iface; @@ -365,14 +374,19 @@ struct __wt_cursor_stat { int64_t *stats; /* Statistics */ int stats_base; /* Base statistics value */ int stats_count; /* Count of statistics values */ - const char *(*stats_desc)(int); /* Statistics descriptions */ + int (*stats_desc)(WT_CURSOR_STAT *, int, const char **); + /* Statistics descriptions */ + int (*next_set)(WT_SESSION_IMPL *, WT_CURSOR_STAT *, bool, + bool); /* Advance to next set */ union { /* Copies of the statistics */ WT_DSRC_STATS dsrc_stats; WT_CONNECTION_STATS conn_stats; + WT_JOIN_STATS_GROUP join_stats_group; } u; const char **cfg; /* Original cursor configuration */ + char *desc_buf; /* Saved description string */ int key; /* Current stats key */ uint64_t v; /* Current stats value */ diff --git a/src/include/extern.h b/src/include/extern.h index 0c99e2068f9..6f4731aaa93 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -292,8 +292,8 @@ extern int __wt_json_strncpy(char 
**pdst, size_t dstlen, const char *src, size_t extern int __wt_curlog_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp); extern int __wt_curmetadata_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); extern void __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst); -extern int __wt_curstat_init(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR_STAT *cst); -extern int __wt_curstat_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], WT_CURSOR **cursorp); +extern int __wt_curstat_init(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst); +extern int __wt_curstat_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp); extern int __wt_cursor_notsup(WT_CURSOR *cursor); extern int __wt_cursor_noop(WT_CURSOR *cursor); extern void __wt_cursor_set_notsup(WT_CURSOR *cursor); @@ -682,19 +682,24 @@ __wt_scr_alloc_func(WT_SESSION_IMPL *session, size_t size, WT_ITEM **scratchp extern void __wt_scr_discard(WT_SESSION_IMPL *session); extern void *__wt_ext_scr_alloc( WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, size_t size); extern void __wt_ext_scr_free(WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, void *p); -extern const char *__wt_stat_dsrc_desc(int slot); +extern int __wt_stat_dsrc_desc(WT_CURSOR_STAT *cst, int slot, const char **p); extern void __wt_stat_dsrc_init_single(WT_DSRC_STATS *stats); extern void __wt_stat_dsrc_init(WT_DATA_HANDLE *handle); extern void __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats); extern void __wt_stat_dsrc_clear_all(WT_DSRC_STATS **stats); extern void __wt_stat_dsrc_aggregate_single( WT_DSRC_STATS *from, WT_DSRC_STATS *to); extern void __wt_stat_dsrc_aggregate( WT_DSRC_STATS **from, WT_DSRC_STATS *to); -extern const char *__wt_stat_connection_desc(int slot); +extern int __wt_stat_connection_desc(WT_CURSOR_STAT *cst, 
int slot, const char **p); extern void __wt_stat_connection_init_single(WT_CONNECTION_STATS *stats); extern void __wt_stat_connection_init(WT_CONNECTION_IMPL *handle); extern void __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats); extern void __wt_stat_connection_clear_all(WT_CONNECTION_STATS **stats); extern void __wt_stat_connection_aggregate( WT_CONNECTION_STATS **from, WT_CONNECTION_STATS *to); +extern int __wt_stat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **p); +extern void __wt_stat_join_init_single(WT_JOIN_STATS *stats); +extern void __wt_stat_join_clear_single(WT_JOIN_STATS *stats); +extern void __wt_stat_join_clear_all(WT_JOIN_STATS **stats); +extern void __wt_stat_join_aggregate( WT_JOIN_STATS **from, WT_JOIN_STATS *to); extern void __wt_txn_release_snapshot(WT_SESSION_IMPL *session); extern void __wt_txn_get_snapshot(WT_SESSION_IMPL *session); extern void __wt_txn_update_oldest(WT_SESSION_IMPL *session, bool force); diff --git a/src/include/stat.h b/src/include/stat.h index 1ebe253e5db..32cee27f832 100644 --- a/src/include/stat.h +++ b/src/include/stat.h @@ -486,4 +486,14 @@ struct __wt_dsrc_stats { int64_t txn_update_conflict; }; +/* + * Statistics entries for join cursors. + */ +#define WT_JOIN_STATS_BASE 3000 +struct __wt_join_stats { + int64_t accesses; + int64_t actual_count; + int64_t bloom_false_positive; +}; + /* Statistics section: END */ diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index 23e6e183e43..3b84c128982 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -4185,6 +4185,19 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2094 /*! transaction: update conflicts */ #define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2095 + +/*! + * @} + * @name Statistics for join cursors + * @anchor statistics_join + * @{ + */ +/*! : accesses */ +#define WT_STAT_JOIN_ACCESSES 3000 +/*! 
: actual count of items */ +#define WT_STAT_JOIN_ACTUAL_COUNT 3001 +/*! : bloom filter false positives */ +#define WT_STAT_JOIN_BLOOM_FALSE_POSITIVE 3002 /*! @} */ /* * Statistics section: END diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h index 9dcc70042cc..0a1e143ce70 100644 --- a/src/include/wt_internal.h +++ b/src/include/wt_internal.h @@ -186,6 +186,10 @@ struct __wt_insert; typedef struct __wt_insert WT_INSERT; struct __wt_insert_head; typedef struct __wt_insert_head WT_INSERT_HEAD; +struct __wt_join_stats; + typedef struct __wt_join_stats WT_JOIN_STATS; +struct __wt_join_stats_group; + typedef struct __wt_join_stats_group WT_JOIN_STATS_GROUP; struct __wt_keyed_encryptor; typedef struct __wt_keyed_encryptor WT_KEYED_ENCRYPTOR; struct __wt_log; diff --git a/src/lsm/lsm_stat.c b/src/lsm/lsm_stat.c index 4381ca0df00..c1eb7a2a389 100644 --- a/src/lsm/lsm_stat.c +++ b/src/lsm/lsm_stat.c @@ -77,12 +77,12 @@ __curstat_lsm_init( */ WT_ERR(__wt_buf_fmt( session, uribuf, "statistics:%s", chunk->uri)); - ret = __wt_curstat_open(session, uribuf->data, + ret = __wt_curstat_open(session, uribuf->data, NULL, F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) ? 
disk_cfg : cfg, &stat_cursor); if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) ret = __wt_curstat_open( - session, uribuf->data, cfg, &stat_cursor); + session, uribuf->data, NULL, cfg, &stat_cursor); WT_ERR(ret); /* @@ -107,7 +107,7 @@ __curstat_lsm_init( WT_ERR(__wt_buf_fmt( session, uribuf, "statistics:%s", chunk->bloom_uri)); WT_ERR(__wt_curstat_open( - session, uribuf->data, cfg, &stat_cursor)); + session, uribuf->data, NULL, cfg, &stat_cursor)); /* * The underlying statistics have now been initialized; fill in diff --git a/src/schema/schema_stat.c b/src/schema/schema_stat.c index d73d66cd399..82c2e2a15dc 100644 --- a/src/schema/schema_stat.c +++ b/src/schema/schema_stat.c @@ -24,7 +24,7 @@ __wt_curstat_colgroup_init(WT_SESSION_IMPL *session, WT_RET(__wt_scr_alloc(session, 0, &buf)); WT_ERR(__wt_buf_fmt(session, buf, "statistics:%s", colgroup->source)); - ret = __wt_curstat_init(session, buf->data, cfg, cst); + ret = __wt_curstat_init(session, buf->data, NULL, cfg, cst); err: __wt_scr_free(session, &buf); return (ret); @@ -46,7 +46,7 @@ __wt_curstat_index_init(WT_SESSION_IMPL *session, WT_RET(__wt_scr_alloc(session, 0, &buf)); WT_ERR(__wt_buf_fmt(session, buf, "statistics:%s", idx->source)); - ret = __wt_curstat_init(session, buf->data, cfg, cst); + ret = __wt_curstat_init(session, buf->data, NULL, cfg, cst); err: __wt_scr_free(session, &buf); return (ret); @@ -159,7 +159,7 @@ __wt_curstat_table_init(WT_SESSION_IMPL *session, WT_ERR(__wt_buf_fmt( session, buf, "statistics:%s", table->cgroups[i]->name)); WT_ERR(__wt_curstat_open( - session, buf->data, cfg, &stat_cursor)); + session, buf->data, NULL, cfg, &stat_cursor)); new = (WT_DSRC_STATS *)WT_CURSOR_STATS(stat_cursor); if (i == 0) *stats = *new; @@ -174,7 +174,7 @@ __wt_curstat_table_init(WT_SESSION_IMPL *session, WT_ERR(__wt_buf_fmt( session, buf, "statistics:%s", table->indices[i]->name)); WT_ERR(__wt_curstat_open( - session, buf->data, cfg, &stat_cursor)); + session, buf->data, NULL, cfg, 
&stat_cursor)); new = (WT_DSRC_STATS *)WT_CURSOR_STATS(stat_cursor); __wt_stat_dsrc_aggregate_single(new, stats); WT_ERR(stat_cursor->close(stat_cursor)); diff --git a/src/session/session_api.c b/src/session/session_api.c index bfa7a6d67e3..e3a28732494 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -240,12 +240,12 @@ err: API_END_RET_NOTFOUND_MAP(session, ret); } /* - * __wt_open_cursor -- - * Internal version of WT_SESSION::open_cursor. + * __session_open_cursor_int -- + * Internal version of WT_SESSION::open_cursor, with second cursor arg. */ -int -__wt_open_cursor(WT_SESSION_IMPL *session, - const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) +static int +__session_open_cursor_int(WT_SESSION_IMPL *session, const char *uri, + WT_CURSOR *owner, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp) { WT_COLGROUP *colgroup; WT_DATA_SOURCE *dsrc; @@ -322,7 +322,8 @@ __wt_open_cursor(WT_SESSION_IMPL *session, break; case 's': if (WT_PREFIX_MATCH(uri, "statistics:")) - WT_RET(__wt_curstat_open(session, uri, cfg, cursorp)); + WT_RET(__wt_curstat_open(session, uri, other, cfg, + cursorp)); break; default: break; @@ -351,6 +352,18 @@ __wt_open_cursor(WT_SESSION_IMPL *session, return (ret); } +/* + * __wt_open_cursor -- + * Internal version of WT_SESSION::open_cursor. + */ +int +__wt_open_cursor(WT_SESSION_IMPL *session, + const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) +{ + return (__session_open_cursor_int(session, uri, owner, NULL, cfg, + cursorp)); +} + /* * __session_open_cursor -- * WT_SESSION->open_cursor method. 
@@ -362,18 +375,22 @@ __session_open_cursor(WT_SESSION *wt_session, WT_CURSOR *cursor; WT_DECL_RET; WT_SESSION_IMPL *session; + bool statjoin; cursor = *cursorp = NULL; session = (WT_SESSION_IMPL *)wt_session; SESSION_API_CALL(session, open_cursor, config, cfg); - if ((to_dup == NULL && uri == NULL) || (to_dup != NULL && uri != NULL)) + statjoin = (to_dup != NULL && uri != NULL && + WT_STREQ(uri, "statistics:join")); + if ((to_dup == NULL && uri == NULL) || + (to_dup != NULL && uri != NULL && !statjoin)) WT_ERR_MSG(session, EINVAL, "should be passed either a URI or a cursor to duplicate, " "but not both"); - if (to_dup != NULL) { + if (to_dup != NULL && !statjoin) { uri = to_dup->uri; if (!WT_PREFIX_MATCH(uri, "colgroup:") && !WT_PREFIX_MATCH(uri, "index:") && @@ -385,8 +402,9 @@ __session_open_cursor(WT_SESSION *wt_session, WT_ERR(__wt_bad_object_type(session, uri)); } - WT_ERR(__wt_open_cursor(session, uri, NULL, cfg, &cursor)); - if (to_dup != NULL) + WT_ERR(__session_open_cursor_int(session, uri, NULL, + statjoin ? 
to_dup : NULL, cfg, &cursor)); + if (to_dup != NULL && !statjoin) WT_ERR(__wt_cursor_dup_position(to_dup, cursor)); *cursorp = cursor; diff --git a/src/support/stat.c b/src/support/stat.c index 9e817fad512..83c0166b46e 100644 --- a/src/support/stat.c +++ b/src/support/stat.c @@ -101,10 +101,12 @@ static const char * const __stats_dsrc_desc[] = { "transaction: update conflicts", }; -const char * -__wt_stat_dsrc_desc(int slot) +int +__wt_stat_dsrc_desc(WT_CURSOR_STAT *cst, int slot, const char **p) { - return (__stats_dsrc_desc[slot]); + WT_UNUSED(cst); + *p = __stats_dsrc_desc[slot]; + return (0); } void @@ -640,10 +642,12 @@ static const char * const __stats_connection_desc[] = { "connection: total write I/Os", }; -const char * -__wt_stat_connection_desc(int slot) +int +__wt_stat_connection_desc(WT_CURSOR_STAT *cst, int slot, const char **p) { - return (__stats_connection_desc[slot]); + WT_UNUSED(cst); + *p = __stats_connection_desc[slot]; + return (0); } void @@ -1024,3 +1028,49 @@ __wt_stat_connection_aggregate( to->txn_commit += WT_STAT_READ(from, txn_commit); to->txn_rollback += WT_STAT_READ(from, txn_rollback); } + +static const char * const __stats_join_desc[] = { + ": accesses", + ": actual count of items", + ": bloom filter false positives", +}; + +int +__wt_stat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **p) +{ + WT_UNUSED(cst); + *p = __stats_join_desc[slot]; + return (0); +} + +void +__wt_stat_join_init_single(WT_JOIN_STATS *stats) +{ + memset(stats, 0, sizeof(*stats)); +} + +void +__wt_stat_join_clear_single(WT_JOIN_STATS *stats) +{ + stats->accesses = 0; + stats->actual_count = 0; + stats->bloom_false_positive = 0; +} + +void +__wt_stat_join_clear_all(WT_JOIN_STATS **stats) +{ + u_int i; + + for (i = 0; i < WT_COUNTER_SLOTS; ++i) + __wt_stat_join_clear_single(stats[i]); +} + +void +__wt_stat_join_aggregate( + WT_JOIN_STATS **from, WT_JOIN_STATS *to) +{ + to->accesses += WT_STAT_READ(from, accesses); + to->actual_count += WT_STAT_READ(from, 
actual_count); + to->bloom_false_positive += WT_STAT_READ(from, bloom_false_positive); +} diff --git a/test/suite/test_join01.py b/test/suite/test_join01.py index 58a135f3bcd..ca6e5fbcabb 100644 --- a/test/suite/test_join01.py +++ b/test/suite/test_join01.py @@ -41,6 +41,12 @@ class test_join01(wttest.WiredTigerTestCase): ('index', dict(ref='index')) ] + # Override WiredTigerTestCase, we have statistics tests. + def setUpConnectionOpen(self, dir): + conn = wiredtiger.wiredtiger_open(dir, + 'create,statistics=(all),' + 'error_prefix="%s: "' % self.shortid()) + return conn + def gen_key(self, i): return [ i + 1 ] @@ -68,9 +74,51 @@ class test_join01(wttest.WiredTigerTestCase): expect.remove(i) self.assertEquals(0, len(expect)) + # Stats are collected twice: after iterating + # through the join cursor once, and secondly after resetting + # the join cursor and iterating again. + def stats(self, jc, which): + statcur = self.session.open_cursor('statistics:join', jc, None) + self.check_stats(statcur, 0, 'join: index:join01:index1: ' + + 'bloom filter false positives') + statcur.close() + + def statstr_to_int(self, str): + """ + Convert a statistics value string, which may be in either form: + '12345' or '33M (33604836)' + """ + parts = str.rpartition('(') + return int(parts[2].rstrip(')')) + + # string should appear with a minimum value of least "min". + def check_stats(self, statcursor, min, lookfor): + stringclass = ''.__class__ + intclass = (0).__class__ + + # Reset the cursor, we're called multiple times. 
+ statcursor.reset() + + found = False + foundval = 0 + self.printVerbose(3, 'statistics:') + for id, desc, valstr, val in statcursor: + self.assertEqual(type(desc), stringclass) + self.assertEqual(type(valstr), stringclass) + self.assertEqual(type(val), intclass) + self.assertEqual(val, self.statstr_to_int(valstr)) + self.printVerbose(3, ' stat: \'' + desc + '\', \'' + + valstr + '\', ' + str(val)) + if desc == lookfor: + found = True + foundval = val + + self.assertTrue(found, 'in stats, did not see: ' + lookfor) + self.assertTrue(foundval >= min) + # Common function for testing the most basic functionality # of joins - def join_common(self, joincfg0, joincfg1, do_proj): + def join_common(self, joincfg0, joincfg1, do_proj, do_stats): #self.tty('join_common(' + joincfg0 + ',' + joincfg1 + ',' + # str(do_proj) + ')') self.session.create('table:join01', 'key_format=r' + @@ -130,9 +178,15 @@ class test_join01(wttest.WiredTigerTestCase): # [73, 82, 62, 83, 92]. # # After iterating, we should be able to reset and iterate again. + if do_stats: + self.stats(jc, 0) self.iter_common(jc, do_proj) + if do_stats: + self.stats(jc, 1) jc.reset() self.iter_common(jc, do_proj) + if do_stats: + self.stats(jc, 2) jc.reset() self.iter_common(jc, do_proj) @@ -153,7 +207,7 @@ class test_join01(wttest.WiredTigerTestCase): #self.tty('cfga=' + cfga + # ', cfgb=' + cfgb + # ', doproj=' + str(do_proj)) - self.join_common(cfga, cfgb, do_proj) + self.join_common(cfga, cfgb, do_proj, False) def test_join_errors(self): self.session.create('table:join01', 'key_format=r,value_format=SS' @@ -328,5 +382,15 @@ class test_join01(wttest.WiredTigerTestCase): def test_cursor_close2(self): self.cursor_close_common(False) + def test_stats(self): + bloomcfg1000 = ',strategy=bloom,count=1000' + bloomcfg10 = ',strategy=bloom,count=10' + self.join_common(bloomcfg1000, bloomcfg1000, False, True) + + # Intentially run with an underconfigured Bloom filter, + # statistics should pick up some false positives. 
+ self.join_common(bloomcfg10, bloomcfg10, False, True) + + if __name__ == '__main__': wttest.run() -- cgit v1.2.1 From e95bff1310097caef190dbe8210ee3f59b7681ac Mon Sep 17 00:00:00 2001 From: Susan LoVerso Date: Tue, 17 Nov 2015 14:21:40 -0500 Subject: WT-2218 Add connection level stat for number of deleted pages. --- dist/stat_data.py | 1 + src/include/stat.h | 1 + src/include/wiredtiger.in | 48 ++++++++++++++++++++++++----------------------- src/reconcile/rec_write.c | 1 + src/support/stat.c | 3 +++ 5 files changed, 31 insertions(+), 23 deletions(-) diff --git a/dist/stat_data.py b/dist/stat_data.py index 4b186625e7d..9c49e20fa61 100644 --- a/dist/stat_data.py +++ b/dist/stat_data.py @@ -278,6 +278,7 @@ connection_stats = [ # Reconciliation statistics ########################################## RecStat('rec_pages', 'page reconciliation calls'), + RecStat('rec_page_delete', 'pages deleted'), RecStat('rec_pages_eviction', 'page reconciliation calls for eviction'), RecStat('rec_split_stashed_bytes', 'split bytes currently awaiting free', 'no_clear,no_scale'), diff --git a/src/include/stat.h b/src/include/stat.h index 0a79a6f6140..2a8552fded1 100644 --- a/src/include/stat.h +++ b/src/include/stat.h @@ -359,6 +359,7 @@ struct __wt_connection_stats { int64_t page_read_blocked; int64_t page_sleep; int64_t read_io; + int64_t rec_page_delete; int64_t rec_pages; int64_t rec_pages_eviction; int64_t rec_split_stashed_bytes; diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index ad716fa5e6a..8503f5918e9 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -3887,52 +3887,54 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_CONN_PAGE_SLEEP 1130 /*! connection: total read I/Os */ #define WT_STAT_CONN_READ_IO 1131 +/*! reconciliation: pages deleted */ +#define WT_STAT_CONN_REC_PAGE_DELETE 1132 /*! 
reconciliation: page reconciliation calls */ -#define WT_STAT_CONN_REC_PAGES 1132 +#define WT_STAT_CONN_REC_PAGES 1133 /*! reconciliation: page reconciliation calls for eviction */ -#define WT_STAT_CONN_REC_PAGES_EVICTION 1133 +#define WT_STAT_CONN_REC_PAGES_EVICTION 1134 /*! reconciliation: split bytes currently awaiting free */ -#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1134 +#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1135 /*! reconciliation: split objects currently awaiting free */ -#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1135 +#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1136 /*! connection: pthread mutex shared lock read-lock calls */ -#define WT_STAT_CONN_RWLOCK_READ 1136 +#define WT_STAT_CONN_RWLOCK_READ 1137 /*! connection: pthread mutex shared lock write-lock calls */ -#define WT_STAT_CONN_RWLOCK_WRITE 1137 +#define WT_STAT_CONN_RWLOCK_WRITE 1138 /*! session: open cursor count */ -#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1138 +#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1139 /*! session: open session count */ -#define WT_STAT_CONN_SESSION_OPEN 1139 +#define WT_STAT_CONN_SESSION_OPEN 1140 /*! transaction: transaction begins */ -#define WT_STAT_CONN_TXN_BEGIN 1140 +#define WT_STAT_CONN_TXN_BEGIN 1141 /*! transaction: transaction checkpoints */ -#define WT_STAT_CONN_TXN_CHECKPOINT 1141 +#define WT_STAT_CONN_TXN_CHECKPOINT 1142 /*! transaction: transaction checkpoint generation */ -#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1142 +#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1143 /*! transaction: transaction checkpoint currently running */ -#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1143 +#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1144 /*! transaction: transaction checkpoint max time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1144 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1145 /*! 
transaction: transaction checkpoint min time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1145 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1146 /*! transaction: transaction checkpoint most recent time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1146 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1147 /*! transaction: transaction checkpoint total time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1147 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1148 /*! transaction: transactions committed */ -#define WT_STAT_CONN_TXN_COMMIT 1148 +#define WT_STAT_CONN_TXN_COMMIT 1149 /*! transaction: transaction failures due to cache overflow */ -#define WT_STAT_CONN_TXN_FAIL_CACHE 1149 +#define WT_STAT_CONN_TXN_FAIL_CACHE 1150 /*! transaction: transaction range of IDs currently pinned by a checkpoint */ -#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1150 +#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1151 /*! transaction: transaction range of IDs currently pinned */ -#define WT_STAT_CONN_TXN_PINNED_RANGE 1151 +#define WT_STAT_CONN_TXN_PINNED_RANGE 1152 /*! transaction: transactions rolled back */ -#define WT_STAT_CONN_TXN_ROLLBACK 1152 +#define WT_STAT_CONN_TXN_ROLLBACK 1153 /*! transaction: transaction sync calls */ -#define WT_STAT_CONN_TXN_SYNC 1153 +#define WT_STAT_CONN_TXN_SYNC 1154 /*! connection: total write I/Os */ -#define WT_STAT_CONN_WRITE_IO 1154 +#define WT_STAT_CONN_WRITE_IO 1155 /*! * @} diff --git a/src/reconcile/rec_write.c b/src/reconcile/rec_write.c index 4479f4a8515..e57e18f4e87 100644 --- a/src/reconcile/rec_write.c +++ b/src/reconcile/rec_write.c @@ -5470,6 +5470,7 @@ __rec_write_wrapup(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page) case 0: /* Page delete */ WT_RET(__wt_verbose( session, WT_VERB_RECONCILE, "page %p empty", page)); + WT_STAT_FAST_CONN_INCR(session, rec_page_delete); WT_STAT_FAST_DATA_INCR(session, rec_page_delete); /* If this is the root page, we need to create a sync point. 
*/ diff --git a/src/support/stat.c b/src/support/stat.c index 6c18621c42d..7a84a7b39da 100644 --- a/src/support/stat.c +++ b/src/support/stat.c @@ -620,6 +620,7 @@ static const char * const __stats_connection_desc[] = { "thread-yield: page acquire read blocked", "thread-yield: page acquire time sleeping (usecs)", "connection: total read I/Os", + "reconciliation: pages deleted", "reconciliation: page reconciliation calls", "reconciliation: page reconciliation calls for eviction", "reconciliation: split bytes currently awaiting free", @@ -803,6 +804,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) stats->lsm_work_queue_max = 0; stats->rec_pages = 0; stats->rec_pages_eviction = 0; + stats->rec_page_delete = 0; /* not clearing rec_split_stashed_bytes */ /* not clearing rec_split_stashed_objects */ /* not clearing session_cursor_open */ @@ -997,6 +999,7 @@ __wt_stat_connection_aggregate( to->lsm_work_queue_max += WT_STAT_READ(from, lsm_work_queue_max); to->rec_pages += WT_STAT_READ(from, rec_pages); to->rec_pages_eviction += WT_STAT_READ(from, rec_pages_eviction); + to->rec_page_delete += WT_STAT_READ(from, rec_page_delete); to->rec_split_stashed_bytes += WT_STAT_READ(from, rec_split_stashed_bytes); to->rec_split_stashed_objects += -- cgit v1.2.1 From d72cf1ce398c7cc3e97ea4611cfeb8de088d43cc Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 11:40:27 +1100 Subject: Fix -Wshadow warnings: ../src/cursor/cur_join.c: In function '__curjoin_entry_member': ../src/cursor/cur_join.c:623:12: error: declaration of 'index' shadows a global declaration [-Werror=shadow] ../src/cursor/cur_join.c: In function '__wt_curjoin_join': ../src/cursor/cur_join.c:891:15: error: declaration of 'index' shadows a global declaration [-Werror=shadow] --- src/cursor/cur_join.c | 14 +++++++------- src/include/extern.h | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 407b0b7f386..6fb35d8c241 
100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -620,7 +620,7 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, __wt_cursor_notsup, /* remove */ __wt_cursor_notsup); /* close */ WT_DECL_RET; - WT_INDEX *index; + WT_INDEX *idx; WT_ITEM *key, v; bool bloom_found; @@ -658,13 +658,13 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } else v = *key; - if ((index = entry->index) != NULL && index->extractor) { + if ((idx = entry->index) != NULL && idx->extractor != NULL) { extract_cursor.iface = iface; extract_cursor.iface.session = &session->iface; - extract_cursor.iface.key_format = index->exkey_format; + extract_cursor.iface.key_format = idx->exkey_format; extract_cursor.ismember = 0; extract_cursor.entry = entry; - WT_ERR(index->extractor->extract(index->extractor, + WT_ERR(idx->extractor->extract(idx->extractor, &session->iface, key, &v, &extract_cursor.iface)); if (!extract_cursor.ismember) WT_ERR(WT_NOTFOUND); @@ -888,7 +888,7 @@ err: WT_TRET(__curjoin_close(cursor)); */ int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, - WT_INDEX *index, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, + WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count) { WT_CURSOR_JOIN_ENTRY *entry; @@ -906,7 +906,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, main_uri = NULL; namesize = strlen(cjoin->table->name); for (i = 0; i < cjoin->entries_next; i++) { - if (cjoin->entries[i].index == index) { + if (cjoin->entries[i].index == idx) { entry = &cjoin->entries[i]; break; } @@ -937,7 +937,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, } else entry = &cjoin->entries[cjoin->entries_next]; - entry->index = index; + entry->index = idx; entry->flags = flags; entry->count = count; entry->bloom_bit_count = bloom_bit_count; diff --git a/src/include/extern.h 
b/src/include/extern.h index c999ee2752f..743a3c3ac31 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -279,7 +279,7 @@ extern int __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSO extern int __wt_curindex_joined(WT_CURSOR *cursor); extern int __wt_curindex_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); extern int __wt_curjoin_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp); -extern int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *index, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count); +extern int __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count); extern int __wt_json_alloc_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, WT_CURSOR_JSON *json, bool iskey, va_list ap); extern void __wt_json_close(WT_SESSION_IMPL *session, WT_CURSOR *cursor); extern size_t __wt_json_unpack_char(char ch, u_char *buf, size_t bufsz, bool force_unicode); -- cgit v1.2.1 From 7407929fdff8aa934603395f7837eb5e389ac68e Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 12:48:50 +1100 Subject: Fix another warning: ../src/cursor/cur_join.c:1022:11: error: 'ins' may be used uninitialized in this function [-Werror=uninitialized] --- src/cursor/cur_join.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 6fb35d8c241..2536e458ee0 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -904,7 +904,9 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, entry = NULL; hasins = needbloom = false; main_uri = NULL; + nonbloom = 0; /* -Wuninitialized */ namesize = 
strlen(cjoin->table->name); + for (i = 0; i < cjoin->entries_next; i++) { if (cjoin->entries[i].index == idx) { entry = &cjoin->entries[i]; -- cgit v1.2.1 From 773396d9413dbd0671dbac0b4f42d0fd125d3f90 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 13:29:10 +1100 Subject: Another warning: ../src/cursor/cur_join.c:1024:11: error: 'ins' may be used uninitialized in this function [-Werror=uninitialized] --- src/cursor/cur_join.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 2536e458ee0..af53dc0a271 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -903,6 +903,7 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, entry = NULL; hasins = needbloom = false; + ins = 0; /* -Wuninitialized */ main_uri = NULL; nonbloom = 0; /* -Wuninitialized */ namesize = strlen(cjoin->table->name); -- cgit v1.2.1 From a6c11bf6b5dabeda24dcc427d81154b619e811f1 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 13:36:05 +1100 Subject: Fix another warning: ../src/session/session_api.c:654:12: error: declaration of 'index' shadows a global declaration [-Werror=shadow] (Note: these were not picked up earlier because Jenkins scheduled builds on a server with a different version of GCC). 
--- src/session/session_api.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/session/session_api.c b/src/session/session_api.c index 25de2f2983a..67e26618cf6 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -651,7 +651,7 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, WT_CURSOR_INDEX *cindex; WT_CURSOR_JOIN *cjoin; WT_CURSOR_TABLE *ctable; - WT_INDEX *index; + WT_INDEX *idx; WT_TABLE *table; uint32_t flags, range; uint64_t count; @@ -669,11 +669,11 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, if (WT_PREFIX_MATCH(ref_cursor->uri, "index:")) { cindex = (WT_CURSOR_INDEX *)ref_cursor; - index = cindex->index; + idx = cindex->index; table = cindex->table; WT_CURSOR_CHECKKEY(ref_cursor); } else if (WT_PREFIX_MATCH(ref_cursor->uri, "table:")) { - index = NULL; + idx = NULL; ctable = (WT_CURSOR_TABLE *)ref_cursor; table = ctable->table; WT_CURSOR_CHECKKEY(ctable->cg_cursors[0]); @@ -736,7 +736,7 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, WT_ERR(EINVAL); } } - WT_ERR(__wt_curjoin_join(session, cjoin, index, ref_cursor, flags, + WT_ERR(__wt_curjoin_join(session, cjoin, idx, ref_cursor, flags, range, count, bloom_bit_count, bloom_hash_count)); /* * There's an implied ownership ordering that isn't -- cgit v1.2.1 From 04625b8e4c0f7db524a962106aff5f0df0a0dc18 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 14:14:25 +1100 Subject: Make test work with Python 2.6. 
--- test/suite/test_join02.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/suite/test_join02.py b/test/suite/test_join02.py index 16e1fff56df..bf376575103 100644 --- a/test/suite/test_join02.py +++ b/test/suite/test_join02.py @@ -57,9 +57,9 @@ class test_join02(wttest.WiredTigerTestCase): def gen_key(self, i): if self.keyformat == 'S': - return [ 'key{:06}'.format(i) ] # zero pad so it sorts expectedly + return [ 'key%06d' % i ] # zero pad so it sorts expectedly elif self.keyformat == 'iS': - return [ i, 'key{:06}'.format(i) ] + return [ i, 'key%06d' % i ] else: return [ i ] -- cgit v1.2.1 From c55c750d7e562e654d02773a16aef40fedc87068 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 21:50:01 +1100 Subject: WT-2224 Track which deleted refs are discarded by a split. --- src/btree/bt_split.c | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/btree/bt_split.c b/src/btree/bt_split.c index eaeac683f9a..9591a40fea2 100644 --- a/src/btree/bt_split.c +++ b/src/btree/bt_split.c @@ -693,6 +693,7 @@ static int __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, uint32_t new_entries, size_t parent_incr, bool exclusive, bool discard) { + WT_DECL_ITEM(scr); WT_DECL_RET; WT_IKEY *ikey; WT_PAGE *parent; @@ -701,14 +702,15 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, size_t parent_decr, size; uint64_t split_gen; uint32_t i, j; - uint32_t deleted_entries, parent_entries, result_entries; + uint32_t deleted_entries, orig_slot, parent_entries, result_entries; + uint32_t *deleted_refs; bool complete, empty_parent; parent = ref->home; alloc_index = pindex = NULL; parent_decr = 0; - parent_entries = 0; + orig_slot = parent_entries = 0; complete = empty_parent = false; /* The parent page will be marked dirty, make sure that will succeed. 
*/ @@ -727,14 +729,21 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, * array anyway. Switch them to the special split state, so that any * reading thread will restart. */ + WT_RET(__wt_scr_alloc(session, 10 * sizeof(uint32_t), &scr)); for (deleted_entries = 0, i = 0; i < parent_entries; ++i) { next_ref = pindex->index[i]; WT_ASSERT(session, next_ref->state != WT_REF_SPLIT); + if (next_ref == ref) + orig_slot = i; if (next_ref->state == WT_REF_DELETED && __wt_delete_page_skip(session, next_ref, true) && __wt_atomic_casv32( - &next_ref->state, WT_REF_DELETED, WT_REF_SPLIT)) - deleted_entries++; + &next_ref->state, WT_REF_DELETED, WT_REF_SPLIT)) { + WT_ERR(__wt_buf_grow(session, scr, + (deleted_entries + 1) * sizeof(uint32_t))); + deleted_refs = scr->mem; + deleted_refs[deleted_entries++] = i; + } } /* @@ -803,8 +812,12 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, * parent page's index. */ if (discard) { - ++deleted_entries; + WT_ASSERT(session, pindex->index[orig_slot] == ref); WT_PUBLISH(ref->state, WT_REF_SPLIT); + WT_ERR(__wt_buf_grow(session, scr, + (deleted_entries + 1) * sizeof(uint32_t))); + deleted_refs = scr->mem; + deleted_refs[deleted_entries++] = orig_slot; } /* @@ -842,11 +855,9 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, * Acquire a new split generation. 
*/ split_gen = __wt_atomic_addv64(&S2C(session)->split_gen, 1); - for (i = 0; deleted_entries > 0 && i < parent_entries; ++i) { - next_ref = pindex->index[i]; - if (next_ref->state != WT_REF_SPLIT) - continue; - --deleted_entries; + for (i = 0, deleted_refs = scr->mem; i < deleted_entries; ++i) { + next_ref = pindex->index[deleted_refs[i]]; + WT_ASSERT(session, next_ref->state == WT_REF_SPLIT); /* * We set the WT_REF to split, discard it, freeing any resources @@ -929,6 +940,8 @@ err: /* return (EBUSY); } + __wt_scr_free(session, &scr); + if (ret != 0 && ret != WT_PANIC) __wt_err(session, ret, "ignoring not-fatal error during parent page split"); -- cgit v1.2.1 From afe7dd62998146362b92428323339724cd431e65 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Wed, 18 Nov 2015 22:12:51 +1100 Subject: WT-2224 Cleanup and fix. --- src/btree/bt_split.c | 39 ++++++++++++++++----------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/btree/bt_split.c b/src/btree/bt_split.c index 9591a40fea2..caba12b78f1 100644 --- a/src/btree/bt_split.c +++ b/src/btree/bt_split.c @@ -702,7 +702,7 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, size_t parent_decr, size; uint64_t split_gen; uint32_t i, j; - uint32_t deleted_entries, orig_slot, parent_entries, result_entries; + uint32_t deleted_entries, parent_entries, result_entries; uint32_t *deleted_refs; bool complete, empty_parent; @@ -710,7 +710,7 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, alloc_index = pindex = NULL; parent_decr = 0; - orig_slot = parent_entries = 0; + parent_entries = 0; complete = empty_parent = false; /* The parent page will be marked dirty, make sure that will succeed. 
*/ @@ -733,12 +733,11 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, for (deleted_entries = 0, i = 0; i < parent_entries; ++i) { next_ref = pindex->index[i]; WT_ASSERT(session, next_ref->state != WT_REF_SPLIT); - if (next_ref == ref) - orig_slot = i; - if (next_ref->state == WT_REF_DELETED && + if ((discard && next_ref == ref) || + (next_ref->state == WT_REF_DELETED && __wt_delete_page_skip(session, next_ref, true) && __wt_atomic_casv32( - &next_ref->state, WT_REF_DELETED, WT_REF_SPLIT)) { + &next_ref->state, WT_REF_DELETED, WT_REF_SPLIT))) { WT_ERR(__wt_buf_grow(session, scr, (deleted_entries + 1) * sizeof(uint32_t))); deleted_refs = scr->mem; @@ -751,7 +750,9 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, * pages, less any WT_REFs we're removing (deleted entries plus the * entry we're replacing). */ - result_entries = (parent_entries + new_entries) - (deleted_entries + 1); + result_entries = (parent_entries + new_entries) - deleted_entries; + if (!discard) + --result_entries; /* * If there are no remaining entries on the parent, give up, we can't @@ -804,21 +805,14 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, #endif /* - * If discarding the page's original WT_REF field, reset it to split and - * increment the number of entries being discarded. Threads cursoring - * through the tree were blocked because that WT_REF state was set to - * locked. Changing the locked state to split unblocks those threads and - * causes them to re-calculate their position based on the just-updated - * parent page's index. + * If discarding the page's original WT_REF field, reset it to split. + * Threads cursoring through the tree were blocked because that WT_REF + * state was set to locked. Changing the locked state to split unblocks + * those threads and causes them to re-calculate their position based + * on the just-updated parent page's index. 
*/ - if (discard) { - WT_ASSERT(session, pindex->index[orig_slot] == ref); + if (discard) WT_PUBLISH(ref->state, WT_REF_SPLIT); - WT_ERR(__wt_buf_grow(session, scr, - (deleted_entries + 1) * sizeof(uint32_t))); - deleted_refs = scr->mem; - deleted_refs[deleted_entries++] = orig_slot; - } /* * Push out the changes: not required for correctness, but don't let @@ -917,7 +911,8 @@ __split_parent(WT_SESSION_IMPL *session, WT_REF *ref, WT_REF **ref_new, __wt_cache_page_inmem_decr(session, parent, parent_decr); __wt_page_modify_set(session, parent); -err: /* +err: __wt_scr_free(session, &scr); + /* * A note on error handling: if we completed the split, return success, * nothing really bad can have happened, and our caller has to proceed * with the split. @@ -940,8 +935,6 @@ err: /* return (EBUSY); } - __wt_scr_free(session, &scr); - if (ret != 0 && ret != WT_PANIC) __wt_err(session, ret, "ignoring not-fatal error during parent page split"); -- cgit v1.2.1 From a5d6d8e1b6c687566c8b368fc5048ade7eecfea6 Mon Sep 17 00:00:00 2001 From: Susan LoVerso Date: Wed, 18 Nov 2015 15:17:13 -0500 Subject: WT-2218 Add fast-path delete stat. 
--- dist/stat_data.py | 2 ++ src/btree/bt_delete.c | 2 ++ src/include/stat.h | 2 ++ src/include/wiredtiger.in | 66 +++++++++++++++++++++++++---------------------- src/support/stat.c | 7 +++++ 5 files changed, 48 insertions(+), 31 deletions(-) diff --git a/dist/stat_data.py b/dist/stat_data.py index 85ecdeb5be5..77e3fee9646 100644 --- a/dist/stat_data.py +++ b/dist/stat_data.py @@ -281,6 +281,7 @@ connection_stats = [ ########################################## RecStat('rec_pages', 'page reconciliation calls'), RecStat('rec_page_delete', 'pages deleted'), + RecStat('rec_page_delete_fast', 'fast-path pages deleted'), RecStat('rec_pages_eviction', 'page reconciliation calls for eviction'), RecStat('rec_split_stashed_bytes', 'split bytes currently awaiting free', 'no_clear,no_scale'), @@ -525,6 +526,7 @@ dsrc_stats = [ RecStat('rec_overflow_key_leaf', 'leaf-page overflow keys'), RecStat('rec_overflow_value', 'overflow values written'), RecStat('rec_page_delete', 'pages deleted'), + RecStat('rec_page_delete_fast', 'fast-path pages deleted'), RecStat('rec_page_match', 'page checksum matches'), RecStat('rec_pages', 'page reconciliation calls'), RecStat('rec_pages_eviction', 'page reconciliation calls for eviction'), diff --git a/src/btree/bt_delete.c b/src/btree/bt_delete.c index 757b7b51cdd..98c6390e0f4 100644 --- a/src/btree/bt_delete.c +++ b/src/btree/bt_delete.c @@ -138,6 +138,8 @@ __wt_delete_page(WT_SESSION_IMPL *session, WT_REF *ref, bool *skipp) WT_ERR(__wt_txn_modify_ref(session, ref)); *skipp = true; + WT_STAT_FAST_CONN_INCR(session, rec_page_delete_fast); + WT_STAT_FAST_DATA_INCR(session, rec_page_delete_fast); WT_PUBLISH(ref->state, WT_REF_DELETED); return (0); diff --git a/src/include/stat.h b/src/include/stat.h index 0ad872d11da..44e2d7edd8a 100644 --- a/src/include/stat.h +++ b/src/include/stat.h @@ -361,6 +361,7 @@ struct __wt_connection_stats { int64_t page_sleep; int64_t read_io; int64_t rec_page_delete; + int64_t rec_page_delete_fast; int64_t rec_pages; 
int64_t rec_pages_eviction; int64_t rec_split_stashed_bytes; @@ -481,6 +482,7 @@ struct __wt_dsrc_stats { int64_t rec_overflow_key_leaf; int64_t rec_overflow_value; int64_t rec_page_delete; + int64_t rec_page_delete_fast; int64_t rec_page_match; int64_t rec_pages; int64_t rec_pages_eviction; diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in index 1b8c345e32b..a246cec2bab 100644 --- a/src/include/wiredtiger.in +++ b/src/include/wiredtiger.in @@ -3891,52 +3891,54 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_CONN_READ_IO 1132 /*! reconciliation: pages deleted */ #define WT_STAT_CONN_REC_PAGE_DELETE 1133 +/*! reconciliation: fast-path pages deleted */ +#define WT_STAT_CONN_REC_PAGE_DELETE_FAST 1134 /*! reconciliation: page reconciliation calls */ -#define WT_STAT_CONN_REC_PAGES 1134 +#define WT_STAT_CONN_REC_PAGES 1135 /*! reconciliation: page reconciliation calls for eviction */ -#define WT_STAT_CONN_REC_PAGES_EVICTION 1135 +#define WT_STAT_CONN_REC_PAGES_EVICTION 1136 /*! reconciliation: split bytes currently awaiting free */ -#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1136 +#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1137 /*! reconciliation: split objects currently awaiting free */ -#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1137 +#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1138 /*! connection: pthread mutex shared lock read-lock calls */ -#define WT_STAT_CONN_RWLOCK_READ 1138 +#define WT_STAT_CONN_RWLOCK_READ 1139 /*! connection: pthread mutex shared lock write-lock calls */ -#define WT_STAT_CONN_RWLOCK_WRITE 1139 +#define WT_STAT_CONN_RWLOCK_WRITE 1140 /*! session: open cursor count */ -#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1140 +#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1141 /*! session: open session count */ -#define WT_STAT_CONN_SESSION_OPEN 1141 +#define WT_STAT_CONN_SESSION_OPEN 1142 /*! 
transaction: transaction begins */ -#define WT_STAT_CONN_TXN_BEGIN 1142 +#define WT_STAT_CONN_TXN_BEGIN 1143 /*! transaction: transaction checkpoints */ -#define WT_STAT_CONN_TXN_CHECKPOINT 1143 +#define WT_STAT_CONN_TXN_CHECKPOINT 1144 /*! transaction: transaction checkpoint generation */ -#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1144 +#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1145 /*! transaction: transaction checkpoint currently running */ -#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1145 +#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1146 /*! transaction: transaction checkpoint max time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1146 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1147 /*! transaction: transaction checkpoint min time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1147 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1148 /*! transaction: transaction checkpoint most recent time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1148 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1149 /*! transaction: transaction checkpoint total time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1149 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1150 /*! transaction: transactions committed */ -#define WT_STAT_CONN_TXN_COMMIT 1150 +#define WT_STAT_CONN_TXN_COMMIT 1151 /*! transaction: transaction failures due to cache overflow */ -#define WT_STAT_CONN_TXN_FAIL_CACHE 1151 +#define WT_STAT_CONN_TXN_FAIL_CACHE 1152 /*! transaction: transaction range of IDs currently pinned by a checkpoint */ -#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1152 +#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1153 /*! transaction: transaction range of IDs currently pinned */ -#define WT_STAT_CONN_TXN_PINNED_RANGE 1153 +#define WT_STAT_CONN_TXN_PINNED_RANGE 1154 /*! transaction: transactions rolled back */ -#define WT_STAT_CONN_TXN_ROLLBACK 1154 +#define WT_STAT_CONN_TXN_ROLLBACK 1155 /*! 
transaction: transaction sync calls */ -#define WT_STAT_CONN_TXN_SYNC 1155 +#define WT_STAT_CONN_TXN_SYNC 1156 /*! connection: total write I/Os */ -#define WT_STAT_CONN_WRITE_IO 1156 +#define WT_STAT_CONN_WRITE_IO 1157 /*! * @} @@ -4125,23 +4127,25 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_DSRC_REC_OVERFLOW_VALUE 2088 /*! reconciliation: pages deleted */ #define WT_STAT_DSRC_REC_PAGE_DELETE 2089 +/*! reconciliation: fast-path pages deleted */ +#define WT_STAT_DSRC_REC_PAGE_DELETE_FAST 2090 /*! reconciliation: page checksum matches */ -#define WT_STAT_DSRC_REC_PAGE_MATCH 2090 +#define WT_STAT_DSRC_REC_PAGE_MATCH 2091 /*! reconciliation: page reconciliation calls */ -#define WT_STAT_DSRC_REC_PAGES 2091 +#define WT_STAT_DSRC_REC_PAGES 2092 /*! reconciliation: page reconciliation calls for eviction */ -#define WT_STAT_DSRC_REC_PAGES_EVICTION 2092 +#define WT_STAT_DSRC_REC_PAGES_EVICTION 2093 /*! reconciliation: leaf page key bytes discarded using prefix compression */ -#define WT_STAT_DSRC_REC_PREFIX_COMPRESSION 2093 +#define WT_STAT_DSRC_REC_PREFIX_COMPRESSION 2094 /*! reconciliation: internal page key bytes discarded using suffix * compression */ -#define WT_STAT_DSRC_REC_SUFFIX_COMPRESSION 2094 +#define WT_STAT_DSRC_REC_SUFFIX_COMPRESSION 2095 /*! session: object compaction */ -#define WT_STAT_DSRC_SESSION_COMPACT 2095 +#define WT_STAT_DSRC_SESSION_COMPACT 2096 /*! session: open cursor count */ -#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2096 +#define WT_STAT_DSRC_SESSION_CURSOR_OPEN 2097 /*! transaction: update conflicts */ -#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2097 +#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2098 /*! 
@} */ /* * Statistics section: END diff --git a/src/support/stat.c b/src/support/stat.c index 82377d843ee..7ca9186ea8d 100644 --- a/src/support/stat.c +++ b/src/support/stat.c @@ -93,6 +93,7 @@ static const char * const __stats_dsrc_desc[] = { "reconciliation: leaf-page overflow keys", "reconciliation: overflow values written", "reconciliation: pages deleted", + "reconciliation: fast-path pages deleted", "reconciliation: page checksum matches", "reconciliation: page reconciliation calls", "reconciliation: page reconciliation calls for eviction", @@ -212,6 +213,7 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats) stats->lsm_merge_throttle = 0; stats->bloom_size = 0; stats->rec_dictionary = 0; + stats->rec_page_delete_fast = 0; stats->rec_suffix_compression = 0; stats->rec_multiblock_internal = 0; stats->rec_overflow_key_internal = 0; @@ -333,6 +335,7 @@ __wt_stat_dsrc_aggregate_single( to->lsm_merge_throttle += from->lsm_merge_throttle; to->bloom_size += from->bloom_size; to->rec_dictionary += from->rec_dictionary; + to->rec_page_delete_fast += from->rec_page_delete_fast; to->rec_suffix_compression += from->rec_suffix_compression; to->rec_multiblock_internal += from->rec_multiblock_internal; to->rec_overflow_key_internal += from->rec_overflow_key_internal; @@ -470,6 +473,7 @@ __wt_stat_dsrc_aggregate( to->lsm_merge_throttle += WT_STAT_READ(from, lsm_merge_throttle); to->bloom_size += WT_STAT_READ(from, bloom_size); to->rec_dictionary += WT_STAT_READ(from, rec_dictionary); + to->rec_page_delete_fast += WT_STAT_READ(from, rec_page_delete_fast); to->rec_suffix_compression += WT_STAT_READ(from, rec_suffix_compression); to->rec_multiblock_internal += @@ -629,6 +633,7 @@ static const char * const __stats_connection_desc[] = { "thread-yield: page acquire time sleeping (usecs)", "connection: total read I/Os", "reconciliation: pages deleted", + "reconciliation: fast-path pages deleted", "reconciliation: page reconciliation calls", "reconciliation: page reconciliation calls 
for eviction", "reconciliation: split bytes currently awaiting free", @@ -811,6 +816,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) stats->lsm_work_units_done = 0; stats->lsm_work_units_created = 0; stats->lsm_work_queue_max = 0; + stats->rec_page_delete_fast = 0; stats->rec_pages = 0; stats->rec_pages_eviction = 0; stats->rec_page_delete = 0; @@ -1009,6 +1015,7 @@ __wt_stat_connection_aggregate( to->lsm_work_units_created += WT_STAT_READ(from, lsm_work_units_created); to->lsm_work_queue_max += WT_STAT_READ(from, lsm_work_queue_max); + to->rec_page_delete_fast += WT_STAT_READ(from, rec_page_delete_fast); to->rec_pages += WT_STAT_READ(from, rec_pages); to->rec_pages_eviction += WT_STAT_READ(from, rec_pages_eviction); to->rec_page_delete += WT_STAT_READ(from, rec_page_delete); -- cgit v1.2.1 From 30321fc90d18a0f2790ab00023a83e4ca5637807 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Thu, 19 Nov 2015 11:15:49 +1100 Subject: WT-1315 Change a few more size_t variables to u_int. --- src/cursor/cur_join.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index af53dc0a271..be121daa61f 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -260,9 +260,10 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; const char *mainkey_str, *p; - int cmp, skip; - size_t i, mainkey_len, size; void *allocbuf; + size_t mainkey_len, size; + u_int i; + int cmp, skip; c = NULL; allocbuf = NULL; @@ -689,7 +690,7 @@ __curjoin_next(WT_CURSOR *cursor) WT_DECL_RET; WT_SESSION_IMPL *session; bool skip_left; - size_t count; + u_int i; cjoin = (WT_CURSOR_JOIN *)cursor; @@ -713,9 +714,9 @@ nextkey: * using that in our iteration. 
*/ skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); - for (count = 0; count < cjoin->entries_next; count++) { + for (i = 0; i < cjoin->entries_next; i++) { ret = __curjoin_entry_member(session, cjoin, - &cjoin->entries[count], skip_left); + &cjoin->entries[i], skip_left); if (ret == WT_NOTFOUND) goto nextkey; skip_left = false; @@ -762,7 +763,7 @@ __curjoin_close(WT_CURSOR *cursor) WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; WT_SESSION_IMPL *session; - size_t i; + u_int i; cjoin = (WT_CURSOR_JOIN *)cursor; -- cgit v1.2.1 From 6746688f9c41052791e644fe2f97a550c96c3341 Mon Sep 17 00:00:00 2001 From: Michael Cahill Date: Thu, 19 Nov 2015 11:26:48 +1100 Subject: WT-1315 Fix a few script warnings about unused functions / macros. --- dist/s_define.list | 1 + dist/s_funcs.list | 2 ++ src/session/session_api.c | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dist/s_define.list b/dist/s_define.list index a2b86610755..8b0d9a0bdcd 100644 --- a/dist/s_define.list +++ b/dist/s_define.list @@ -4,6 +4,7 @@ API_CALL API_CALL_NOCONF API_SESSION_INIT FLD_MASK +JOINABLE_CURSOR_CALL_CHECK LF_MASK LLONG_MAX LLONG_MIN diff --git a/dist/s_funcs.list b/dist/s_funcs.list index 3b5690a4bc2..ed6cf43bb2f 100644 --- a/dist/s_funcs.list +++ b/dist/s_funcs.list @@ -27,6 +27,8 @@ __wt_log_scan __wt_nlpo2 __wt_nlpo2_round __wt_print_huffman_code +__wt_stat_join_aggregate +__wt_stat_join_clear_all __wt_try_readlock wiredtiger_config_parser_open wiredtiger_config_validate diff --git a/src/session/session_api.c b/src/session/session_api.c index 67e26618cf6..0174fbf4a4e 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -703,7 +703,7 @@ __session_join(WT_SESSION *wt_session, WT_CURSOR *join_cursor, else if (WT_STRING_MATCH("lt", cval.str, cval.len)) range = WT_CURJOIN_END_LT; else if (WT_STRING_MATCH("le", cval.str, cval.len)) - range = WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ; + range = WT_CURJOIN_END_LE; else if (WT_STRING_MATCH("eq", cval.str, cval.len)) 
range = WT_CURJOIN_END_EQ; else if (!WT_STRING_MATCH("ge", cval.str, cval.len)) -- cgit v1.2.1