diff options
18 files changed, 1435 insertions, 37 deletions
diff --git a/src/third_party/wiredtiger/dist/api_data.py b/src/third_party/wiredtiger/dist/api_data.py index 8f9dacf4aeb..026ca22efd9 100644 --- a/src/third_party/wiredtiger/dist/api_data.py +++ b/src/third_party/wiredtiger/dist/api_data.py @@ -1493,10 +1493,14 @@ methods = { configure debug specific behavior on a cursor. Generally only used for internal testing purposes''', type='category', subconfig=[ + Config('dump_version', 'false', r''' + open a version cursor, which is a debug cursor on a table that + enables iteration through the history of values for a given key.''', + type='boolean'), Config('release_evict', 'false', r''' Configure the cursor to evict the page positioned on when the reset API is used''', - type='boolean') + type='boolean'), ]), Config('dump', '', r''' configure the cursor for dump format inputs and outputs: "hex" diff --git a/src/third_party/wiredtiger/dist/filelist b/src/third_party/wiredtiger/dist/filelist index a37d0e116e0..552f0f1b5f7 100644 --- a/src/third_party/wiredtiger/dist/filelist +++ b/src/third_party/wiredtiger/dist/filelist @@ -94,6 +94,7 @@ src/cursor/cur_metadata.c src/cursor/cur_stat.c src/cursor/cur_std.c src/cursor/cur_table.c +src/cursor/cur_version.c src/evict/evict_file.c src/evict/evict_lru.c src/evict/evict_page.c diff --git a/src/third_party/wiredtiger/dist/s_string.ok b/src/third_party/wiredtiger/dist/s_string.ok index c517aa093e8..3b1c4d9cfcb 100644 --- a/src/third_party/wiredtiger/dist/s_string.ok +++ b/src/third_party/wiredtiger/dist/s_string.ok @@ -689,6 +689,7 @@ cursorp curstat curtable curtiered +curversion cust customp cv diff --git a/src/third_party/wiredtiger/examples/c/ex_cursor.c b/src/third_party/wiredtiger/examples/c/ex_cursor.c index 14b8e866e8f..5957582ebdf 100644 --- a/src/third_party/wiredtiger/examples/c/ex_cursor.c +++ b/src/third_party/wiredtiger/examples/c/ex_cursor.c @@ -39,6 +39,7 @@ int cursor_insert(WT_CURSOR *cursor); int cursor_update(WT_CURSOR *cursor); int cursor_remove(WT_CURSOR 
*cursor); int cursor_largest_key(WT_CURSOR *cursor); +int version_cursor_dump(WT_CURSOR *cursor); static const char *home; @@ -164,6 +165,23 @@ cursor_largest_key(WT_CURSOR *cursor) } /*! [cursor largest key] */ +/*! [version cursor dump] */ +int +version_cursor_dump(WT_CURSOR *cursor) +{ + wt_timestamp_t start_ts, start_durable_ts, stop_ts, stop_durable_ts; + uint64_t start_txnid, stop_txnid; + uint8_t flags, location, prepare, type; + const char *value; + cursor->set_key(cursor, "foo"); + error_check(cursor->search(cursor)); + error_check(cursor->get_value(cursor, &start_txnid, &start_ts, &start_durable_ts, &stop_txnid, + &stop_ts, &stop_durable_ts, &type, &prepare, &flags, &location, &value)); + + return (0); +} +/*! [version cursor dump] */ + int main(int argc, char *argv[]) { @@ -206,9 +224,17 @@ main(int argc, char *argv[]) error_check(cursor_search_near(cursor)); error_check(cursor_update(cursor)); error_check(cursor_remove(cursor)); + error_check(cursor_insert(cursor)); error_check(cursor_largest_key(cursor)); error_check(cursor->close(cursor)); + /* Create a version cursor. */ + error_check(session->begin_transaction(session, "read_timestamp=1")); + error_check( + session->open_cursor(session, "file:map.wt", NULL, "debug=(dump_version=true)", &cursor)); + error_check(version_cursor_dump(cursor)); + error_check(cursor->close(cursor)); + /* Note: closing the connection implicitly closes open session(s). 
*/ error_check(conn->close(conn, NULL)); diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 2437574408b..1fa8bd259f9 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-5.0", - "commit": "ad583b158e9ccb035d29c1efaa7bce4c6907851b" + "commit": "8dc28197e96983d6441dbefdc679312b01c7dfae" } diff --git a/src/third_party/wiredtiger/lang/python/wiredtiger.i b/src/third_party/wiredtiger/lang/python/wiredtiger.i index 57ba8eac321..11d64fa4588 100644 --- a/src/third_party/wiredtiger/lang/python/wiredtiger.i +++ b/src/third_party/wiredtiger/lang/python/wiredtiger.i @@ -119,15 +119,18 @@ from packing import pack, unpack SWIGTYPE_p___wt_cursor, 0); if (*$1 != NULL) { PY_CALLBACK *pcb; - uint32_t json; + uint32_t json, version_cursor; json = (*$1)->flags & WT_CURSTD_DUMP_JSON; + version_cursor = (*$1)->flags & WT_CURSTD_VERSION_CURSOR; if (!json) (*$1)->flags |= WT_CURSTD_RAW; PyObject_SetAttrString($result, "is_json", PyBool_FromLong(json != 0)); PyObject_SetAttrString($result, "is_column", PyBool_FromLong(strcmp((*$1)->key_format, "r") == 0)); + PyObject_SetAttrString($result, "is_version_cursor", + PyBool_FromLong(version_cursor != 0)); PyObject_SetAttrString($result, "key_format", PyString_InternFromString((*$1)->key_format)); PyObject_SetAttrString($result, "value_format", @@ -630,7 +633,9 @@ OVERRIDE_METHOD(__wt_cursor, WT_CURSOR, search_near, (self)) /* Handle binary data returns from get_key/value -- avoid cstring.i: it creates a list of returns. 
*/ %typemap(in,numinputs=0) (char **datap, int *sizep) (char *data, int size) { $1 = &data; $2 = &size; } %typemap(in,numinputs=0) (char **charp, int *sizep) (char *data, int size) { $1 = &data; $2 = &size; } +%typemap(in,numinputs=0) (char **metadatap, int *metadatasizep, char **datap, int *datasizep) (char *metadata, int metadatasize, char *data, int datasize) { $1 = &metadata; $2 = &metadatasize; $3 = &data; $4 = &datasize; } %typemap(frearg) (char **datap, int *sizep) ""; +%typemap(frearg) (char **metadatap, int *metadatasizep, char **datap, int *datasizep) ""; %typemap(argout) (char **charp, int *sizep) { if (*$1) $result = PyUnicode_FromStringAndSize(*$1, *$2); @@ -639,7 +644,17 @@ OVERRIDE_METHOD(__wt_cursor, WT_CURSOR, search_near, (self)) %typemap(argout) (char **datap, int *sizep) { if (*$1) $result = PyBytes_FromStringAndSize(*$1, *$2); - } +} + +%typemap(argout)(char **metadatap, int *metadatasizep, char **datap, int *datasizep) ( + PyObject *metadata, PyObject *data) { + if (*$1 && *$3) { + metadata = PyBytes_FromStringAndSize(*$1, *$2); + $result = metadata; + data = PyBytes_FromStringAndSize(*$3, *$4); + $result = SWIG_Python_AppendOutput($result, data); + } +} /* Handle binary data input from FILE_HANDLE->fh_write. */ %typemap(in,numinputs=1) (size_t length, const void *buf) (Py_ssize_t length, const void *buf = NULL) { @@ -770,6 +785,19 @@ typedef int int_void; return (ret); } + int_void _get_version_cursor_value(char **metadatap, int *metadatasizep, char **datap, int *datasizep) { + WT_ITEM metadata; + WT_ITEM v; + int ret = $self->get_value($self, &metadata, &v); + if (ret == 0) { + *metadatap = (char *)metadata.data; + *metadatasizep = (int)metadata.size; + *datap = (char *)v.data; + *datasizep = (int)v.size; + } + return (ret); + } + /* compare: special handling. 
*/ int _compare(WT_CURSOR *other) { int cmp = 0; @@ -875,6 +903,11 @@ typedef int int_void; @copydoc WT_CURSOR::get_value''' if self.is_json: return [self._get_json_value()] + elif self.is_version_cursor: + result = self._get_version_cursor_value() + metadata = unpack("QQQQQQBBBB", result[0]) + data = unpack(self.value_format[10:], result[1]) + return metadata + data else: return unpack(self.value_format, self._get_value()) diff --git a/src/third_party/wiredtiger/src/btree/bt_cursor.c b/src/third_party/wiredtiger/src/btree/bt_cursor.c index ecaf3bc74a7..0a36ca12a8a 100644 --- a/src/third_party/wiredtiger/src/btree/bt_cursor.c +++ b/src/third_party/wiredtiger/src/btree/bt_cursor.c @@ -569,12 +569,20 @@ __wt_btcur_search(WT_CURSOR_BTREE *cbt) if (btree->type == BTREE_ROW) { WT_ERR(__cursor_row_search(cbt, false, cbt->ref, &leaf_found)); - if (leaf_found && cbt->compare == 0) - WT_ERR(__wt_cursor_valid(cbt, cbt->tmp, WT_RECNO_OOB, &valid)); + if (leaf_found && cbt->compare == 0) { + if (F_ISSET(cursor, WT_CURSTD_KEY_ONLY)) + valid = true; + else + WT_ERR(__wt_cursor_valid(cbt, cbt->tmp, WT_RECNO_OOB, &valid)); + } } else { WT_ERR(__cursor_col_search(cbt, cbt->ref, &leaf_found)); - if (leaf_found && cbt->compare == 0) - WT_ERR(__wt_cursor_valid(cbt, NULL, cbt->recno, &valid)); + if (leaf_found && cbt->compare == 0) { + if (F_ISSET(cursor, WT_CURSTD_KEY_ONLY)) + valid = true; + else + WT_ERR(__wt_cursor_valid(cbt, NULL, cbt->recno, &valid)); + } } } if (!valid) { @@ -582,17 +590,26 @@ __wt_btcur_search(WT_CURSOR_BTREE *cbt) if (btree->type == BTREE_ROW) { WT_ERR(__cursor_row_search(cbt, false, NULL, NULL)); - if (cbt->compare == 0) - WT_ERR(__wt_cursor_valid(cbt, cbt->tmp, WT_RECNO_OOB, &valid)); + if (cbt->compare == 0) { + if (F_ISSET(cursor, WT_CURSTD_KEY_ONLY)) + valid = true; + else + WT_ERR(__wt_cursor_valid(cbt, cbt->tmp, WT_RECNO_OOB, &valid)); + } } else { WT_ERR(__cursor_col_search(cbt, NULL, NULL)); - if (cbt->compare == 0) - WT_ERR(__wt_cursor_valid(cbt, 
NULL, cbt->recno, &valid)); + if (cbt->compare == 0) { + if (F_ISSET(cursor, WT_CURSTD_KEY_ONLY)) + valid = true; + else + WT_ERR(__wt_cursor_valid(cbt, NULL, cbt->recno, &valid)); + } } } if (valid) - ret = __cursor_kv_return(cbt, cbt->upd_value); + ret = F_ISSET(cursor, WT_CURSTD_KEY_ONLY) ? __wt_key_return(cbt) : + __cursor_kv_return(cbt, cbt->upd_value); else if (__cursor_fix_implicit(btree, cbt)) { /* * Creating a record past the end of the tree in a fixed-length column-store implicitly diff --git a/src/third_party/wiredtiger/src/config/config_def.c b/src/third_party/wiredtiger/src/config/config_def.c index 272ff43f42a..936b6de4a43 100644 --- a/src/third_party/wiredtiger/src/config/config_def.c +++ b/src/third_party/wiredtiger/src/config/config_def.c @@ -362,6 +362,7 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_log_flush[] = { {"sync", "string", NULL, "choices=[\"off\",\"on\"]", NULL, 0}, {NULL, NULL, NULL, NULL, NULL, 0}}; static const WT_CONFIG_CHECK confchk_WT_SESSION_open_cursor_debug_subconfigs[] = { + {"dump_version", "boolean", NULL, NULL, NULL, 0}, {"release_evict", "boolean", NULL, NULL, NULL, 0}, {NULL, NULL, NULL, NULL, NULL, 0}}; static const WT_CONFIG_CHECK confchk_WT_SESSION_open_cursor_incremental_subconfigs[] = { @@ -375,7 +376,7 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_open_cursor[] = { {"append", "boolean", NULL, NULL, NULL, 0}, {"bulk", "string", NULL, NULL, NULL, 0}, {"checkpoint", "string", NULL, NULL, NULL, 0}, {"checkpoint_wait", "boolean", NULL, NULL, NULL, 0}, - {"debug", "category", NULL, NULL, confchk_WT_SESSION_open_cursor_debug_subconfigs, 1}, + {"debug", "category", NULL, NULL, confchk_WT_SESSION_open_cursor_debug_subconfigs, 2}, {"dump", "string", NULL, "choices=[\"hex\",\"json\",\"pretty\",\"pretty_hex\"," "\"print\"]", @@ -1266,11 +1267,12 @@ static const WT_CONFIG_ENTRY config_entries[] = {{"WT_CONNECTION.add_collator", {"WT_SESSION.log_printf", "", NULL, 0}, {"WT_SESSION.open_cursor", 
"append=false,bulk=false,checkpoint=,checkpoint_wait=true," - "debug=(release_evict=false),dump=,incremental=(consolidate=false" - ",enabled=false,file=,force_stop=false,granularity=16MB,src_id=," - "this_id=),next_random=false,next_random_sample_size=0," - "overwrite=true,prefix_search=false,raw=false,read_once=false," - "readonly=false,skip_sort_check=false,statistics=,target=", + "debug=(dump_version=false,release_evict=false),dump=," + "incremental=(consolidate=false,enabled=false,file=," + "force_stop=false,granularity=16MB,src_id=,this_id=)," + "next_random=false,next_random_sample_size=0,overwrite=true," + "prefix_search=false,raw=false,read_once=false,readonly=false," + "skip_sort_check=false,statistics=,target=", confchk_WT_SESSION_open_cursor, 17}, {"WT_SESSION.prepare_transaction", "prepare_timestamp=", confchk_WT_SESSION_prepare_transaction, 1}, diff --git a/src/third_party/wiredtiger/src/cursor/cur_hs.c b/src/third_party/wiredtiger/src/cursor/cur_hs.c index c96010613dd..e2375f9c997 100644 --- a/src/third_party/wiredtiger/src/cursor/cur_hs.c +++ b/src/third_party/wiredtiger/src/cursor/cur_hs.c @@ -9,7 +9,7 @@ #include "wt_internal.h" static int __curhs_file_cursor_next(WT_SESSION_IMPL *, WT_CURSOR *); -static int __curhs_file_cursor_open(WT_SESSION_IMPL *, WT_CURSOR **); +static int __curhs_file_cursor_open(WT_SESSION_IMPL *, WT_CURSOR *, WT_CURSOR **); static int __curhs_file_cursor_prev(WT_SESSION_IMPL *, WT_CURSOR *); static int __curhs_file_cursor_search_near(WT_SESSION_IMPL *, WT_CURSOR *, int *); static int __curhs_prev_visible(WT_SESSION_IMPL *, WT_CURSOR_HS *); @@ -20,14 +20,14 @@ static int __curhs_search_near_helper(WT_SESSION_IMPL *, WT_CURSOR *, bool); * Open a new history store table cursor, internal function. 
*/ static int -__curhs_file_cursor_open(WT_SESSION_IMPL *session, WT_CURSOR **cursorp) +__curhs_file_cursor_open(WT_SESSION_IMPL *session, WT_CURSOR *owner, WT_CURSOR **cursorp) { WT_CURSOR *cursor; WT_DECL_RET; const char *open_cursor_cfg[] = {WT_CONFIG_BASE(session, WT_SESSION_open_cursor), NULL}; WT_WITHOUT_DHANDLE( - session, ret = __wt_open_cursor(session, WT_HS_URI, NULL, open_cursor_cfg, &cursor)); + session, ret = __wt_open_cursor(session, WT_HS_URI, owner, open_cursor_cfg, &cursor)); WT_RET(ret); /* History store cursors should always ignore tombstones. */ @@ -74,7 +74,7 @@ __wt_curhs_cache(WT_SESSION_IMPL *session) (session->dhandle != NULL && WT_IS_METADATA(S2BT(session)->dhandle)) || session == conn->default_session) return (0); - WT_RET(__curhs_file_cursor_open(session, &cursor)); + WT_RET(__curhs_file_cursor_open(session, NULL, &cursor)); WT_RET(cursor->close(cursor)); return (0); } @@ -1125,7 +1125,7 @@ __wt_curhs_open(WT_SESSION_IMPL *session, WT_CURSOR *owner, WT_CURSOR **cursorp) WT_ERR(__wt_strdup(session, WT_HS_URI, &cursor->uri)); /* Open the file cursor for operations on the regular history store .*/ - WT_ERR(__curhs_file_cursor_open(session, &hs_cursor->file_cursor)); + WT_ERR(__curhs_file_cursor_open(session, owner, &hs_cursor->file_cursor)); WT_WITH_BTREE(session, CUR2BT(hs_cursor->file_cursor), ret = __wt_cursor_init(cursor, WT_HS_URI, owner, NULL, cursorp)); diff --git a/src/third_party/wiredtiger/src/cursor/cur_std.c b/src/third_party/wiredtiger/src/cursor/cur_std.c index 32e9973caaa..8858bca2b9a 100644 --- a/src/third_party/wiredtiger/src/cursor/cur_std.c +++ b/src/third_party/wiredtiger/src/cursor/cur_std.c @@ -557,7 +557,7 @@ __wt_cursor_set_value(WT_CURSOR *cursor, ...) va_list ap; va_start(ap, cursor); - WT_IGNORE_RET(__wt_cursor_set_valuev(cursor, ap)); + WT_IGNORE_RET(__wt_cursor_set_valuev(cursor, cursor->value_format, ap)); va_end(ap); } @@ -566,13 +566,13 @@ __wt_cursor_set_value(WT_CURSOR *cursor, ...) 
* WT_CURSOR->set_value worker implementation. */ int -__wt_cursor_set_valuev(WT_CURSOR *cursor, va_list ap) +__wt_cursor_set_valuev(WT_CURSOR *cursor, const char *fmt, va_list ap) { WT_DECL_RET; WT_ITEM *buf, *item, tmp; WT_SESSION_IMPL *session; size_t sz; - const char *fmt, *str; + const char *str; va_list ap_copy; buf = &cursor->value; @@ -589,7 +589,6 @@ __wt_cursor_set_valuev(WT_CURSOR *cursor, va_list ap) F_CLR(cursor, WT_CURSTD_VALUE_SET); /* Fast path some common cases. */ - fmt = cursor->value_format; if (F_ISSET(cursor, WT_CURSOR_RAW_OK | WT_CURSTD_DUMP_JSON) || WT_STREQ(fmt, "u")) { item = va_arg(ap, WT_ITEM *); sz = item->size; @@ -604,11 +603,11 @@ __wt_cursor_set_valuev(WT_CURSOR *cursor, va_list ap) *(uint8_t *)buf->mem = (uint8_t)va_arg(ap, int); } else { va_copy(ap_copy, ap); - ret = __wt_struct_sizev(session, &sz, cursor->value_format, ap_copy); + ret = __wt_struct_sizev(session, &sz, fmt, ap_copy); va_end(ap_copy); WT_ERR(ret); WT_ERR(__wt_buf_initsize(session, buf, sz)); - WT_ERR(__wt_struct_packv(session, buf->mem, sz, cursor->value_format, ap)); + WT_ERR(__wt_struct_packv(session, buf->mem, sz, fmt, ap)); } F_SET(cursor, WT_CURSTD_VALUE_EXT); buf->size = sz; diff --git a/src/third_party/wiredtiger/src/cursor/cur_version.c b/src/third_party/wiredtiger/src/cursor/cur_version.c new file mode 100644 index 00000000000..28d6af0e9b2 --- /dev/null +++ b/src/third_party/wiredtiger/src/cursor/cur_version.c @@ -0,0 +1,676 @@ +/*- + * Copyright (c) 2014-present MongoDB, Inc. + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +#define WT_CURVERSION_METADATA_FORMAT WT_UNCHECKED_STRING(QQQQQQBBBB) +/* + * __curversion_set_key -- + * WT_CURSOR->set_key implementation for version cursors. + */ +static void +__curversion_set_key(WT_CURSOR *cursor, ...) 
+{ + WT_CURSOR *file_cursor; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + WT_SESSION_IMPL *session; + uint32_t flags; + va_list ap; + + session = CUR2S(cursor); + + /* Reset the cursor every time for a new key. */ + if ((ret = cursor->reset(cursor)) != 0) + WT_IGNORE_RET(__wt_panic(session, ret, "failed to reset cursor")); + + version_cursor = (WT_CURSOR_VERSION *)cursor; + file_cursor = version_cursor->file_cursor; + va_start(ap, cursor); + flags = file_cursor->flags; + /* Pass on the raw flag. */ + if (F_ISSET(cursor, WT_CURSTD_RAW)) + LF_SET(WT_CURSTD_RAW); + if ((ret = __wt_cursor_set_keyv(file_cursor, flags, ap)) != 0) + WT_IGNORE_RET(__wt_panic(session, ret, "failed to set key")); + va_end(ap); +} + +/* + * __curversion_get_key -- + * WT_CURSOR->get_key implementation for version cursors. + */ +static int +__curversion_get_key(WT_CURSOR *cursor, ...) +{ + WT_CURSOR *file_cursor; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + uint32_t flags; + va_list ap; + + version_cursor = (WT_CURSOR_VERSION *)cursor; + file_cursor = version_cursor->file_cursor; + va_start(ap, cursor); + flags = file_cursor->flags; + /* Pass on the raw flag. */ + if (F_ISSET(cursor, WT_CURSTD_RAW)) + flags |= WT_CURSTD_RAW; + WT_ERR(__wt_cursor_get_keyv(file_cursor, flags, ap)); + +err: + va_end(ap); + return (ret); +} + +/* + * __curversion_get_value -- + * WT_CURSOR->get_value implementation for version cursors. + */ +static int +__curversion_get_value(WT_CURSOR *cursor, ...) 
+{ + WT_CURSOR *file_cursor; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_ITEM(data); + WT_DECL_ITEM(metadata); + WT_DECL_PACK_VALUE(pv); + WT_DECL_RET; + WT_PACK pack; + WT_SESSION_IMPL *session; + const uint8_t *p, *end; + va_list ap; + + version_cursor = (WT_CURSOR_VERSION *)cursor; + file_cursor = version_cursor->file_cursor; + + CURSOR_API_CALL(cursor, session, get_value, NULL); + WT_ERR(__cursor_checkvalue(cursor)); + WT_ERR(__cursor_checkvalue(file_cursor)); + + va_start(ap, cursor); + if (F_ISSET(cursor, WT_CURSTD_RAW)) { + /* Extract metadata and value separately as raw data. */ + metadata = va_arg(ap, WT_ITEM *); + metadata->data = cursor->value.data; + metadata->size = cursor->value.size; + data = va_arg(ap, WT_ITEM *); + data->data = file_cursor->value.data; + data->size = file_cursor->value.size; + } else { + /* + * Unpack the metadata. We cannot use the standard get value function here because variable + * arguments cannot be partially extracted by different function calls. + */ + WT_ASSERT(session, cursor->value.data != NULL); + p = (uint8_t *)cursor->value.data; + end = p + cursor->value.size; + + WT_ERR(__pack_init(session, &pack, WT_CURVERSION_METADATA_FORMAT)); + while ((ret = __pack_next(&pack, &pv)) == 0) { + WT_ERR(__unpack_read(session, &pv, &p, (size_t)(end - p))); + WT_UNPACK_PUT(session, pv, ap); + } + WT_ERR_NOTFOUND_OK(ret, false); + + WT_ASSERT(session, p <= end); + WT_ERR(__wt_cursor_get_valuev(file_cursor, ap)); + } + +err: + va_end(ap); + API_END_RET(session, ret); +} + +/* + * __curversion_set_value_with_format -- + * Set version cursor value with the given format. + */ +static int +__curversion_set_value_with_format(WT_CURSOR *cursor, const char *fmt, ...) +{ + WT_DECL_RET; + va_list ap; + + va_start(ap, fmt); + ret = __wt_cursor_set_valuev(cursor, fmt, ap); + va_end(ap); + + return (ret); +} + +/* + * __curversion_next_int -- + * Internal implementation for version cursor next api. 
+ */ +static int +__curversion_next_int(WT_CURSOR *cursor) +{ + WT_CURSOR *hs_cursor, *file_cursor; + WT_CURSOR_BTREE *cbt; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_ITEM(hs_value); + WT_DECL_ITEM(key); + WT_DECL_RET; + WT_PAGE *page; + WT_SESSION_IMPL *session; + WT_TIME_WINDOW *twp; + WT_UPDATE *first, *next_upd, *upd, *tombstone; + wt_timestamp_t durable_start_ts, durable_stop_ts, stop_ts; + uint64_t stop_txn, hs_upd_type; + uint32_t raw; + uint8_t *p, version_prepare_state; + bool upd_found; + + session = CUR2S(cursor); + version_cursor = (WT_CURSOR_VERSION *)cursor; + hs_cursor = version_cursor->hs_cursor; + file_cursor = version_cursor->file_cursor; + cbt = (WT_CURSOR_BTREE *)file_cursor; + page = cbt->ref->page; + twp = NULL; + upd_found = false; + first = upd = tombstone = NULL; + + /* Temporarily clear the raw flag. We need to pack the data according to the format. */ + raw = F_MASK(cursor, WT_CURSTD_RAW); + F_CLR(cursor, WT_CURSTD_RAW); + + /* The cursor should be positioned, otherwise the next call will fail. */ + if (!F_ISSET(file_cursor, WT_CURSTD_KEY_INT)) + WT_ERR(__wt_txn_rollback_required( + session, "rolling back version_cursor->next due to no initial position")); + + if (!F_ISSET(version_cursor, WT_CURVERSION_UPDATE_EXHAUSTED)) { + upd = version_cursor->next_upd; + + if (upd == NULL) { + version_cursor->next_upd = NULL; + F_SET(version_cursor, WT_CURVERSION_UPDATE_EXHAUSTED); + } else { + if (upd->type == WT_UPDATE_TOMBSTONE) { + tombstone = upd; + + /* + * If the update is a tombstone, we still want to record the stop information but we + * also need traverse to the next update to get the full value. If the tombstone was + * the last update in the update list, retrieve the ondisk value. + */ + version_cursor->upd_stop_txnid = upd->txnid; + version_cursor->upd_durable_stop_ts = upd->durable_ts; + version_cursor->upd_stop_ts = upd->start_ts; + + /* No need to check the next update if the tombstone is globally visible. 
*/ + if (__wt_txn_upd_visible_all(session, upd)) + upd = NULL; + else + upd = upd->next; + + /* Make sure the next update is not an aborted update. */ + while (upd != NULL && upd->txnid == WT_TXN_ABORTED) + upd = upd->next; + } + + if (upd == NULL) { + version_cursor->next_upd = NULL; + F_SET(version_cursor, WT_CURVERSION_UPDATE_EXHAUSTED); + } else { + if (upd->prepare_state == WT_PREPARE_INPROGRESS || + upd->prepare_state == WT_PREPARE_LOCKED) + version_prepare_state = 1; + else + version_prepare_state = 0; + + /* + * Copy the update value into the version cursor as we don't know the value format. + * If the update is a modify, reconstruct the value. + */ + if (upd->type != WT_UPDATE_MODIFY) + __wt_upd_value_assign(cbt->upd_value, upd); + else + WT_ERR( + __wt_modify_reconstruct_from_upd_list(session, cbt, upd, cbt->upd_value)); + + /* + * Set the version cursor's value, which also contains all the record metadata for + * that particular version of the update. + */ + WT_ERR(__curversion_set_value_with_format(cursor, WT_CURVERSION_METADATA_FORMAT, + upd->txnid, upd->start_ts, upd->durable_ts, version_cursor->upd_stop_txnid, + version_cursor->upd_stop_ts, version_cursor->upd_durable_stop_ts, upd->type, + version_prepare_state, upd->flags, WT_CURVERSION_UPDATE_CHAIN)); + + version_cursor->upd_stop_txnid = upd->txnid; + version_cursor->upd_durable_stop_ts = upd->durable_ts; + version_cursor->upd_stop_ts = upd->start_ts; + + upd_found = true; + + /* Walk to the next non-obsolete update. */ + for (next_upd = upd; next_upd != NULL; next_upd = next_upd->next) { + if (next_upd->txnid == WT_TXN_ABORTED) + continue; + + if (first != NULL) { + next_upd = NULL; + break; + } + + /* We have traversed all the non-obsolete updates. 
*/ + if (WT_UPDATE_DATA_VALUE(next_upd) && + __wt_txn_upd_visible_all(session, next_upd)) + first = next_upd; + + if (next_upd != upd) + break; + } + version_cursor->next_upd = next_upd; + if (next_upd == NULL) + F_SET(version_cursor, WT_CURVERSION_UPDATE_EXHAUSTED); + } + } + } + + if (!upd_found && !F_ISSET(version_cursor, WT_CURVERSION_ON_DISK_EXHAUSTED)) { + switch (page->type) { + case WT_PAGE_ROW_LEAF: + if (cbt->ins != NULL) { + F_SET(version_cursor, WT_CURVERSION_ON_DISK_EXHAUSTED); + F_SET(version_cursor, WT_CURVERSION_HS_EXHAUSTED); + WT_ERR(WT_NOTFOUND); + } + break; + case WT_PAGE_COL_FIX: + /* + * If search returned an insert, we might be past the end of page in the append list, so + * there's no on-disk value. + */ + if (cbt->recno >= cbt->ref->ref_recno + page->entries) { + F_SET(version_cursor, WT_CURVERSION_ON_DISK_EXHAUSTED); + F_SET(version_cursor, WT_CURVERSION_HS_EXHAUSTED); + WT_ERR(WT_NOTFOUND); + } + break; + case WT_PAGE_COL_VAR: + /* Empty page doesn't have any on page value. */ + if (page->entries == 0) { + F_SET(version_cursor, WT_CURVERSION_ON_DISK_EXHAUSTED); + F_SET(version_cursor, WT_CURVERSION_HS_EXHAUSTED); + WT_ERR(WT_NOTFOUND); + } + break; + default: + WT_ERR(__wt_illegal_value(session, page->type)); + } + + /* Get the ondisk value. 
*/ + WT_ERR(__wt_value_return_buf(cbt, cbt->ref, &cbt->upd_value->buf, &cbt->upd_value->tw)); + + if (!WT_TIME_WINDOW_HAS_STOP(&cbt->upd_value->tw)) { + durable_stop_ts = version_cursor->upd_durable_stop_ts; + stop_ts = version_cursor->upd_stop_ts; + stop_txn = version_cursor->upd_stop_txnid; + } else { + durable_stop_ts = cbt->upd_value->tw.durable_stop_ts; + stop_ts = cbt->upd_value->tw.stop_ts; + stop_txn = cbt->upd_value->tw.stop_txn; + } + + if (tombstone != NULL && + (tombstone->prepare_state == WT_PREPARE_INPROGRESS || + tombstone->prepare_state == WT_PREPARE_LOCKED)) + version_prepare_state = 1; + else + version_prepare_state = cbt->upd_value->tw.prepare; + + WT_ERR(__curversion_set_value_with_format(cursor, WT_CURVERSION_METADATA_FORMAT, + cbt->upd_value->tw.start_txn, cbt->upd_value->tw.start_ts, + cbt->upd_value->tw.durable_start_ts, stop_txn, stop_ts, durable_stop_ts, + WT_UPDATE_STANDARD, version_prepare_state, 0, WT_CURVERSION_DISK_IMAGE)); + + upd_found = true; + F_SET(version_cursor, WT_CURVERSION_ON_DISK_EXHAUSTED); + } + + if (!upd_found && !F_ISSET(version_cursor, WT_CURVERSION_HS_EXHAUSTED)) { + /* Ensure we can see all the content in the history store. */ + F_SET(hs_cursor, WT_CURSTD_HS_READ_COMMITTED); + + if (!F_ISSET(hs_cursor, WT_CURSTD_KEY_INT)) { + if (page->type == WT_PAGE_ROW_LEAF) + hs_cursor->set_key( + hs_cursor, 4, S2BT(session)->id, &file_cursor->key, WT_TS_MAX, UINT64_MAX); + else { + /* Ensure enough room for a column-store key without checking. 
*/ + WT_ERR(__wt_scr_alloc(session, WT_INTPACK64_MAXSIZE, &key)); + + p = key->mem; + WT_ERR(__wt_vpack_uint(&p, 0, cbt->recno)); + key->size = WT_PTRDIFF(p, key->data); + hs_cursor->set_key(hs_cursor, 4, S2BT(session)->id, key, WT_TS_MAX, UINT64_MAX); + } + WT_ERR(__wt_curhs_search_near_before(session, hs_cursor)); + } else + WT_ERR(hs_cursor->prev(hs_cursor)); + + WT_ERR(__wt_scr_alloc(session, 0, &hs_value)); + + /* + * If there are no history store records for the given key or if we have iterated through + * all the records already, we have exhausted the history store. + */ + WT_ASSERT(session, ret == 0); + + __wt_hs_upd_time_window(hs_cursor, &twp); + WT_ERR(hs_cursor->get_value( + hs_cursor, &durable_stop_ts, &durable_start_ts, &hs_upd_type, hs_value)); + + WT_ERR(__curversion_set_value_with_format(cursor, WT_CURVERSION_METADATA_FORMAT, + twp->start_txn, twp->start_ts, twp->durable_start_ts, twp->stop_txn, twp->stop_ts, + twp->durable_stop_ts, hs_upd_type, 0, 0, WT_CURVERSION_HISTORY_STORE)); + + /* + * Reconstruct the history store value if needed. Since we save the value inside the version + * cursor every time we traverse a version, we can simply apply the modify onto the latest + * value. + */ + if (hs_upd_type == WT_UPDATE_MODIFY) { + WT_ERR(__wt_modify_apply_item( + session, file_cursor->value_format, &cbt->upd_value->buf, hs_value->data)); + } else { + WT_ASSERT(session, hs_upd_type == WT_UPDATE_STANDARD); + cbt->upd_value->buf.data = hs_value->data; + cbt->upd_value->buf.size = hs_value->size; + } + upd_found = true; + } + + if (!upd_found) + ret = WT_NOTFOUND; + else { + cbt->upd_value->type = WT_UPDATE_STANDARD; + WT_ERR(__wt_value_return(cbt, cbt->upd_value)); + } + +err: + __wt_scr_free(session, &key); + __wt_scr_free(session, &hs_value); + F_SET(cursor, raw); + return (ret); +} + +/* + * __curversion_next -- + * WT_CURSOR->next method for version cursors. 
The next function will position the cursor on the + * next update of the key it is positioned at. We traverse through updates on the update chain, + * then the ondisk value, and finally from the history store. + */ +static int +__curversion_next(WT_CURSOR *cursor) +{ + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + version_cursor = (WT_CURSOR_VERSION *)cursor; + + CURSOR_API_CALL(cursor, session, next, CUR2BT(version_cursor->file_cursor)); + WT_ERR(__curversion_next_int(cursor)); + +err: + if (ret != 0) + WT_TRET(cursor->reset(cursor)); + API_END_RET(session, ret); +} + +/* + * __curversion_reset -- + * WT_CURSOR::reset for version cursors. + */ +static int +__curversion_reset(WT_CURSOR *cursor) +{ + WT_CURSOR *hs_cursor, *file_cursor; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + version_cursor = (WT_CURSOR_VERSION *)cursor; + hs_cursor = version_cursor->hs_cursor; + file_cursor = version_cursor->file_cursor; + CURSOR_API_CALL(cursor, session, reset, NULL); + + if (file_cursor != NULL) + WT_TRET(file_cursor->reset(file_cursor)); + if (hs_cursor != NULL) + WT_TRET(hs_cursor->reset(hs_cursor)); + version_cursor->next_upd = NULL; + version_cursor->flags = 0; + F_CLR(cursor, WT_CURSTD_KEY_SET); + F_CLR(cursor, WT_CURSTD_VALUE_SET); + + /* Clear the information used to track update metadata. */ + version_cursor->upd_stop_txnid = WT_TXN_MAX; + version_cursor->upd_durable_stop_ts = WT_TS_MAX; + version_cursor->upd_stop_ts = WT_TS_MAX; + +err: + API_END_RET(session, ret); +} + +/* + * __curversion_search -- + * WT_CURSOR->search method for version cursors. 
+ */ +static int +__curversion_search(WT_CURSOR *cursor) +{ + WT_CURSOR *file_cursor; + WT_CURSOR_BTREE *cbt; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + WT_PAGE *page; + WT_ROW *rip; + WT_SESSION_IMPL *session; + WT_TXN *txn; + WT_TXN_SHARED *txn_shared; + wt_timestamp_t oldest_ts; + + version_cursor = (WT_CURSOR_VERSION *)cursor; + file_cursor = version_cursor->file_cursor; + cbt = (WT_CURSOR_BTREE *)file_cursor; + + CURSOR_API_CALL(cursor, session, search, CUR2BT(cbt)); + txn = session->txn; + txn_shared = WT_SESSION_TXN_SHARED(session); + + /* + * Check that we have the current transaction's read timestamp pinged as the oldest timestamp to + * ensure that the global visibility will not change during the life of this transaction. + */ + WT_ERR_NOTFOUND_OK( + __wt_txn_get_pinned_timestamp(session, &oldest_ts, WT_TXN_TS_INCLUDE_OLDEST), true); + if (!F_ISSET(txn, WT_TXN_SHARED_TS_READ) || + (ret == 0 && oldest_ts < txn_shared->read_timestamp)) + WT_ERR(__wt_txn_rollback_required(session, + "version cursor can only be called with the read timestamp as the oldest timestamp")); + if (ret == WT_NOTFOUND && txn_shared->read_timestamp > 1) + WT_ERR(__wt_txn_rollback_required(session, + "version cursor can only be called with read timestamp 1 if there is no oldest " + "timestamp")); + + WT_ERR(__cursor_checkkey(file_cursor)); + if (F_ISSET(file_cursor, WT_CURSTD_KEY_INT)) + WT_ERR(__wt_txn_rollback_required( + session, "version cursor cannot be called when it is positioned")); + + /* Do a search and position on the key if it is found */ + F_SET(file_cursor, WT_CURSTD_KEY_ONLY); + WT_ERR(__wt_btcur_search(cbt)); + WT_ASSERT(session, F_ISSET(file_cursor, WT_CURSTD_KEY_INT)); + + /* + * If we position on a key, set next update of the version cursor to be the first update on the + * key if any. 
+ */ + page = cbt->ref->page; + switch (page->type) { + case WT_PAGE_ROW_LEAF: + if (cbt->ins != NULL) + version_cursor->next_upd = cbt->ins->upd; + else { + rip = &page->pg_row[cbt->slot]; + version_cursor->next_upd = WT_ROW_UPDATE(page, rip); + } + break; + case WT_PAGE_COL_FIX: + case WT_PAGE_COL_VAR: + if (cbt->ins != NULL) + version_cursor->next_upd = cbt->ins->upd; + else + version_cursor->next_upd = NULL; + break; + default: + WT_ERR(__wt_illegal_value(session, page->type)); + } + + /* Walk to the first non aborted update. This update cannot be obsolete if exists. */ + while (version_cursor->next_upd != NULL && version_cursor->next_upd->txnid == WT_TXN_ABORTED) + version_cursor->next_upd = version_cursor->next_upd->next; + + if (version_cursor->next_upd == NULL) + F_SET(version_cursor, WT_CURVERSION_UPDATE_EXHAUSTED); + + /* Point to the newest version. */ + WT_ERR(__curversion_next_int(cursor)); + +err: + if (ret != 0) + WT_TRET(cursor->reset(cursor)); + API_END_RET(session, ret); +} + +/* + * __curversion_close -- + * WT_CURSOR->close method for version cursors. + */ +static int +__curversion_close(WT_CURSOR *cursor) +{ + WT_CURSOR *hs_cursor, *file_cursor; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + version_cursor = (WT_CURSOR_VERSION *)cursor; + hs_cursor = version_cursor->hs_cursor; + file_cursor = version_cursor->file_cursor; + CURSOR_API_CALL(cursor, session, close, NULL); +err: + version_cursor->next_upd = NULL; + if (file_cursor != NULL) { + WT_TRET(file_cursor->close(file_cursor)); + version_cursor->file_cursor = NULL; + } + if (hs_cursor != NULL) { + WT_TRET(hs_cursor->close(hs_cursor)); + version_cursor->hs_cursor = NULL; + } + __wt_free(session, cursor->value_format); + __wt_cursor_close(cursor); + + API_END_RET(session, ret); +} + +/* + * __wt_curversion_open -- + * Initialize a version cursor. 
+ */ +int +__wt_curversion_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], + WT_CURSOR **cursorp) +{ + WT_CURSOR_STATIC_INIT(iface, __curversion_get_key, /* get-key */ + __curversion_get_value, /* get-value */ + __curversion_set_key, /* set-key */ + __wt_cursor_set_value_notsup, /* set-value */ + __wt_cursor_compare_notsup, /* compare */ + __wt_cursor_equals_notsup, /* equals */ + __curversion_next, /* next */ + __wt_cursor_notsup, /* prev */ + __curversion_reset, /* reset */ + __curversion_search, /* search */ + __wt_cursor_search_near_notsup, /* search-near */ + __wt_cursor_notsup, /* insert */ + __wt_cursor_modify_notsup, /* modify */ + __wt_cursor_notsup, /* update */ + __wt_cursor_notsup, /* remove */ + __wt_cursor_notsup, /* reserve */ + __wt_cursor_reconfigure_notsup, /* reconfigure */ + __wt_cursor_notsup, /* largest_key */ + __wt_cursor_notsup, /* cache */ + __wt_cursor_reopen_notsup, /* reopen */ + __curversion_close); /* close */ + + WT_CURSOR *cursor; + WT_CURSOR_VERSION *version_cursor; + WT_DECL_RET; + /* The file cursor is read only. */ + const char *file_cursor_cfg[] = { + WT_CONFIG_BASE(session, WT_SESSION_open_cursor), "read_only=true", NULL}; + char *version_cursor_value_format; + size_t format_len; + + *cursorp = NULL; + WT_RET(__wt_calloc_one(session, &version_cursor)); + cursor = (WT_CURSOR *)version_cursor; + *cursor = iface; + cursor->session = (WT_SESSION *)session; + version_cursor_value_format = NULL; + + /* Open the file cursor to check the key and value format. 
*/ + WT_ERR(__wt_open_cursor(session, uri, NULL, file_cursor_cfg, &version_cursor->file_cursor)); + cursor->key_format = version_cursor->file_cursor->key_format; + format_len = + strlen(WT_CURVERSION_METADATA_FORMAT) + strlen(version_cursor->file_cursor->value_format) + 1; + WT_ERR(__wt_malloc(session, format_len, &version_cursor_value_format)); + WT_ERR(__wt_snprintf(version_cursor_value_format, format_len, "%s%s", + WT_CURVERSION_METADATA_FORMAT, version_cursor->file_cursor->value_format)); + cursor->value_format = version_cursor_value_format; + version_cursor_value_format = NULL; + + WT_ERR(__wt_strdup(session, uri, &cursor->uri)); + WT_ERR(__wt_cursor_init(cursor, cursor->uri, owner, cfg, cursorp)); + + /* + * Reopen the file cursor with the version cursor as owner. + * + * NOTE(review): between the close and a successful reopen, file_cursor still holds the closed + * handle. The err path calls cursor->close, which closes file_cursor whenever it is non-NULL; + * confirm __wt_open_cursor clears the pointer on failure so the handle is not closed twice. + */ + WT_ERR(version_cursor->file_cursor->close(version_cursor->file_cursor)); + WT_ERR(__wt_open_cursor(session, uri, cursor, file_cursor_cfg, &version_cursor->file_cursor)); + + /* Open the history store cursor for operations on the regular history store. */ + if (F_ISSET(S2C(session), WT_CONN_HS_OPEN)) { + WT_ERR(__wt_curhs_open(session, cursor, &version_cursor->hs_cursor)); + F_SET(version_cursor->hs_cursor, WT_CURSTD_HS_READ_COMMITTED); + } else + F_SET(version_cursor, WT_CURVERSION_HS_EXHAUSTED); + + /* Initialize information used to track update metadata. */ + version_cursor->upd_stop_txnid = WT_TXN_MAX; + version_cursor->upd_durable_stop_ts = WT_TS_MAX; + version_cursor->upd_stop_ts = WT_TS_MAX; + + /* Mark the cursor as version cursor for python api. 
*/ + F_SET(cursor, WT_CURSTD_VERSION_CURSOR); + + if (0) { +err: + __wt_free(session, version_cursor_value_format); + WT_TRET(cursor->close(cursor)); + *cursorp = NULL; + } + return (ret); +} diff --git a/src/third_party/wiredtiger/src/include/cursor.h b/src/third_party/wiredtiger/src/include/cursor.h index 0bfbb66fe11..9f9dce453a9 100644 --- a/src/third_party/wiredtiger/src/include/cursor.h +++ b/src/third_party/wiredtiger/src/include/cursor.h @@ -527,6 +527,36 @@ struct __wt_cursor_table { WT_CURSOR **idx_cursors; }; +struct __wt_cursor_version { + WT_CURSOR iface; + + WT_CURSOR *hs_cursor; /* Queries of history cursor. */ + WT_CURSOR *file_cursor; /* Queries of regular file cursor. */ + WT_UPDATE *next_upd; + + /* + * While we are iterating through updates on the update list, we need to remember information + * about the previous update we have just traversed so that we can record this as part of the + * debug metadata in the version cursor's value. + */ + uint64_t upd_stop_txnid; + /* The previous traversed update's durable_ts will become the durable_stop_ts. */ + wt_timestamp_t upd_durable_stop_ts; + /* The previous traversed update's start_ts will become the stop_ts. 
*/ + wt_timestamp_t upd_stop_ts; + +#define WT_CURVERSION_UPDATE_CHAIN 0 +#define WT_CURVERSION_DISK_IMAGE 1 +#define WT_CURVERSION_HISTORY_STORE 2 + +/* AUTOMATIC FLAG VALUE GENERATION START 0 */ +#define WT_CURVERSION_HS_EXHAUSTED 0x1u +#define WT_CURVERSION_ON_DISK_EXHAUSTED 0x2u +#define WT_CURVERSION_UPDATE_EXHAUSTED 0x4u + /* AUTOMATIC FLAG VALUE GENERATION STOP 8 */ + uint8_t flags; +}; + #define WT_CURSOR_PRIMARY(cursor) (((WT_CURSOR_TABLE *)(cursor))->cg_cursors[0]) #define WT_CURSOR_RECNO(cursor) WT_STREQ((cursor)->key_format, "r") diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index e227555d9b1..9066bda30dc 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -600,7 +600,7 @@ extern int __wt_cursor_search_near_notsup(WT_CURSOR *cursor, int *exact) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_cursor_set_keyv(WT_CURSOR *cursor, uint32_t flags, va_list ap) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_cursor_set_valuev(WT_CURSOR *cursor, va_list ap) +extern int __wt_cursor_set_valuev(WT_CURSOR *cursor, const char *fmt, va_list ap) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_cursor_valid(WT_CURSOR_BTREE *cbt, WT_ITEM *key, uint64_t recno, bool *valid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); @@ -622,6 +622,8 @@ extern int __wt_curtable_get_value(WT_CURSOR *cursor, ...) 
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_curtable_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_curversion_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, + const char *cfg[], WT_CURSOR **cursorp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_debug_addr(WT_SESSION_IMPL *session, const uint8_t *addr, size_t addr_size, const char *ofile) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_debug_addr_print(WT_SESSION_IMPL *session, const uint8_t *addr, size_t addr_size) diff --git a/src/third_party/wiredtiger/src/include/wiredtiger.in b/src/third_party/wiredtiger/src/include/wiredtiger.in index 206dbd2a524..c632fa76d5c 100644 --- a/src/third_party/wiredtiger/src/include/wiredtiger.in +++ b/src/third_party/wiredtiger/src/include/wiredtiger.in @@ -736,6 +736,7 @@ struct __wt_cursor { #define WT_CURSTD_RAW_SEARCH 0x1000000u #define WT_CURSTD_VALUE_EXT 0x2000000u /* Value points out of tree. */ #define WT_CURSTD_VALUE_INT 0x4000000u /* Value points into tree. */ +#define WT_CURSTD_VERSION_CURSOR 0x8000000u /* Version cursor. */ /* AUTOMATIC FLAG VALUE GENERATION STOP 32 */ #define WT_CURSTD_KEY_SET (WT_CURSTD_KEY_EXT | WT_CURSTD_KEY_INT) #define WT_CURSTD_VALUE_SET (WT_CURSTD_VALUE_EXT | WT_CURSTD_VALUE_INT) @@ -921,10 +922,13 @@ struct __wt_session { * The cursor does not support data modification., a string; default empty.} * @config{debug = (, configure debug specific behavior on a cursor. 
Generally only used * for internal testing purposes., a set of related configuration options defined below.} - * @config{ release_evict, Configure the cursor to evict the page - * positioned on when the reset API is used., a boolean flag; default \c false.} - * @config{ - * ),,} + * @config{ dump_version, open a version cursor\, which is a debug + * cursor on a table that enables iteration through the history of values for a given key., + * a boolean flag; default \c false.} + * @config{ release_evict, + * Configure the cursor to evict the page positioned on when the reset API is used., a + * boolean flag; default \c false.} + * @config{ ),,} * @config{dump, configure the cursor for dump format inputs and outputs: "hex" selects a * simple hexadecimal format\, "json" selects a JSON format with each record formatted as * fields named by column names if available\, "pretty" selects a human-readable format diff --git a/src/third_party/wiredtiger/src/include/wt_internal.h b/src/third_party/wiredtiger/src/include/wt_internal.h index 22724bc2be3..4d9f35a68f2 100644 --- a/src/third_party/wiredtiger/src/include/wt_internal.h +++ b/src/third_party/wiredtiger/src/include/wt_internal.h @@ -177,6 +177,8 @@ struct __wt_cursor_stat; typedef struct __wt_cursor_stat WT_CURSOR_STAT; struct __wt_cursor_table; typedef struct __wt_cursor_table WT_CURSOR_TABLE; +struct __wt_cursor_version; +typedef struct __wt_cursor_version WT_CURSOR_VERSION; struct __wt_data_handle; typedef struct __wt_data_handle WT_DATA_HANDLE; struct __wt_data_handle_cache; diff --git a/src/third_party/wiredtiger/src/session/session_api.c b/src/third_party/wiredtiger/src/session/session_api.c index 199dcf21013..df63a3e44c2 100644 --- a/src/third_party/wiredtiger/src/session/session_api.c +++ b/src/third_party/wiredtiger/src/session/session_api.c @@ -443,6 +443,7 @@ __session_open_cursor_int(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR * WT_CURSOR *other, const char *cfg[], uint64_t hash_value, WT_CURSOR 
**cursorp) { WT_COLGROUP *colgroup; + WT_CONFIG_ITEM cval; WT_DATA_SOURCE *dsrc; WT_DECL_RET; @@ -494,8 +495,19 @@ __session_open_cursor_int(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR * * Less common cursor types. */ case 'f': - if (WT_PREFIX_MATCH(uri, "file:")) - WT_RET(__wt_curfile_open(session, uri, owner, cfg, cursorp)); + if (WT_PREFIX_MATCH(uri, "file:")) { + /* + * Open a version cursor instead of a table cursor if we are using the special debug + * configuration. + */ + if ((ret = __wt_config_gets_def(session, cfg, "debug.dump_version", 0, &cval)) == 0 && + cval.val) { + if (WT_STREQ(uri, WT_HS_URI)) + WT_RET_MSG(session, EINVAL, "cannot open version cursor on the history store"); + WT_RET(__wt_curversion_open(session, uri, owner, cfg, cursorp)); + } else + WT_RET(__wt_curfile_open(session, uri, owner, cfg, cursorp)); + } break; case 'm': if (WT_PREFIX_MATCH(uri, WT_METADATA_URI)) diff --git a/src/third_party/wiredtiger/test/suite/test_cursor18.py b/src/third_party/wiredtiger/test/suite/test_cursor18.py new file mode 100644 index 00000000000..cfc56b32d6a --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_cursor18.py @@ -0,0 +1,438 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. 
We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# test_cursor18.py +# Test version cursor under various scenarios. +# +import wttest +import wiredtiger +from wtscenario import make_scenarios + +WT_TS_MAX = 2**64-1 + +class test_cursor18(wttest.WiredTigerTestCase): + uri = 'file:test_cursor18.wt' + + types = [ + ('row', dict(keyformat='i', valueformat='i')), + ('var', dict(keyformat='r', valueformat='i')), + # WiredTiger 5.0 didn't support timestamps on fixed length column stores + # ('fix', dict(keyformat='r', valueformat='8t')), + ] + + scenarios = make_scenarios(types) + + def create(self): + self.session.create(self.uri, 'key_format={},value_format={}'.format(self.keyformat, self.valueformat)) + + def verify_value(self, version_cursor, expected_start_ts, expected_start_durable_ts, expected_stop_ts, expected_stop_durable_ts, expected_type, expected_prepare_state, expected_flags, expected_location, expected_value): + values = version_cursor.get_values() + # Ignore the transaction ids from the value in the verification + self.assertEquals(values[1], expected_start_ts) + self.assertEquals(values[2], expected_start_durable_ts) + self.assertEquals(values[4], expected_stop_ts) + self.assertEquals(values[5], expected_stop_durable_ts) + self.assertEquals(values[6], expected_type) + self.assertEquals(values[7], expected_prepare_state) + self.assertEquals(values[8], expected_flags) + 
self.assertEquals(values[9], expected_location) + self.assertEquals(values[10], expected_value) + + def test_update_chain_only(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Update the value + self.session.begin_transaction() + cursor[1] = 1 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 5, 5, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 0, 1) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 3, 0, 0, 0, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_ondisk_only(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + self.assertEquals(evict_cursor[1], 0) + evict_cursor.reset() + self.session.rollback_transaction() + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 
WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_ondisk_only_with_deletion(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Delete the value + self.session.begin_transaction() + cursor.set_key(1) + self.assertEquals(cursor.remove(), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + evict_cursor.set_key(1) + if self.valueformat == '8t': + self.assertEquals(evict_cursor.search(), 0) + else: + self.assertEquals(evict_cursor.search(), wiredtiger.WT_NOTFOUND) + evict_cursor.reset() + self.session.rollback_transaction() + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 3, 0, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_ondisk_with_deletion_on_update_chain(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + self.assertEquals(evict_cursor[1], 0) + evict_cursor.reset() + self.session.rollback_transaction() + + # Delete the value + self.session.begin_transaction() + cursor.set_key(1) + 
self.assertEquals(cursor.remove(), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 3, 0, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_ondisk_with_hs(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Update the value + self.session.begin_transaction() + cursor[1] = 1 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + self.assertEquals(evict_cursor[1], 1) + evict_cursor.reset() + self.session.rollback_transaction() + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 5, 5, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 1, 1) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 3, 0, 0, 2, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_update_chain_ondisk_hs(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + 
self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Update the value + self.session.begin_transaction() + cursor[1] = 1 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + self.assertEquals(evict_cursor[1], 1) + evict_cursor.reset() + self.session.rollback_transaction() + + # Update the value + self.session.begin_transaction() + cursor[1] = 2 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(10)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 10, 10, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 0, 2) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 5, 5, 10, 10, 3, 0, 0, 1, 1) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 3, 0, 0, 2, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + self.session.rollback_transaction() + + # We want to test that the version cursor is working as expected when used multiple times + # on the same table on different keys. 
+ self.session.begin_transaction() + cursor[2] = 1 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(11)) + + self.session.begin_transaction() + cursor[2] = 2 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(15)) + + self.session.begin_transaction() + self.assertEquals(evict_cursor[2], 2) + evict_cursor.reset() + self.session.rollback_transaction() + + self.session.begin_transaction() + cursor[2] = 3 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(20)) + + # Ensure that we are able to correctly traverse all versions of this new key. + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor.set_key(2) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 2) + self.verify_value(version_cursor, 20, 20, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 0, 3) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 2) + self.verify_value(version_cursor, 15, 15, 20, 20, 3, 0, 0, 1, 2) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 2) + self.verify_value(version_cursor, 11, 11, 15, 15, 3, 0, 0, 2, 1) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_prepare(self): + self.create() + + session2 = self.conn.open_session() + cursor = session2.open_cursor(self.uri, None) + # Add a value to the update chain + session2.begin_transaction() + cursor[1] = 0 + session2.prepare_transaction("prepare_timestamp=" + self.timestamp_str(1)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + evict_cursor.set_key(1) + try: + evict_cursor.search() + except wiredtiger.WiredTigerError as e: + if wiredtiger.wiredtiger_strerror(wiredtiger.WT_PREPARE_CONFLICT) not in str(e): + raise e + evict_cursor.reset() + self.session.rollback_transaction() + + # Open a version cursor + 
self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 0, WT_TS_MAX, WT_TS_MAX, 3, 1, 8, 0, 0) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 1, 0, 3, 1, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_reuse_version_cursor(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + self.assertEquals(evict_cursor[1], 0) + evict_cursor.reset() + self.session.rollback_transaction() + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + # Repeat after reset + version_cursor.reset() + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_prepare_tombstone(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value 
to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + self.assertEquals(evict_cursor[1], 0) + evict_cursor.reset() + self.session.rollback_transaction() + + # Delete the value with prepare + session2 = self.conn.open_session() + cursor2 = session2.open_cursor(self.uri, None) + # Add a value to the update chain + session2.begin_transaction() + cursor2.set_key(1) + self.assertEquals(cursor2.remove(), 0) + session2.prepare_transaction("prepare_timestamp=" + self.timestamp_str(2)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 2, 0, 3, 1, 0, 1, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) + + def test_search_when_positioned(self): + self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + try: + version_cursor.search() + except wiredtiger.WiredTigerError as e: + gotException = True + self.pr('got expected exception: ' + str(e)) + self.assertTrue(str(e).find('WT_ROLLBACK') >= 0) + self.assertTrue(gotException, msg = 'expected exception') + + def test_concurrent_insert(self): + 
self.create() + + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = 0 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Update the value + self.session.begin_transaction() + cursor[1] = 1 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 5, 5, WT_TS_MAX, WT_TS_MAX, 3, 0, 0, 0, 1) + + # Update the value + session2 = self.conn.open_session() + cursor2 = session2.open_cursor(self.uri, None) + session2.begin_transaction() + cursor2[1] = 2 + session2.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 3, 0, 0, 0, 0) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) diff --git a/src/third_party/wiredtiger/test/suite/test_cursor19.py b/src/third_party/wiredtiger/test/suite/test_cursor19.py new file mode 100644 index 00000000000..dd3c0164451 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_cursor19.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. 
+# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# test_cursor19.py +# Test version cursor for modifies. +# +import wttest +import wiredtiger +from wtscenario import make_scenarios + +WT_TS_MAX = 2**64-1 + +class test_cursor19(wttest.WiredTigerTestCase): + uri = 'file:test_cursor19.wt' + + types = [ + ('row', dict(keyformat='i')), + ('var', dict(keyformat='r')) + ] + + scenarios = make_scenarios(types) + + def create(self): + self.session.create(self.uri, 'key_format={},value_format=S'.format(self.keyformat)) + + def verify_value(self, version_cursor, expected_start_ts, expected_start_durable_ts, expected_stop_ts, expected_stop_durable_ts, expected_type, expected_prepare_state, expected_flags, expected_location, expected_value): + values = version_cursor.get_values() + # Ignore the transaction ids from the value in the verification + self.assertEquals(values[1], expected_start_ts) + self.assertEquals(values[2], expected_start_durable_ts) + self.assertEquals(values[4], expected_stop_ts) + self.assertEquals(values[5], expected_stop_durable_ts) + self.assertEquals(values[6], expected_type) + 
self.assertEquals(values[7], expected_prepare_state) + self.assertEquals(values[8], expected_flags) + self.assertEquals(values[9], expected_location) + self.assertEquals(values[10], expected_value) + + def test_modify(self): + self.create() + + value1 = "a" * 100 + value2 = "b" + "a" * 99 + value3 = "c" + "a" * 99 + value4 = "d" + "a" * 99 + value5 = "e" + "a" * 99 + value6 = "f" + "a" * 99 + cursor = self.session.open_cursor(self.uri, None) + # Add a value to the update chain + self.session.begin_transaction() + cursor[1] = value1 + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(1)) + + # Modify the value + self.session.begin_transaction() + cursor.set_key(1) + mods = [wiredtiger.Modify("b", 0, 1)] + self.assertEquals(cursor.modify(mods), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(5)) + + # Modify the value + self.session.begin_transaction() + cursor.set_key(1) + mods = [wiredtiger.Modify("c", 0, 1)] + self.assertEquals(cursor.modify(mods), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(10)) + + # Modify the value + self.session.begin_transaction() + cursor.set_key(1) + mods = [wiredtiger.Modify("d", 0, 1)] + self.assertEquals(cursor.modify(mods), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(15)) + + cursor.reset() + + evict_cursor = self.session.open_cursor(self.uri, None, "debug=(release_evict)") + self.session.begin_transaction() + evict_cursor.set_key(1) + self.assertEquals(evict_cursor.search(), 0) + evict_cursor.reset() + self.session.rollback_transaction() + + # Modify the value + self.session.begin_transaction() + cursor.set_key(1) + mods = [wiredtiger.Modify("e", 0, 1)] + self.assertEquals(cursor.modify(mods), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(20)) + + # Modify the value + self.session.begin_transaction() + cursor.set_key(1) + mods = [wiredtiger.Modify("f", 0, 1)] + 
self.assertEquals(cursor.modify(mods), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(25)) + + # Delete the key + self.session.begin_transaction() + cursor.set_key(1) + self.assertEquals(cursor.remove(), 0) + self.session.commit_transaction("commit_timestamp=" + self.timestamp_str(30)) + + # Open a version cursor + self.session.begin_transaction("read_timestamp=" + self.timestamp_str(1)) + version_cursor = self.session.open_cursor(self.uri, None, "debug=(dump_version=true)") + version_cursor.set_key(1) + self.assertEquals(version_cursor.search(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 25, 25, 30, 30, 1, 0, 0, 0, value6) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 20, 20, 25, 25, 1, 0, 0, 0, value5) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 15, 15, 20, 20, 3, 0, 0, 1, value4) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 10, 10, 15, 15, 3, 0, 0, 2, value3) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 5, 5, 10, 10, 1, 0, 0, 2, value2) + self.assertEquals(version_cursor.next(), 0) + self.assertEquals(version_cursor.get_key(), 1) + self.verify_value(version_cursor, 1, 1, 5, 5, 1, 0, 0, 2, value1) + self.assertEquals(version_cursor.next(), wiredtiger.WT_NOTFOUND) |