diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2015-11-19 00:56:54 +0000 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2015-11-19 00:56:54 +0000 |
commit | 76d4b2f41e816cd496d577c6909ef0ac7101bfbf (patch) | |
tree | 416923934062b85a86c56a0a8ff107b9ddb3f370 /src/cursor | |
parent | b78e5b6d3581eed2f28c120992d0f66a14802ebe (diff) | |
parent | 4368d3975e0baa53508269f3fb2d712ecab7a584 (diff) | |
download | mongo-76d4b2f41e816cd496d577c6909ef0ac7101bfbf.tar.gz |
Merge branch 'develop' into wt-2222-snapshot-stats
Diffstat (limited to 'src/cursor')
-rw-r--r-- | src/cursor/cur_dump.c | 2 | ||||
-rw-r--r-- | src/cursor/cur_index.c | 54 | ||||
-rw-r--r-- | src/cursor/cur_join.c | 1049 | ||||
-rw-r--r-- | src/cursor/cur_stat.c | 146 | ||||
-rw-r--r-- | src/cursor/cur_table.c | 55 |
5 files changed, 1220 insertions, 86 deletions
diff --git a/src/cursor/cur_dump.c b/src/cursor/cur_dump.c index 6c11c4b407e..e5799fbad05 100644 --- a/src/cursor/cur_dump.c +++ b/src/cursor/cur_dump.c @@ -329,7 +329,7 @@ __curdump_close(WT_CURSOR *cursor) cdump = (WT_CURSOR_DUMP *)cursor; child = cdump->child; - CURSOR_API_CALL(cursor, session, get_key, NULL); + CURSOR_API_CALL(cursor, session, close, NULL); if (child != NULL) WT_TRET(child->close(child)); /* We shared the child's URI. */ diff --git a/src/cursor/cur_index.c b/src/cursor/cur_index.c index fd2a6cd7480..a909eaece99 100644 --- a/src/cursor/cur_index.c +++ b/src/cursor/cur_index.c @@ -8,6 +8,20 @@ #include "wt_internal.h" + /* + * __wt_curindex_joined -- + * Produce an error that this cursor is being used in a join call. + */ +int +__wt_curindex_joined(WT_CURSOR *cursor) +{ + WT_SESSION_IMPL *session; + + session = (WT_SESSION_IMPL *)cursor->session; + __wt_errx(session, "index cursor is being used in a join"); + return (ENOTSUP); +} + /* * __curindex_get_value -- * WT_CURSOR->get_value implementation for index cursors. @@ -15,32 +29,16 @@ static int __curindex_get_value(WT_CURSOR *cursor, ...) { - WT_CURSOR_INDEX *cindex; WT_DECL_RET; - WT_ITEM *item; WT_SESSION_IMPL *session; va_list ap; - cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, get_value, NULL); - WT_CURSOR_NEEDVALUE(cursor); - va_start(ap, cursor); - if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) { - ret = __wt_schema_project_merge(session, - cindex->cg_cursors, cindex->value_plan, - cursor->value_format, &cursor->value); - if (ret == 0) { - item = va_arg(ap, WT_ITEM *); - item->data = cursor->value.data; - item->size = cursor->value.size; - } - } else - ret = __wt_schema_project_out(session, - cindex->cg_cursors, cindex->value_plan, ap); - va_end(ap); + JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); + WT_ERR(__wt_curindex_get_valuev(cursor, ap)); -err: API_END_RET(session, ret); +err: va_end(ap); + API_END_RET(session, ret); } /* @@ -53,7 +51,7 @@ __curindex_set_value(WT_CURSOR *cursor, ...) WT_DECL_RET; WT_SESSION_IMPL *session; - CURSOR_API_CALL(cursor, session, set_value, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL); ret = ENOTSUP; err: cursor->saved_err = ret; F_CLR(cursor, WT_CURSTD_VALUE_SET); @@ -72,7 +70,7 @@ __curindex_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)a; - CURSOR_API_CALL(a, session, compare, NULL); + JOINABLE_CURSOR_API_CALL(a, session, compare, NULL); /* Check both cursors are "index:" type. */ if (!WT_PREFIX_MATCH(a->uri, "index:") || @@ -150,7 +148,7 @@ __curindex_next(WT_CURSOR *cursor) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); if ((ret = cindex->child->next(cindex->child)) == 0) @@ -171,7 +169,7 @@ __curindex_prev(WT_CURSOR *cursor) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, prev, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL); F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); if ((ret = cindex->child->prev(cindex->child)) == 0) @@ -194,7 +192,7 @@ __curindex_reset(WT_CURSOR *cursor) u_int i; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, reset, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL); F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); WT_TRET(cindex->child->reset(cindex->child)); @@ -225,7 +223,7 @@ __curindex_search(WT_CURSOR *cursor) cindex = (WT_CURSOR_INDEX *)cursor; child = cindex->child; - CURSOR_API_CALL(cursor, session, search, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL); /* * We are searching using the application-specified key, which @@ -284,7 +282,7 @@ __curindex_search_near(WT_CURSOR *cursor, int *exact) WT_SESSION_IMPL *session; cindex = (WT_CURSOR_INDEX *)cursor; - CURSOR_API_CALL(cursor, session, search_near, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL); __wt_cursor_set_raw_key(cindex->child, &cursor->key); if ((ret = cindex->child->search_near(cindex->child, exact)) == 0) ret = __curindex_move(cindex); @@ -311,7 +309,7 @@ __curindex_close(WT_CURSOR *cursor) cindex = (WT_CURSOR_INDEX *)cursor; idx = cindex->index; - CURSOR_API_CALL(cursor, session, close, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); if ((cp = cindex->cg_cursors) != NULL) for (i = 0, cp = cindex->cg_cursors; diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c new file mode 100644 index 00000000000..be121daa61f --- /dev/null +++ b/src/cursor/cur_join.c @@ -0,0 +1,1049 @@ +/*- + * Copyright (c) 2014-2015 MongoDB, Inc. + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +/* + * __curjoin_entry_iter_init -- + * Initialize an iteration for the index managed by a join entry. + * + */ +static int +__curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ITER **iterp) +{ + WT_CURSOR *newcur; + WT_CURSOR *to_dup; + WT_DECL_RET; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + const char *def_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), NULL }; + const char *uri, **config; + char *uribuf; + WT_CURSOR_JOIN_ITER *iter; + size_t size; + + iter = NULL; + uribuf = NULL; + to_dup = entry->ends[0].cursor; + + uri = to_dup->uri; + if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW)) + config = &raw_cfg[0]; + else + config = &def_cfg[0]; + + if (cjoin->projection != NULL) { + size = strlen(uri) + strlen(cjoin->projection) + 1; + WT_ERR(__wt_calloc(session, size, 1, &uribuf)); + snprintf(uribuf, size, "%s%s", uri, cjoin->projection); + uri = uribuf; + } + WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, config, + &newcur)); + WT_ERR(__wt_cursor_dup_position(to_dup, newcur)); + WT_ERR(__wt_calloc_one(session, &iter)); + iter->cjoin = cjoin; + iter->session = session; + iter->entry = entry; + iter->cursor = newcur; + iter->advance = false; + *iterp = iter; + + if (0) { +err: __wt_free(session, iter); + } + __wt_free(session, uribuf); + return (ret); +} + +/* + * __curjoin_pack_recno -- + * Pack the given recno into a buffer; prepare an item referencing it. + * + */ +static int +__curjoin_pack_recno(WT_SESSION_IMPL *session, uint64_t r, uint8_t *buf, + size_t bufsize, WT_ITEM *item) +{ + WT_DECL_RET; + WT_SESSION *wtsession; + size_t sz; + + wtsession = (WT_SESSION *)session; + WT_ERR(wiredtiger_struct_size(wtsession, &sz, "r", r)); + WT_ASSERT(session, sz < bufsize); + WT_ERR(wiredtiger_struct_pack(wtsession, buf, bufsize, "r", r)); + item->size = sz; + item->data = buf; + +err: return (ret); +} + +/* + * __curjoin_entry_iter_next -- + * Get the next item in an iteration. + * + */ +static int +__curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, + uint64_t *rp) +{ + WT_CURSOR *firstcg_cur; + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + WT_SESSION_IMPL *session; + uint64_t r; + + if (iter->advance) + WT_ERR(iter->cursor->next(iter->cursor)); + else + iter->advance = true; + + session = iter->session; + cjoin = iter->cjoin; + + /* + * Set our key to the primary key, we'll also need this + * to check membership. + */ + if (iter->entry->index != NULL) + firstcg_cur = ((WT_CURSOR_INDEX *)iter->cursor)->cg_cursors[0]; + else + firstcg_cur = ((WT_CURSOR_TABLE *)iter->cursor)->cg_cursors[0]; + if (WT_CURSOR_RECNO(&cjoin->iface)) { + r = *(uint64_t *)firstcg_cur->key.data; + WT_ERR(__curjoin_pack_recno(session, r, cjoin->recno_buf, + sizeof(cjoin->recno_buf), primkey)); + *rp = r; + } else { + WT_ITEM_SET(*primkey, firstcg_cur->key); + *rp = 0; + } + iter->curkey = primkey; + iter->entry->stats.actual_count++; + iter->entry->stats.accesses++; + +err: return (ret); +} + +/* + * __curjoin_entry_iter_reset -- + * Reset an iteration to the starting point. + * + */ +static int +__curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) +{ + WT_DECL_RET; + + if (iter->advance) { + WT_ERR(iter->cursor->reset(iter->cursor)); + WT_ERR(__wt_cursor_dup_position( + iter->cjoin->entries[0].ends[0].cursor, iter->cursor)); + iter->advance = false; + iter->entry->stats.actual_count = 0; + } + +err: return (ret); +} + +/* + * __curjoin_entry_iter_ready -- + * The iterator is positioned. + * + */ +static bool +__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter) +{ + return (iter->advance); +} + +/* + * __curjoin_entry_iter_close -- + * Close the iteration, release resources. + * + */ +static int +__curjoin_entry_iter_close(WT_CURSOR_JOIN_ITER *iter) +{ + if (iter->cursor != NULL) + return (iter->cursor->close(iter->cursor)); + else + return (0); + __wt_free(iter->session, iter); +} + +/* + * __curjoin_get_key -- + * WT_CURSOR->get_key for join cursors. + */ +static int +__curjoin_get_key(WT_CURSOR *cursor, ...) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + WT_SESSION_IMPL *session; + va_list ap; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + va_start(ap, cursor); + CURSOR_API_CALL(cursor, session, get_key, NULL); + + if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) || + !__curjoin_entry_iter_ready(cjoin->iter)) { + __wt_errx(session, "join cursor must be advanced with next()"); + WT_ERR(EINVAL); + } + WT_ERR(__wt_cursor_get_keyv(cursor, cursor->flags, ap)); + +err: va_end(ap); + API_END_RET(session, ret); +} + +/* + * __curjoin_get_value -- + * WT_CURSOR->get_value for join cursors. + */ +static int +__curjoin_get_value(WT_CURSOR *cursor, ...) +{ + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ITER *iter; + WT_DECL_RET; + WT_SESSION_IMPL *session; + va_list ap; + + cjoin = (WT_CURSOR_JOIN *)cursor; + iter = cjoin->iter; + + va_start(ap, cursor); + CURSOR_API_CALL(cursor, session, get_value, NULL); + + if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED) || + !__curjoin_entry_iter_ready(iter)) { + __wt_errx(session, "join cursor must be advanced with next()"); + WT_ERR(EINVAL); + } + if (iter->entry->index != NULL) + WT_ERR(__wt_curindex_get_valuev(iter->cursor, ap)); + else + WT_ERR(__wt_curtable_get_valuev(iter->cursor, ap)); + +err: va_end(ap); + API_END_RET(session, ret); +} + +/* + * __curjoin_init_bloom -- + * Populate Bloom filters + */ +static int +__curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry, WT_BLOOM *bloom) +{ + WT_COLLATOR *collator; + WT_CURSOR *c; + WT_CURSOR_INDEX *cindex; + WT_CURSOR_JOIN_ENDPOINT *end, *endmax; + WT_DECL_RET; + WT_ITEM curkey, curvalue, *k; + WT_TABLE *maintable; + char *uri; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + const char *mainkey_str, *p; + void *allocbuf; + size_t mainkey_len, size; + u_int i; + int cmp, skip; + + c = NULL; + allocbuf = NULL; + skip = 0; + + if (entry->index != NULL) { + /* + * Open a cursor having a projection of the keys of the + * index we're comparing against. Open it raw, we're + * going to compare it to the raw keys of the + * reference cursors. + */ + maintable = ((WT_CURSOR_TABLE *)entry->main)->table; + mainkey_str = maintable->colconf.str + 1; + for (p = mainkey_str, i = 0; + p != NULL && i < maintable->nkey_columns; i++) + p = strchr(p + 1, ','); + WT_ASSERT(session, p != 0); + mainkey_len = WT_PTRDIFF(p, mainkey_str); + size = strlen(entry->index->name) + mainkey_len + 3; + WT_ERR(__wt_calloc(session, size, 1, &uri)); + snprintf(uri, size, "%s(%.*s)", entry->index->name, + (int)mainkey_len, mainkey_str); + } else { + /* + * For joins on the main table, we just need the primary + * key for comparison, we don't need any values. + */ + size = strlen(cjoin->table->name) + 3; + WT_ERR(__wt_calloc(session, size, 1, &uri)); + snprintf(uri, size, "%s()", cjoin->table->name); + } + WT_ERR(__wt_open_cursor(session, uri, (WT_CURSOR *)cjoin, raw_cfg, &c)); + + /* Initially position the cursor if necessary. */ + endmax = &entry->ends[entry->ends_next]; + if ((end = &entry->ends[0]) < endmax && + F_ISSET(end, WT_CURJOIN_END_GE)) { + WT_ERR(__wt_cursor_dup_position(end->cursor, c)); + if (end->flags == WT_CURJOIN_END_GE) + skip = 1; + } + collator = (entry->index == NULL) ? NULL : entry->index->collator; + while (ret == 0) { + c->get_key(c, &curkey); + if (entry->index != NULL) { + cindex = (WT_CURSOR_INDEX *)c; + if (cindex->index->extractor == NULL) { + /* + * Repack so it's comparable to the + * reference endpoints. + */ + k = &cindex->child->key; + WT_ERR(__wt_struct_repack(session, + cindex->child->key_format, + entry->main->value_format, k, &curkey, + &allocbuf)); + } else + curkey = cindex->child->key; + } + for (end = &entry->ends[skip]; end < endmax; end++) { + WT_ERR(__wt_compare(session, collator, &curkey, + &end->key, &cmp)); + if (!F_ISSET(end, WT_CURJOIN_END_LT)) { + if (cmp < 0 || (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ))) + goto advance; + if (cmp > 0) { + if (F_ISSET(end, WT_CURJOIN_END_GT)) + skip = 1; + else + goto done; + } + } else { + if (cmp > 0 || (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ))) + goto done; + } + } + if (entry->index != NULL) + c->get_value(c, &curvalue); + else + c->get_key(c, &curvalue); + WT_ERR(__wt_bloom_insert(bloom, &curvalue)); + entry->stats.actual_count++; +advance: + if ((ret = c->next(c)) == WT_NOTFOUND) + break; + } +done: + WT_ERR_NOTFOUND_OK(ret); + +err: if (c != NULL) + WT_TRET(c->close(c)); + __wt_free(session, allocbuf); + return (ret); +} + +/* + * __curjoin_endpoint_init_key -- + * Set the key in the reference endpoint. + */ +static int +__curjoin_endpoint_init_key(WT_SESSION_IMPL *session, + WT_CURSOR_JOIN_ENTRY *entry, WT_CURSOR_JOIN_ENDPOINT *endpoint) +{ + WT_CURSOR *cursor; + WT_CURSOR_INDEX *cindex; + WT_DECL_RET; + WT_ITEM *k; + uint64_t r; + void *allocbuf; + + allocbuf = NULL; + if ((cursor = endpoint->cursor) != NULL) { + if (entry->index != NULL) { + cindex = (WT_CURSOR_INDEX *)endpoint->cursor; + if (cindex->index->extractor == NULL) { + WT_ERR(__wt_struct_repack(session, + cindex->child->key_format, + entry->main->value_format, + &cindex->child->key, &endpoint->key, + &allocbuf)); + if (allocbuf != NULL) + F_SET(endpoint, WT_CURJOIN_END_OWN_KEY); + } else + endpoint->key = cindex->child->key; + } else { + k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; + if (WT_CURSOR_RECNO(cursor)) { + r = *(uint64_t *)k->data; + WT_ERR(__curjoin_pack_recno(session, r, + endpoint->recno_buf, + sizeof(endpoint->recno_buf), + &endpoint->key)); + } + else + endpoint->key = *k; + } + } + if (0) { +err: __wt_free(session, allocbuf); + } + return (ret); +} + +/* + * __curjoin_init_iter -- + * Initialize before any iteration. + */ +static int +__curjoin_init_iter(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin) +{ + WT_BLOOM *bloom; + WT_DECL_RET; + WT_CURSOR_JOIN_ENTRY *je, *jeend, *je2; + WT_CURSOR_JOIN_ENDPOINT *end; + uint64_t k, m; + + if (cjoin->entries_next == 0) { + __wt_errx(session, "join cursor has not yet been joined " + "with any other cursors"); + WT_ERR(EINVAL); + } + + je = &cjoin->entries[0]; + WT_ERR(__curjoin_entry_iter_init(session, cjoin, je, &cjoin->iter)); + + jeend = &cjoin->entries[cjoin->entries_next]; + for (je = cjoin->entries; je < jeend; je++) { + __wt_stat_join_init_single(&je->stats); + for (end = &je->ends[0]; end < &je->ends[je->ends_next]; + end++) + WT_ERR(__curjoin_endpoint_init_key(session, je, end)); + + /* + * The first entry is iterated as the 'outermost' cursor. + * For the common GE case, we don't have to test against + * the left reference key, we know it will be true since + * the btree is ordered. + */ + if (je == cjoin->entries && je->ends[0].flags == + (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ)) + F_SET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); + + if (F_ISSET(je, WT_CURJOIN_ENTRY_BLOOM)) { + if (je->bloom == NULL) { + /* + * Look for compatible filters to be shared, + * pick compatible numbers for bit counts + * and number of hashes. + */ + m = je->bloom_bit_count; + k = je->bloom_hash_count; + for (je2 = je + 1; je2 < jeend; je2++) + if (F_ISSET(je2, + WT_CURJOIN_ENTRY_BLOOM) && + je2->count == je->count) { + m = WT_MAX( + je2->bloom_bit_count, m); + k = WT_MAX( + je2->bloom_hash_count, k); + } + je->bloom_bit_count = m; + je->bloom_hash_count = k; + WT_ERR(__wt_bloom_create(session, NULL, + NULL, je->count, m, k, &je->bloom)); + F_SET(je, WT_CURJOIN_ENTRY_OWN_BLOOM); + WT_ERR(__curjoin_init_bloom(session, cjoin, + je, je->bloom)); + /* + * Share the Bloom filter, making all + * config info consistent. + */ + for (je2 = je + 1; je2 < jeend; je2++) + if (F_ISSET(je2, + WT_CURJOIN_ENTRY_BLOOM) && + je2->count == je->count) { + WT_ASSERT(session, + je2->bloom == NULL); + je2->bloom = je->bloom; + je2->bloom_bit_count = m; + je2->bloom_hash_count = k; + } + } else { + /* + * Create a temporary filter that we'll + * merge into the shared one. The Bloom + * parameters of the two filters must match. + */ + WT_ERR(__wt_bloom_create(session, NULL, + NULL, je->count, je->bloom_bit_count, + je->bloom_hash_count, &bloom)); + WT_ERR(__curjoin_init_bloom(session, cjoin, + je, bloom)); + WT_ERR(__wt_bloom_intersection(je->bloom, + bloom)); + WT_ERR(__wt_bloom_close(bloom)); + } + } + } + F_SET(cjoin, WT_CURJOIN_INITIALIZED); + +err: + return (ret); +} + +/* + * __curjoin_entry_in_range -- + * Check if a key is in the range specified by the entry, returning + * WT_NOTFOUND if not. + */ +static int +__curjoin_entry_in_range(WT_SESSION_IMPL *session, WT_CURSOR_JOIN_ENTRY *entry, + WT_ITEM *curkey, bool skip_left) +{ + WT_COLLATOR *collator; + WT_CURSOR_JOIN_ENDPOINT *end, *endmax; + WT_DECL_RET; + int cmp; + + collator = (entry->index != NULL) ? entry->index->collator : NULL; + endmax = &entry->ends[entry->ends_next]; + for (end = &entry->ends[skip_left ? 1 : 0]; end < endmax; end++) { + WT_ERR(__wt_compare(session, collator, curkey, &end->key, + &cmp)); + if (!F_ISSET(end, WT_CURJOIN_END_LT)) { + if (cmp < 0 || + (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ)) || + (cmp > 0 && !F_ISSET(end, WT_CURJOIN_END_GT))) + WT_ERR(WT_NOTFOUND); + } else { + if (cmp > 0 || + (cmp == 0 && + !F_ISSET(end, WT_CURJOIN_END_EQ)) || + (cmp < 0 && !F_ISSET(end, WT_CURJOIN_END_LT))) + WT_ERR(WT_NOTFOUND); + } + } +err: return (ret); +} + +typedef struct { + WT_CURSOR iface; + WT_CURSOR_JOIN_ENTRY *entry; + int ismember; +} WT_CURJOIN_EXTRACTOR; + +/* + * __curjoin_extract_insert -- + * Handle a key produced by a custom extractor. + */ +static int +__curjoin_extract_insert(WT_CURSOR *cursor) { + WT_CURJOIN_EXTRACTOR *cextract; + WT_DECL_RET; + WT_ITEM ikey; + WT_SESSION_IMPL *session; + + cextract = (WT_CURJOIN_EXTRACTOR *)cursor; + /* + * This insert method may be called multiple times during a single + * extraction. If we already have a definitive answer to the + * membership question, exit early. + */ + if (cextract->ismember) + return (0); + + session = (WT_SESSION_IMPL *)cursor->session; + + WT_ITEM_SET(ikey, cursor->key); + /* + * We appended a padding byte to the key to avoid rewriting the last + * column. Strip that away here. + */ + WT_ASSERT(session, ikey.size > 0); + --ikey.size; + + ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); + if (ret == WT_NOTFOUND) + ret = 0; + else + cextract->ismember = 1; + + return (ret); +} + +/* + * __curjoin_entry_member -- + * Do a membership check for a particular index that was joined, + * if not a member, returns WT_NOTFOUND. + */ +static int +__curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_CURSOR_JOIN_ENTRY *entry, bool skip_left) +{ + WT_CURJOIN_EXTRACTOR extract_cursor; + WT_CURSOR *c; + WT_CURSOR_STATIC_INIT(iface, + __wt_cursor_get_key, /* get-key */ + __wt_cursor_get_value, /* get-value */ + __wt_cursor_set_key, /* set-key */ + __wt_cursor_set_value, /* set-value */ + __wt_cursor_notsup, /* compare */ + __wt_cursor_notsup, /* equals */ + __wt_cursor_notsup, /* next */ + __wt_cursor_notsup, /* prev */ + __wt_cursor_notsup, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_notsup, /* search-near */ + __curjoin_extract_insert, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* reconfigure */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_notsup); /* close */ + WT_DECL_RET; + WT_INDEX *idx; + WT_ITEM *key, v; + bool bloom_found; + + key = cjoin->iter->curkey; + entry->stats.accesses++; + bloom_found = false; + + if (entry->bloom != NULL) { + /* + * If we don't own the Bloom filter, we must be sharing one + * in a previous entry. So the shared filter has already + * been checked and passed. + */ + if (!F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) + return (0); + + /* + * If the item is not in the Bloom filter, we return + * immediately, otherwise, we still need to check the + * long way. + */ + WT_ERR(__wt_bloom_inmem_get(entry->bloom, key)); + bloom_found = true; + } + if (entry->index != NULL) { + c = entry->main; + c->set_key(c, key); + if ((ret = c->search(c)) == 0) + ret = c->get_value(c, &v); + else if (ret == WT_NOTFOUND) + WT_ERR_MSG(session, WT_ERROR, + "main table for join is missing entry."); + c->reset(c); + WT_ERR(ret); + } else + v = *key; + + if ((idx = entry->index) != NULL && idx->extractor != NULL) { + extract_cursor.iface = iface; + extract_cursor.iface.session = &session->iface; + extract_cursor.iface.key_format = idx->exkey_format; + extract_cursor.ismember = 0; + extract_cursor.entry = entry; + WT_ERR(idx->extractor->extract(idx->extractor, + &session->iface, key, &v, &extract_cursor.iface)); + if (!extract_cursor.ismember) + WT_ERR(WT_NOTFOUND); + } else + WT_ERR(__curjoin_entry_in_range(session, entry, &v, skip_left)); + + if (0) { +err: if (ret == WT_NOTFOUND && bloom_found) + entry->stats.bloom_false_positive++; + } + return (ret); +} + +/* + * __curjoin_next -- + * WT_CURSOR::next for join cursors. + */ +static int +__curjoin_next(WT_CURSOR *cursor) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + WT_SESSION_IMPL *session; + bool skip_left; + u_int i; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + CURSOR_API_CALL(cursor, session, next, NULL); + + if (F_ISSET(cjoin, WT_CURJOIN_ERROR)) { + __wt_errx(session, "join cursor encountered previous error"); + WT_ERR(WT_ERROR); + } + if (!F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) + WT_ERR(__curjoin_init_iter(session, cjoin)); + +nextkey: + if ((ret = __curjoin_entry_iter_next(cjoin->iter, &cursor->key, + &cursor->recno)) == 0) { + F_SET(cursor, WT_CURSTD_KEY_EXT); + + /* + * We may have already established membership for the + * 'left' case for the first entry, since we're + * using that in our iteration. + */ + skip_left = F_ISSET(cjoin, WT_CURJOIN_SKIP_FIRST_LEFT); + for (i = 0; i < cjoin->entries_next; i++) { + ret = __curjoin_entry_member(session, cjoin, + &cjoin->entries[i], skip_left); + if (ret == WT_NOTFOUND) + goto nextkey; + skip_left = false; + WT_ERR(ret); + } + } + + if (0) { +err: F_SET(cjoin, WT_CURJOIN_ERROR); + } + API_END_RET(session, ret); +} + +/* + * __curjoin_reset -- + * WT_CURSOR::reset for join cursors. + */ +static int +__curjoin_reset(WT_CURSOR *cursor) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + CURSOR_API_CALL(cursor, session, reset, NULL); + + if (F_ISSET(cjoin, WT_CURJOIN_INITIALIZED)) + WT_ERR(__curjoin_entry_iter_reset(cjoin->iter)); + +err: API_END_RET(session, ret); +} + +/* + * __curjoin_close -- + * WT_CURSOR::close for join cursors. + */ +static int +__curjoin_close(WT_CURSOR *cursor) +{ + WT_CURSOR_JOIN *cjoin; + WT_CURSOR_JOIN_ENDPOINT *end; + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_SESSION_IMPL *session; + u_int i; + + cjoin = (WT_CURSOR_JOIN *)cursor; + + CURSOR_API_CALL(cursor, session, close, NULL); + + __wt_schema_release_table(session, cjoin->table); + /* These are owned by the table */ + cursor->internal_uri = NULL; + cursor->key_format = NULL; + if (cjoin->projection != NULL) { + __wt_free(session, cjoin->projection); + __wt_free(session, cursor->value_format); + } + + for (entry = cjoin->entries, i = 0; i < cjoin->entries_next; + entry++, i++) { + if (entry->main != NULL) + WT_TRET(entry->main->close(entry->main)); + if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) + WT_TRET(__wt_bloom_close(entry->bloom)); + for (end = &entry->ends[0]; + end < &entry->ends[entry->ends_next]; end++) { + F_CLR(end->cursor, WT_CURSTD_JOINED); + if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY)) + __wt_free(session, end->key.data); + } + } + + if (cjoin->iter != NULL) + WT_TRET(__curjoin_entry_iter_close(cjoin->iter)); + __wt_free(session, cjoin->entries); + WT_TRET(__wt_cursor_close(cursor)); + +err: API_END_RET(session, ret); +} + +/* + * __wt_curjoin_open -- + * Initialize a join cursor. + * + * Join cursors are read-only. + */ +int +__wt_curjoin_open(WT_SESSION_IMPL *session, + const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) +{ + WT_CURSOR_STATIC_INIT(iface, + __curjoin_get_key, /* get-key */ + __curjoin_get_value, /* get-value */ + __wt_cursor_notsup, /* set-key */ + __wt_cursor_notsup, /* set-value */ + __wt_cursor_notsup, /* compare */ + __wt_cursor_notsup, /* equals */ + __curjoin_next, /* next */ + __wt_cursor_notsup, /* prev */ + __curjoin_reset, /* reset */ + __wt_cursor_notsup, /* search */ + __wt_cursor_notsup, /* search-near */ + __wt_cursor_notsup, /* insert */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_notsup, /* reconfigure */ + __curjoin_close); /* close */ + WT_CURSOR *cursor; + WT_CURSOR_JOIN *cjoin; + WT_DECL_ITEM(tmp); + WT_DECL_RET; + WT_TABLE *table; + size_t size; + const char *tablename, *columns; + + WT_STATIC_ASSERT(offsetof(WT_CURSOR_JOIN, iface) == 0); + + if (!WT_PREFIX_SKIP(uri, "join:")) + return (EINVAL); + tablename = uri; + if (!WT_PREFIX_SKIP(tablename, "table:")) + return (EINVAL); + + columns = strchr(tablename, '('); + if (columns == NULL) + size = strlen(tablename); + else + size = WT_PTRDIFF(columns, tablename); + WT_RET(__wt_schema_get_table(session, tablename, size, 0, &table)); + + WT_RET(__wt_calloc_one(session, &cjoin)); + cursor = &cjoin->iface; + *cursor = iface; + cursor->session = &session->iface; + cursor->internal_uri = table->name; + cursor->key_format = table->key_format; + cursor->value_format = table->value_format; + cjoin->table = table; + + /* Handle projections. */ + WT_ERR(__wt_scr_alloc(session, 0, &tmp)); + if (columns != NULL) { + WT_ERR(__wt_struct_reformat(session, table, + columns, strlen(columns), NULL, 1, tmp)); + WT_ERR(__wt_strndup( + session, tmp->data, tmp->size, &cursor->value_format)); + WT_ERR(__wt_strdup(session, columns, &cjoin->projection)); + } + + if (owner != NULL) + WT_ERR(EINVAL); + + WT_ERR(__wt_cursor_init(cursor, uri, owner, cfg, cursorp)); + + if (0) { +err: WT_TRET(__curjoin_close(cursor)); + *cursorp = NULL; + } + + __wt_scr_free(session, &tmp); + return (ret); +} + +/* + * __wt_curjoin_join -- + * Add a new join to a join cursor. + */ +int +__wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, + WT_INDEX *idx, WT_CURSOR *ref_cursor, uint32_t flags, uint32_t range, + uint64_t count, uint64_t bloom_bit_count, uint64_t bloom_hash_count) +{ + WT_CURSOR_JOIN_ENTRY *entry; + WT_DECL_RET; + WT_CURSOR_JOIN_ENDPOINT *end, *newend; + bool hasins, needbloom, range_eq; + u_int i, ins, nonbloom; + const char *raw_cfg[] = { WT_CONFIG_BASE( + session, WT_SESSION_open_cursor), "raw", NULL }; + char *main_uri; + size_t namesize, newsize; + + entry = NULL; + hasins = needbloom = false; + ins = 0; /* -Wuninitialized */ + main_uri = NULL; + nonbloom = 0; /* -Wuninitialized */ + namesize = strlen(cjoin->table->name); + + for (i = 0; i < cjoin->entries_next; i++) { + if (cjoin->entries[i].index == idx) { + entry = &cjoin->entries[i]; + break; + } + if (!needbloom && i > 0 && + !F_ISSET(&cjoin->entries[i], WT_CURJOIN_ENTRY_BLOOM)) { + needbloom = true; + nonbloom = i; + } + } + if (entry == NULL) { + WT_ERR(__wt_realloc_def(session, &cjoin->entries_allocated, + cjoin->entries_next + 1, &cjoin->entries)); + if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) && needbloom) { + /* + * Reorder the list so that after the first entry, + * the Bloom filtered entries come next, followed by + * the non-Bloom entries. Once the Bloom filters + * are built, determining membership via Bloom is + * faster than without Bloom, so we can answer + * membership questions more quickly, and with less + * I/O, with the Bloom entries first. + */ + entry = &cjoin->entries[nonbloom]; + memmove(entry + 1, entry, + (cjoin->entries_next - nonbloom) * + sizeof(WT_CURSOR_JOIN_ENTRY)); + memset(entry, 0, sizeof(WT_CURSOR_JOIN_ENTRY)); + } + else + entry = &cjoin->entries[cjoin->entries_next]; + entry->index = idx; + entry->flags = flags; + entry->count = count; + entry->bloom_bit_count = bloom_bit_count; + entry->bloom_hash_count = bloom_hash_count; + ++cjoin->entries_next; + } else { + /* Merge the join into an existing entry for this index */ + if (count != 0 && entry->count != 0 && entry->count != count) { + __wt_errx(session, "count=%" PRIu64 " does not match " + "previous count=%" PRIu64 " for this index", + count, entry->count); + WT_ERR(EINVAL); + } + if (LF_ISSET(WT_CURJOIN_ENTRY_BLOOM) != + F_ISSET(entry, WT_CURJOIN_ENTRY_BLOOM)) { + __wt_errx(session, "join has incompatible strategy " + "values for the same index"); + WT_ERR(EINVAL); + } + /* + * Check against other comparisons (we call them endpoints) + * already set up for this index. + * We allow either: + * - one or more "eq" (with disjunction) + * - exactly one "eq" (with conjunction) + * - exactly one of "gt" or "ge" (conjunction or disjunction) + * - exactly one of "lt" or "le" (conjunction or disjunction) + * - one of "gt"/"ge" along with one of "lt"/"le" + * (currently restricted to conjunction). + * + * Some other combinations, although expressible either do + * not make sense (X == 3 AND X == 5) or are reducible (X < + * 7 AND X < 9). Other specific cases of (X < 7 OR X > 15) + * or (X == 4 OR X > 15) make sense but we don't handle yet. + */ + for (i = 0; i < entry->ends_next; i++) { + end = &entry->ends[i]; + range_eq = (range == WT_CURJOIN_END_EQ); + if ((F_ISSET(end, WT_CURJOIN_END_GT) && + ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) || + (F_ISSET(end, WT_CURJOIN_END_LT) && + ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) || + (end->flags == WT_CURJOIN_END_EQ && + (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT)) + != 0)) { + __wt_errx(session, + "join has overlapping ranges"); + WT_ERR(EINVAL); + } + if (range == WT_CURJOIN_END_EQ && + end->flags == WT_CURJOIN_END_EQ && + !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) { + __wt_errx(session, + "compare=eq can only be combined " + "using operation=or"); + WT_ERR(EINVAL); + } + + /* + * Sort "gt"/"ge" to the front, followed by any number + * of "eq", and finally "lt"/"le". + */ + if (!hasins && + ((range & WT_CURJOIN_END_GT) != 0 || + (range == WT_CURJOIN_END_EQ && + !F_ISSET(end, WT_CURJOIN_END_GT)))) { + ins = i; + hasins = true; + } + } + /* All checks completed, merge any new configuration now */ + entry->count = count; + entry->bloom_bit_count = + WT_MAX(entry->bloom_bit_count, bloom_bit_count); + entry->bloom_hash_count = + WT_MAX(entry->bloom_hash_count, bloom_hash_count); + } + WT_ERR(__wt_realloc_def(session, &entry->ends_allocated, + entry->ends_next + 1, &entry->ends)); + if (!hasins) + ins = entry->ends_next; + newend = &entry->ends[ins]; + memmove(newend + 1, newend, + (entry->ends_next - ins) * sizeof(WT_CURSOR_JOIN_ENDPOINT)); + memset(newend, 0, sizeof(WT_CURSOR_JOIN_ENDPOINT)); + entry->ends_next++; + newend->cursor = ref_cursor; + F_SET(newend, range); + + /* Open the main file with a projection of the indexed columns. */ + if (entry->main == NULL && entry->index != NULL) { + namesize = strlen(cjoin->table->name); + newsize = namesize + entry->index->colconf.len + 1; + WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); + snprintf(main_uri, newsize, "%s%.*s", + cjoin->table->name, (int)entry->index->colconf.len, + entry->index->colconf.str); + WT_ERR(__wt_open_cursor(session, main_uri, + (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); + } + +err: if (main_uri != NULL) + __wt_free(session, main_uri); + return (ret); +} diff --git a/src/cursor/cur_stat.c b/src/cursor/cur_stat.c index 81d028c165a..65d2dc81406 100644 --- a/src/cursor/cur_stat.c +++ b/src/cursor/cur_stat.c @@ -103,7 +103,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...) va_list ap; size_t size; uint64_t *v; - const char **p; + const char *desc, **p; cst = (WT_CURSOR_STAT *)cursor; va_start(ap, cursor); @@ -111,15 +111,13 @@ __curstat_get_value(WT_CURSOR *cursor, ...) WT_CURSOR_NEEDVALUE(cursor); + WT_ERR(cst->stats_desc(cst, WT_STAT_KEY_OFFSET(cst), &desc)); if (F_ISSET(cursor, WT_CURSTD_RAW)) { WT_ERR(__wt_struct_size(session, &size, cursor->value_format, - cst->stats_desc(WT_STAT_KEY_OFFSET(cst)), - cst->pv.data, cst->v)); + desc, cst->pv.data, cst->v)); WT_ERR(__wt_buf_initsize(session, &cursor->value, size)); WT_ERR(__wt_struct_pack(session, cursor->value.mem, size, - cursor->value_format, - cst->stats_desc(WT_STAT_KEY_OFFSET(cst)), - cst->pv.data, cst->v)); + cursor->value_format, desc, cst->pv.data, cst->v)); item = va_arg(ap, WT_ITEM *); item->data = cursor->value.data; @@ -130,7 +128,7 @@ __curstat_get_value(WT_CURSOR *cursor, ...) * pointer support isn't documented, but it's a cheap test. */ if ((p = va_arg(ap, const char **)) != NULL) - *p = cst->stats_desc(WT_STAT_KEY_OFFSET(cst)); + *p = desc; if ((p = va_arg(ap, const char **)) != NULL) *p = cst->pv.data; if ((v = va_arg(ap, uint64_t *)) != NULL) @@ -201,7 +199,9 @@ __curstat_next(WT_CURSOR *cursor) /* Initialize on demand. */ if (cst->notinitialized) { WT_ERR(__wt_curstat_init( - session, cursor->internal_uri, cst->cfg, cst)); + session, cursor->internal_uri, NULL, cst->cfg, cst)); + if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, true, true)); cst->notinitialized = false; } @@ -211,15 +211,19 @@ __curstat_next(WT_CURSOR *cursor) cst->key = WT_STAT_KEY_MIN(cst); } else if (cst->key < WT_STAT_KEY_MAX(cst)) ++cst->key; - else { - F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + else if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, true, false)); + else WT_ERR(WT_NOTFOUND); - } + cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)]; WT_ERR(__curstat_print_value(session, cst->v, &cst->pv)); F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); -err: API_END_RET(session, ret); + if (0) { +err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + } + API_END_RET(session, ret); } /* @@ -239,7 +243,9 @@ __curstat_prev(WT_CURSOR *cursor) /* Initialize on demand. */ if (cst->notinitialized) { WT_ERR(__wt_curstat_init( - session, cursor->internal_uri, cst->cfg, cst)); + session, cursor->internal_uri, NULL, cst->cfg, cst)); + if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, false, true)); cst->notinitialized = false; } @@ -249,16 +255,19 @@ __curstat_prev(WT_CURSOR *cursor) cst->key = WT_STAT_KEY_MAX(cst); } else if (cst->key > WT_STAT_KEY_MIN(cst)) --cst->key; - else { - F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); + else if (cst->next_set != NULL) + WT_ERR((*cst->next_set)(session, cst, false, false)); + else WT_ERR(WT_NOTFOUND); - } cst->v = (uint64_t)cst->stats[WT_STAT_KEY_OFFSET(cst)]; WT_ERR(__curstat_print_value(session, cst->v, &cst->pv)); F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT); -err: API_END_RET(session, ret); + if (0) { +err: F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + } + API_END_RET(session, ret); } /* @@ -301,7 +310,7 @@ __curstat_search(WT_CURSOR *cursor) /* Initialize on demand. */ if (cst->notinitialized) { WT_ERR(__wt_curstat_init( - session, cursor->internal_uri, cst->cfg, cst)); + session, cursor->internal_uri, NULL, cst->cfg, cst)); cst->notinitialized = false; } @@ -332,6 +341,7 @@ __curstat_close(WT_CURSOR *cursor) __curstat_free_config(session, cst); __wt_buf_free(session, &cst->pv); + __wt_free(session, cst->desc_buf); WT_ERR(__wt_cursor_close(cursor)); @@ -426,12 +436,102 @@ __wt_curstat_dsrc_final(WT_CURSOR_STAT *cst) } /* + * __curstat_join_next_set -- + * Advance to another index used in a join to give another set of + * statistics. + */ +static int +__curstat_join_next_set(WT_SESSION_IMPL *session, WT_CURSOR_STAT *cst, + bool forw, bool init) +{ + WT_CURSOR_JOIN *cjoin; + WT_JOIN_STATS_GROUP *join_group; + ssize_t pos; + + WT_ASSERT(session, WT_STREQ(cst->iface.uri, "statistics:join")); + join_group = &cst->u.join_stats_group; + cjoin = join_group->join_cursor; + if (init) + pos = forw ? 0 : cjoin->entries_next - 1; + else + pos = join_group->join_cursor_entry + (forw ? 1 : -1); + if (pos < 0 || (size_t)pos >= cjoin->entries_next) + return (WT_NOTFOUND); + + join_group->join_cursor_entry = pos; + if (cjoin->entries[pos].index == NULL) { + WT_ASSERT(session, WT_PREFIX_MATCH(cjoin->iface.uri, "join:")); + join_group->desc_prefix = cjoin->iface.uri + 5; + } else + join_group->desc_prefix = cjoin->entries[pos].index->name; + join_group->join_stats = cjoin->entries[pos].stats; + if (!init) + cst->key = forw ? WT_STAT_KEY_MIN(cst) : WT_STAT_KEY_MAX(cst); + return (0); +} + +/* + * __curstat_join_desc -- + * Assemble the description field based on current index and statistic. + */ +static int +__curstat_join_desc(WT_CURSOR_STAT *cst, int slot, const char **resultp) +{ + size_t len; + const char *static_desc; + WT_JOIN_STATS_GROUP *sgrp; + WT_SESSION_IMPL *session; + + sgrp = &cst->u.join_stats_group; + session = (WT_SESSION_IMPL *)sgrp->join_cursor->iface.session; + WT_RET(__wt_stat_join_desc(cst, slot, &static_desc)); + len = strlen("join: ") + strlen(sgrp->desc_prefix) + + strlen(static_desc) + 1; + WT_RET(__wt_realloc(session, NULL, len, &cst->desc_buf)); + snprintf(cst->desc_buf, len, "join: %s%s", sgrp->desc_prefix, + static_desc); + *resultp = cst->desc_buf; + return (0); +} + +/* + * __curstat_join_init -- + * Initialize the statistics for a joined cursor. + */ +static int +__curstat_join_init(WT_SESSION_IMPL *session, + WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst) +{ + WT_CURSOR_JOIN *cjoin; + WT_DECL_RET; + + WT_UNUSED(cfg); + + if (curjoin == NULL && cst->u.join_stats_group.join_cursor != NULL) + curjoin = &cst->u.join_stats_group.join_cursor->iface; + if (curjoin == NULL || !WT_PREFIX_MATCH(curjoin->uri, "join:")) + WT_ERR_MSG(session, EINVAL, + "join cursor must be used with statistics:join"); + cjoin = (WT_CURSOR_JOIN *)curjoin; + memset(&cst->u.join_stats_group, 0, sizeof(WT_JOIN_STATS_GROUP)); + cst->u.join_stats_group.join_cursor = cjoin; + + cst->stats = (int64_t *)&cst->u.join_stats_group.join_stats; + cst->stats_base = WT_JOIN_STATS_BASE; + cst->stats_count = sizeof(WT_JOIN_STATS) / sizeof(int64_t); + cst->stats_desc = __curstat_join_desc; + cst->next_set = __curstat_join_next_set; + +err: return (ret); +} + +/* * __wt_curstat_init -- * Initialize a statistics cursor. */ int __wt_curstat_init(WT_SESSION_IMPL *session, - const char *uri, const char *cfg[], WT_CURSOR_STAT *cst) + const char *uri, WT_CURSOR *curjoin, const char *cfg[], WT_CURSOR_STAT *cst) { const char *dsrc_uri; @@ -442,6 +542,10 @@ __wt_curstat_init(WT_SESSION_IMPL *session, dsrc_uri = uri + strlen("statistics:"); + if (WT_STREQ(dsrc_uri, "join")) + return ( + __curstat_join_init(session, curjoin, cfg, cst)); + if (WT_PREFIX_MATCH(dsrc_uri, "colgroup:")) return ( __wt_curstat_colgroup_init(session, dsrc_uri, cfg, cst)); @@ -467,7 +571,7 @@ __wt_curstat_init(WT_SESSION_IMPL *session, */ int __wt_curstat_open(WT_SESSION_IMPL *session, - const char *uri, const char *cfg[], WT_CURSOR **cursorp) + const char *uri, WT_CURSOR *other, const char *cfg[], WT_CURSOR **cursorp) { WT_CONNECTION_IMPL *conn; WT_CURSOR_STATIC_INIT(iface, @@ -581,7 +685,7 @@ __wt_curstat_open(WT_SESSION_IMPL *session, * objects like tables, we need to a valid set of statistics when before * the open returns. */ - WT_ERR(__wt_curstat_init(session, uri, cst->cfg, cst)); + WT_ERR(__wt_curstat_init(session, uri, other, cst->cfg, cst)); cst->notinitialized = false; /* The cursor isn't yet positioned. */ diff --git a/src/cursor/cur_table.c b/src/cursor/cur_table.c index c5a00649334..dca72a16ee5 100644 --- a/src/cursor/cur_table.c +++ b/src/cursor/cur_table.c @@ -186,34 +186,16 @@ __wt_curtable_get_key(WT_CURSOR *cursor, ...) int __wt_curtable_get_value(WT_CURSOR *cursor, ...) { - WT_CURSOR *primary; - WT_CURSOR_TABLE *ctable; WT_DECL_RET; - WT_ITEM *item; WT_SESSION_IMPL *session; va_list ap; - ctable = (WT_CURSOR_TABLE *)cursor; - primary = *ctable->cg_cursors; - CURSOR_API_CALL(cursor, session, get_value, NULL); - WT_CURSOR_NEEDVALUE(primary); - va_start(ap, cursor); - if (F_ISSET(cursor, WT_CURSOR_RAW_OK)) { - ret = __wt_schema_project_merge(session, - ctable->cg_cursors, ctable->plan, - cursor->value_format, &cursor->value); - if (ret == 0) { - item = va_arg(ap, WT_ITEM *); - item->data = cursor->value.data; - item->size = cursor->value.size; - } - } else - ret = __wt_schema_project_out(session, - ctable->cg_cursors, ctable->plan, ap); - va_end(ap); + JOINABLE_CURSOR_API_CALL(cursor, session, get_value, NULL); + WT_ERR(__wt_curtable_get_valuev(cursor, ap)); -err: API_END_RET(session, ret); +err: va_end(ap); + API_END_RET(session, ret); } /* @@ -264,7 +246,7 @@ __wt_curtable_set_value(WT_CURSOR *cursor, ...) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, set_value, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, set_value, NULL); va_start(ap, cursor); if (F_ISSET(cursor, WT_CURSOR_RAW_OK | WT_CURSTD_DUMP_JSON)) { @@ -332,7 +314,7 @@ __curtable_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp) WT_DECL_RET; WT_SESSION_IMPL *session; - CURSOR_API_CALL(a, session, compare, NULL); + JOINABLE_CURSOR_API_CALL(a, session, compare, NULL); /* * Confirm both cursors refer to the same source and have keys, then @@ -362,7 +344,7 @@ __curtable_next(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); APPLY_CG(ctable, next); err: API_END_RET(session, ret); @@ -383,7 +365,7 @@ __curtable_next_random(WT_CURSOR *cursor) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, next, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, next, NULL); cp = ctable->cg_cursors; /* Split out the first next, it retrieves the random record. */ @@ -414,7 +396,7 @@ __curtable_prev(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, prev, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, prev, NULL); APPLY_CG(ctable, prev); err: API_END_RET(session, ret); @@ -432,7 +414,7 @@ __curtable_reset(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, reset, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, reset, NULL); APPLY_CG(ctable, reset); err: API_END_RET(session, ret); @@ -450,7 +432,7 @@ __curtable_search(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, search, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search, NULL); APPLY_CG(ctable, search); err: API_END_RET(session, ret); @@ -470,7 +452,7 @@ __curtable_search_near(WT_CURSOR *cursor, int *exact) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, search_near, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, search_near, NULL); cp = ctable->cg_cursors; primary = *cp; WT_ERR(primary->search_near(primary, exact)); @@ -501,7 +483,7 @@ __curtable_insert(WT_CURSOR *cursor) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL); + JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, insert, NULL); WT_ERR(__curtable_open_indices(ctable)); /* @@ -568,7 +550,7 @@ __curtable_update(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_UPDATE_API_CALL(cursor, session, update, NULL); + JOINABLE_CURSOR_UPDATE_API_CALL(cursor, session, update, NULL); WT_ERR(__curtable_open_indices(ctable)); /* @@ -619,7 +601,7 @@ __curtable_remove(WT_CURSOR *cursor) WT_SESSION_IMPL *session; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_REMOVE_API_CALL(cursor, session, NULL); + JOINABLE_CURSOR_REMOVE_API_CALL(cursor, session, NULL); WT_ERR(__curtable_open_indices(ctable)); /* Find the old record so it can be removed from indices */ @@ -659,6 +641,7 @@ __wt_table_range_truncate(WT_CURSOR_TABLE *start, WT_CURSOR_TABLE *stop) /* Open any indices. */ WT_RET(__curtable_open_indices(ctable)); WT_RET(__wt_scr_alloc(session, 128, &key)); + WT_STAT_FAST_DATA_INCR(session, cursor_truncate); /* * Step through the cursor range, removing the index entries. @@ -730,7 +713,7 @@ __curtable_close(WT_CURSOR *cursor) u_int i; ctable = (WT_CURSOR_TABLE *)cursor; - CURSOR_API_CALL(cursor, session, close, NULL); + JOINABLE_CURSOR_API_CALL(cursor, session, close, NULL); if (ctable->cg_cursors != NULL) for (i = 0, cp = ctable->cg_cursors; @@ -853,7 +836,7 @@ __curtable_open_indices(WT_CURSOR_TABLE *ctable) */ int __wt_curtable_open(WT_SESSION_IMPL *session, - const char *uri, const char *cfg[], WT_CURSOR **cursorp) + const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) { WT_CURSOR_STATIC_INIT(iface, __wt_curtable_get_key, /* get-key */ @@ -944,7 +927,7 @@ __wt_curtable_open(WT_SESSION_IMPL *session, } WT_ERR(__wt_cursor_init( - cursor, cursor->internal_uri, NULL, cfg, cursorp)); + cursor, cursor->internal_uri, owner, cfg, cursorp)); if (F_ISSET(cursor, WT_CURSTD_DUMP_JSON)) WT_ERR(__wt_json_column_init(cursor, table->key_format, |