diff options
author | Luke Chen <luke.chen@mongodb.com> | 2021-02-01 17:52:01 +1100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-01 07:24:25 +0000 |
commit | de1a08ce57ff10e5b7c79448214b781e9ae5417e (patch) | |
tree | d0714d7913074e235f57b1e7cf53d8b5bb0a96c4 /src/third_party/wiredtiger | |
parent | d0e596d3822a67ee557fa59d66df215c7cb35d20 (diff) | |
download | mongo-de1a08ce57ff10e5b7c79448214b781e9ae5417e.tar.gz |
Import wiredtiger: d8cc45b419a54d6c23c2365de352e5ed396b2392 from branch mongodb-5.0
ref: 0459375c52..d8cc45b419
for: 4.9.0
WT-7117 RTS to skip modifies that are more than on-disk base update while restoring an update
Diffstat (limited to 'src/third_party/wiredtiger')
-rwxr-xr-x | src/third_party/wiredtiger/dist/s_void | 1 | ||||
-rw-r--r-- | src/third_party/wiredtiger/dist/stat_data.py | 1 | ||||
-rw-r--r-- | src/third_party/wiredtiger/import.data | 2 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/btree/bt_curnext.c | 8 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/btree/bt_curprev.c | 8 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/btree/bt_cursor.c | 2 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/history/hs_cursor.c | 31 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/extern.h | 7 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/stat.h | 2 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/txn_inline.h | 142 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/wiredtiger.in | 20 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/support/stat.c | 7 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c | 40 | ||||
-rwxr-xr-x | src/third_party/wiredtiger/test/suite/test_rollback_to_stable14.py | 304 |
14 files changed, 479 insertions, 96 deletions
diff --git a/src/third_party/wiredtiger/dist/s_void b/src/third_party/wiredtiger/dist/s_void index 5beea1cddc1..fca1ccc9810 100755 --- a/src/third_party/wiredtiger/dist/s_void +++ b/src/third_party/wiredtiger/dist/s_void @@ -78,6 +78,7 @@ func_ok() -e '/int __wt_stat_dsrc_desc$/d' \ -e '/int __wt_stat_join_desc$/d' \ -e '/int __wt_stat_session_desc/d' \ + -e '/int __wt_txn_read_upd_list$/d' \ -e '/int __wt_txn_rollback_required$/d' \ -e '/int __wt_win_directory_list_free$/d' \ -e '/int bdb_compare_reverse$/d' \ diff --git a/src/third_party/wiredtiger/dist/stat_data.py b/src/third_party/wiredtiger/dist/stat_data.py index e975a4f7e6f..3ccc3e0b57e 100644 --- a/src/third_party/wiredtiger/dist/stat_data.py +++ b/src/third_party/wiredtiger/dist/stat_data.py @@ -859,6 +859,7 @@ conn_dsrc_stats = [ TxnStat('txn_checkpoint_obsolete_applied', 'transaction checkpoints due to obsolete pages'), TxnStat('txn_read_race_prepare_update', 'race to read prepared update retry'), TxnStat('txn_rts_hs_removed', 'rollback to stable updates removed from history store'), + TxnStat('txn_rts_hs_restore_updates', 'rollback to stable restored updates from history store'), TxnStat('txn_rts_hs_restore_tombstones', 'rollback to stable restored tombstones from history store'), TxnStat('txn_rts_hs_stop_older_than_newer_start', 'rollback to stable hs records with stop timestamps older than newer records'), TxnStat('txn_rts_keys_removed', 'rollback to stable keys removed'), diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 87b83dbc105..0ca2360ea6a 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-5.0", - "commit": "0459375c522737a7a678971b89aaf3082d206e12" + "commit": "d8cc45b419a54d6c23c2365de352e5ed396b2392" } diff --git a/src/third_party/wiredtiger/src/btree/bt_curnext.c 
b/src/third_party/wiredtiger/src/btree/bt_curnext.c index 05394d0ae98..63d187b7442 100644 --- a/src/third_party/wiredtiger/src/btree/bt_curnext.c +++ b/src/third_party/wiredtiger/src/btree/bt_curnext.c @@ -57,7 +57,7 @@ __cursor_fix_append_next(WT_CURSOR_BTREE *cbt, bool newpage, bool restart) cbt->iface.value.data = &cbt->v; } else { restart_read: - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type == WT_UPDATE_INVALID) { cbt->v = 0; cbt->iface.value.data = &cbt->v; @@ -157,7 +157,7 @@ new_page: __cursor_set_recno(cbt, WT_INSERT_RECNO(cbt->ins)); restart_read: - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type == WT_UPDATE_INVALID) { ++*skippedp; @@ -232,7 +232,7 @@ restart_read: cbt->ins = __col_insert_search_match(cbt->ins_head, cbt->recno); __wt_upd_value_clear(cbt->upd_value); if (cbt->ins != NULL) - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type != WT_UPDATE_INVALID) { if (cbt->upd_value->type == WT_UPDATE_TOMBSTONE) { if (cbt->upd_value->tw.stop_txn != WT_TXN_NONE && @@ -365,7 +365,7 @@ restart_read_insert: if ((ins = cbt->ins) != NULL) { key->data = WT_INSERT_KEY(ins); key->size = WT_INSERT_KEY_SIZE(ins); - WT_RET(__wt_txn_read_upd_list(session, cbt, ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, ins->upd)); if (cbt->upd_value->type == WT_UPDATE_INVALID) { ++*skippedp; continue; diff --git a/src/third_party/wiredtiger/src/btree/bt_curprev.c b/src/third_party/wiredtiger/src/btree/bt_curprev.c index bb2c3a9e05c..7fe24232317 100644 --- a/src/third_party/wiredtiger/src/btree/bt_curprev.c +++ b/src/third_party/wiredtiger/src/btree/bt_curprev.c @@ -197,7 +197,7 @@ __cursor_fix_append_prev(WT_CURSOR_BTREE *cbt, bool newpage, bool 
restart) cbt->iface.value.data = &cbt->v; } else { restart_read: - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type == WT_UPDATE_INVALID) { cbt->v = 0; cbt->iface.value.data = &cbt->v; @@ -297,7 +297,7 @@ new_page: __cursor_set_recno(cbt, WT_INSERT_RECNO(cbt->ins)); restart_read: - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type == WT_UPDATE_INVALID) { ++*skippedp; continue; @@ -372,7 +372,7 @@ restart_read: cbt->ins = __col_insert_search_match(cbt->ins_head, cbt->recno); __wt_upd_value_clear(cbt->upd_value); if (cbt->ins != NULL) - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type != WT_UPDATE_INVALID) { if (cbt->upd_value->type == WT_UPDATE_TOMBSTONE) { if (cbt->upd_value->tw.stop_txn != WT_TXN_NONE && @@ -514,7 +514,7 @@ restart_read_insert: if ((ins = cbt->ins) != NULL) { key->data = WT_INSERT_KEY(ins); key->size = WT_INSERT_KEY_SIZE(ins); - WT_RET(__wt_txn_read_upd_list(session, cbt, ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, ins->upd)); if (cbt->upd_value->type == WT_UPDATE_INVALID) { ++*skippedp; continue; diff --git a/src/third_party/wiredtiger/src/btree/bt_cursor.c b/src/third_party/wiredtiger/src/btree/bt_cursor.c index fc7e542a12e..e4f538d80df 100644 --- a/src/third_party/wiredtiger/src/btree/bt_cursor.c +++ b/src/third_party/wiredtiger/src/btree/bt_cursor.c @@ -230,7 +230,7 @@ __wt_cursor_valid(WT_CURSOR_BTREE *cbt, WT_ITEM *key, uint64_t recno, bool *vali * update that's been deleted is not a valid key/value pair). 
*/ if (cbt->ins != NULL) { - WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd, NULL)); + WT_RET(__wt_txn_read_upd_list(session, cbt, cbt->ins->upd)); if (cbt->upd_value->type != WT_UPDATE_INVALID) { if (cbt->upd_value->type == WT_UPDATE_TOMBSTONE) return (0); diff --git a/src/third_party/wiredtiger/src/history/hs_cursor.c b/src/third_party/wiredtiger/src/history/hs_cursor.c index 6480d014ecc..6baaf94847c 100644 --- a/src/third_party/wiredtiger/src/history/hs_cursor.c +++ b/src/third_party/wiredtiger/src/history/hs_cursor.c @@ -180,7 +180,7 @@ __wt_hs_cursor_position(WT_SESSION_IMPL *session, WT_CURSOR *cursor, uint32_t bt static int __hs_find_upd_int(WT_SESSION_IMPL *session, uint32_t btree_id, WT_ITEM *key, const char *value_format, uint64_t recno, WT_UPDATE_VALUE *upd_value, bool allow_prepare, - WT_ITEM *on_disk_buf) + WT_ITEM *base_value_buf) { WT_CURSOR *hs_cursor; WT_CURSOR_BTREE *hs_cbt; @@ -333,14 +333,14 @@ __hs_find_upd_int(WT_SESSION_IMPL *session, uint32_t btree_id, WT_ITEM *key, WT_ERR_NOTFOUND_OK(__wt_hs_cursor_next(session, hs_cursor), true); if (ret == WT_NOTFOUND) { /* - * Fallback to the onpage value as the base value. + * Fallback to the provided value as the base value. * * Work around of clang analyzer complaining the value is never read as it is reset * again by the following WT_ERR macro. */ WT_NOT_READ(ret, 0); orig_hs_value_buf = hs_value; - hs_value = on_disk_buf; + hs_value = base_value_buf; upd_type = WT_UPDATE_STANDARD; break; } @@ -356,9 +356,9 @@ __hs_find_upd_int(WT_SESSION_IMPL *session, uint32_t btree_id, WT_ITEM *key, hs_cursor, &hs_btree_id, &hs_key, &hs_start_ts_tmp, &hs_counter_tmp)); if (hs_btree_id != btree_id) { - /* Fallback to the onpage value as the base value. */ + /* Fallback to the provided value as the base value. 
*/ orig_hs_value_buf = hs_value; - hs_value = on_disk_buf; + hs_value = base_value_buf; upd_type = WT_UPDATE_STANDARD; break; } @@ -366,9 +366,22 @@ __hs_find_upd_int(WT_SESSION_IMPL *session, uint32_t btree_id, WT_ITEM *key, WT_ERR(__wt_compare(session, NULL, &hs_key, key, &cmp)); if (cmp != 0) { - /* Fallback to the onpage value as the base value. */ + /* Fallback to the provided value as the base value. */ orig_hs_value_buf = hs_value; - hs_value = on_disk_buf; + hs_value = base_value_buf; + upd_type = WT_UPDATE_STANDARD; + break; + } + + /* + * If the stop time pair on the tombstone in the history store is already globally + * visible fall back to the base value. This is possible in scenarios where the latest + * updates are aborted by RTS according to stable timestamp. + */ + if (__wt_txn_tw_stop_visible_all(session, &hs_cbt->upd_value->tw)) { + /* Fallback to the provided value as the base value. */ + orig_hs_value_buf = hs_value; + hs_value = base_value_buf; upd_type = WT_UPDATE_STANDARD; break; } @@ -436,7 +449,7 @@ err: */ int __wt_hs_find_upd(WT_SESSION_IMPL *session, WT_ITEM *key, const char *value_format, uint64_t recno, - WT_UPDATE_VALUE *upd_value, bool allow_prepare, WT_ITEM *on_disk_buf) + WT_UPDATE_VALUE *upd_value, bool allow_prepare, WT_ITEM *base_value_buf) { WT_BTREE *btree; WT_DECL_RET; @@ -446,7 +459,7 @@ __wt_hs_find_upd(WT_SESSION_IMPL *session, WT_ITEM *key, const char *value_forma WT_RET(__wt_hs_cursor_open(session)); WT_WITH_BTREE(session, CUR2BT(session->hs_cursor), (ret = __hs_find_upd_int( - session, btree->id, key, value_format, recno, upd_value, allow_prepare, on_disk_buf))); + session, btree->id, key, value_format, recno, upd_value, allow_prepare, base_value_buf))); WT_TRET(__wt_hs_cursor_close(session)); return (ret); } diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 8c6b2d06c58..405ae73401b 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ 
b/src/third_party/wiredtiger/src/include/extern.h @@ -763,7 +763,7 @@ extern int __wt_hs_delete_key_from_ts( WT_SESSION_IMPL *session, uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_hs_find_upd(WT_SESSION_IMPL *session, WT_ITEM *key, const char *value_format, - uint64_t recno, WT_UPDATE_VALUE *upd_value, bool allow_prepare, WT_ITEM *on_disk_buf) + uint64_t recno, WT_UPDATE_VALUE *upd_value, bool allow_prepare, WT_ITEM *base_value_buf) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_hs_get_btree(WT_SESSION_IMPL *session, WT_BTREE **hs_btreep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); @@ -2070,7 +2070,10 @@ static inline int __wt_txn_read(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, uint64_t recno, WT_UPDATE *upd, WT_CELL_UNPACK_KV *vpack) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); static inline int __wt_txn_read_upd_list(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, - WT_UPDATE *upd, WT_UPDATE **prepare_updp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); + WT_UPDATE *upd) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +static inline int __wt_txn_read_upd_list_internal(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, + WT_UPDATE *upd, WT_UPDATE **prepare_updp, WT_UPDATE **restored_updp) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); static inline int __wt_txn_search_check(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); static inline int __wt_txn_update_check(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, diff --git a/src/third_party/wiredtiger/src/include/stat.h b/src/third_party/wiredtiger/src/include/stat.h index b93b545d766..3635dd41d1b 100644 --- a/src/third_party/wiredtiger/src/include/stat.h +++ b/src/third_party/wiredtiger/src/include/stat.h @@ -783,6 +783,7 @@ struct __wt_connection_stats { int64_t txn_rts_keys_removed; int64_t txn_rts_keys_restored; int64_t txn_rts_hs_restore_tombstones; + 
int64_t txn_rts_hs_restore_updates; int64_t txn_rts_sweep_hs_keys; int64_t txn_rts_hs_removed; int64_t txn_checkpoint_obsolete_applied; @@ -997,6 +998,7 @@ struct __wt_dsrc_stats { int64_t txn_rts_keys_removed; int64_t txn_rts_keys_restored; int64_t txn_rts_hs_restore_tombstones; + int64_t txn_rts_hs_restore_updates; int64_t txn_rts_sweep_hs_keys; int64_t txn_rts_hs_removed; int64_t txn_checkpoint_obsolete_applied; diff --git a/src/third_party/wiredtiger/src/include/txn_inline.h b/src/third_party/wiredtiger/src/include/txn_inline.h index ad0a87291e6..f4a4c552ddb 100644 --- a/src/third_party/wiredtiger/src/include/txn_inline.h +++ b/src/third_party/wiredtiger/src/include/txn_inline.h @@ -836,18 +836,21 @@ __wt_upd_alloc_tombstone(WT_SESSION_IMPL *session, WT_UPDATE **updp, size_t *siz } /* - * __wt_txn_read_upd_list -- - * Get the first visible update in a list (or NULL if none are visible). + * __wt_txn_read_upd_list_internal -- + * Internal helper function to get the first visible update in a list (or NULL if none are + * visible). */ static inline int -__wt_txn_read_upd_list( - WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, WT_UPDATE *upd, WT_UPDATE **prepare_updp) +__wt_txn_read_upd_list_internal(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, WT_UPDATE *upd, + WT_UPDATE **prepare_updp, WT_UPDATE **restored_updp) { WT_VISIBLE_TYPE upd_visible; uint8_t prepare_state, type; if (prepare_updp != NULL) *prepare_updp = NULL; + if (restored_updp != NULL) + *restored_updp = NULL; __wt_upd_value_clear(cbt->upd_value); for (; upd != NULL; upd = upd->next) { @@ -888,6 +891,16 @@ __wt_txn_read_upd_list( F_ISSET(upd, WT_UPDATE_PREPARE_RESTORED_FROM_DS)) *prepare_updp = upd; + /* + * Save the restored update to use it as base value update in case if we need to reach + * history store instead of on-disk value. 
+ */ + if (restored_updp != NULL && F_ISSET(upd, WT_UPDATE_RESTORED_FROM_HS) && + type == WT_UPDATE_STANDARD) { + WT_ASSERT(session, *restored_updp == NULL); + *restored_updp = upd; + } + if (upd_visible == WT_VISIBLE_PREPARE) { /* Ignore the prepared update, if transaction configuration says so. */ if (F_ISSET(session->txn, WT_TXN_IGNORE_PREPARE)) @@ -916,6 +929,16 @@ __wt_txn_read_upd_list( } /* + * __wt_txn_read_upd_list -- + * Get the first visible update in a list (or NULL if none are visible). + */ +static inline int +__wt_txn_read_upd_list(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, WT_UPDATE *upd) +{ + return __wt_txn_read_upd_list_internal(session, cbt, upd, NULL, NULL); +} + +/* * __wt_txn_read -- * Get the first visible update in a chain. This function will first check the update list * supplied as a function argument. If there is no visible update, it will check the onpage @@ -927,14 +950,14 @@ __wt_txn_read(WT_SESSION_IMPL *session, WT_CURSOR_BTREE *cbt, WT_ITEM *key, uint WT_UPDATE *upd, WT_CELL_UNPACK_KV *vpack) { WT_TIME_WINDOW tw; - WT_UPDATE *prepare_upd; + WT_UPDATE *prepare_upd, *restored_upd; bool have_stop_tw, retry; - prepare_upd = NULL; + prepare_upd = restored_upd = NULL; retry = true; retry: - WT_RET(__wt_txn_read_upd_list(session, cbt, upd, &prepare_upd)); + WT_RET(__wt_txn_read_upd_list_internal(session, cbt, upd, &prepare_upd, &restored_upd)); if (WT_UPDATE_DATA_VALUE(cbt->upd_value) || (cbt->upd_value->type == WT_UPDATE_MODIFY && cbt->upd_value->skip_buf)) return (0); @@ -947,59 +970,70 @@ retry: } /* - * When we inspected the update list we may have seen a tombstone leaving us with a valid stop - * time window, we don't want to overwrite this stop time window. + * Skip retrieving the on-disk value when there exists a restored update from history store in + * the update list. Having a restored update as part of the update list indicates that the + * existing on-disk value is unstable. 
*/ - have_stop_tw = WT_TIME_WINDOW_HAS_STOP(&cbt->upd_value->tw); - - /* Check the ondisk value. */ - if (vpack == NULL) { - WT_TIME_WINDOW_INIT(&tw); - WT_RET(__wt_value_return_buf(cbt, cbt->ref, &cbt->upd_value->buf, &tw)); + if (restored_upd != NULL) { + WT_ASSERT(session, !WT_IS_HS(session->dhandle)); + cbt->upd_value->buf.data = restored_upd->data; + cbt->upd_value->buf.size = restored_upd->size; } else { - WT_TIME_WINDOW_COPY(&tw, &vpack->tw); - cbt->upd_value->buf.data = vpack->data; - cbt->upd_value->buf.size = vpack->size; - } - - /* - * If the stop time point is set, that means that there is a tombstone at that time. If it is - * not prepared and it is visible to our txn it means we've just spotted a tombstone and should - * return "not found", except scanning the history store during rollback to stable and when we - * are told to ignore non-globally visible tombstones. - */ - if (!have_stop_tw && __wt_txn_tw_stop_visible(session, &tw) && - !F_ISSET(&cbt->iface, WT_CURSTD_IGNORE_TOMBSTONE)) { - cbt->upd_value->buf.data = NULL; - cbt->upd_value->buf.size = 0; - cbt->upd_value->tw.durable_stop_ts = tw.durable_stop_ts; - cbt->upd_value->tw.stop_ts = tw.stop_ts; - cbt->upd_value->tw.stop_txn = tw.stop_txn; - cbt->upd_value->tw.prepare = tw.prepare; - cbt->upd_value->type = WT_UPDATE_TOMBSTONE; - return (0); - } - - /* Store the stop time pair of the history store record that is returning. */ - if (!have_stop_tw && WT_TIME_WINDOW_HAS_STOP(&tw) && WT_IS_HS(session->dhandle)) { - cbt->upd_value->tw.durable_stop_ts = tw.durable_stop_ts; - cbt->upd_value->tw.stop_ts = tw.stop_ts; - cbt->upd_value->tw.stop_txn = tw.stop_txn; - cbt->upd_value->tw.prepare = tw.prepare; - } + /* + * When we inspected the update list we may have seen a tombstone leaving us with a valid + * stop time window, we don't want to overwrite this stop time window. + */ + have_stop_tw = WT_TIME_WINDOW_HAS_STOP(&cbt->upd_value->tw); + + /* Check the ondisk value. 
*/ + if (vpack == NULL) { + WT_TIME_WINDOW_INIT(&tw); + WT_RET(__wt_value_return_buf(cbt, cbt->ref, &cbt->upd_value->buf, &tw)); + } else { + WT_TIME_WINDOW_COPY(&tw, &vpack->tw); + cbt->upd_value->buf.data = vpack->data; + cbt->upd_value->buf.size = vpack->size; + } - /* If the start time point is visible then we need to return the ondisk value. */ - if (WT_IS_HS(session->dhandle) || __wt_txn_tw_start_visible(session, &tw)) { - if (cbt->upd_value->skip_buf) { + /* + * If the stop time point is set, that means that there is a tombstone at that time. If it + * is not prepared and it is visible to our txn it means we've just spotted a tombstone and + * should return "not found", except scanning the history store during rollback to stable + * and when we are told to ignore non-globally visible tombstones. + */ + if (!have_stop_tw && __wt_txn_tw_stop_visible(session, &tw) && + !F_ISSET(&cbt->iface, WT_CURSTD_IGNORE_TOMBSTONE)) { cbt->upd_value->buf.data = NULL; cbt->upd_value->buf.size = 0; + cbt->upd_value->tw.durable_stop_ts = tw.durable_stop_ts; + cbt->upd_value->tw.stop_ts = tw.stop_ts; + cbt->upd_value->tw.stop_txn = tw.stop_txn; + cbt->upd_value->tw.prepare = tw.prepare; + cbt->upd_value->type = WT_UPDATE_TOMBSTONE; + return (0); + } + + /* Store the stop time pair of the history store record that is returning. */ + if (!have_stop_tw && WT_TIME_WINDOW_HAS_STOP(&tw) && WT_IS_HS(session->dhandle)) { + cbt->upd_value->tw.durable_stop_ts = tw.durable_stop_ts; + cbt->upd_value->tw.stop_ts = tw.stop_ts; + cbt->upd_value->tw.stop_txn = tw.stop_txn; + cbt->upd_value->tw.prepare = tw.prepare; + } + + /* If the start time point is visible then we need to return the ondisk value. 
*/ + if (WT_IS_HS(session->dhandle) || __wt_txn_tw_start_visible(session, &tw)) { + if (cbt->upd_value->skip_buf) { + cbt->upd_value->buf.data = NULL; + cbt->upd_value->buf.size = 0; + } + cbt->upd_value->tw.durable_start_ts = tw.durable_start_ts; + cbt->upd_value->tw.start_ts = tw.start_ts; + cbt->upd_value->tw.start_txn = tw.start_txn; + cbt->upd_value->tw.prepare = tw.prepare; + cbt->upd_value->type = WT_UPDATE_STANDARD; + return (0); } - cbt->upd_value->tw.durable_start_ts = tw.durable_start_ts; - cbt->upd_value->tw.start_ts = tw.start_ts; - cbt->upd_value->tw.start_txn = tw.start_txn; - cbt->upd_value->tw.prepare = tw.prepare; - cbt->upd_value->type = WT_UPDATE_STANDARD; - return (0); } /* If there's no visible update in the update chain or ondisk, check the history store file. */ diff --git a/src/third_party/wiredtiger/src/include/wiredtiger.in b/src/third_party/wiredtiger/src/include/wiredtiger.in index 3fb4ac71456..8b760c6df42 100644 --- a/src/third_party/wiredtiger/src/include/wiredtiger.in +++ b/src/third_party/wiredtiger/src/include/wiredtiger.in @@ -5875,14 +5875,16 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1468 /*! transaction: rollback to stable restored tombstones from history store */ #define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1469 +/*! transaction: rollback to stable restored updates from history store */ +#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1470 /*! transaction: rollback to stable sweeping history store keys */ -#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1470 +#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1471 /*! transaction: rollback to stable updates removed from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1471 +#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1472 /*! transaction: transaction checkpoints due to obsolete pages */ -#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1472 +#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1473 /*! 
transaction: update conflicts */ -#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1473 +#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1474 /*! * @} @@ -6486,14 +6488,16 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_DSRC_TXN_RTS_KEYS_RESTORED 2201 /*! transaction: rollback to stable restored tombstones from history store */ #define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_TOMBSTONES 2202 +/*! transaction: rollback to stable restored updates from history store */ +#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_UPDATES 2203 /*! transaction: rollback to stable sweeping history store keys */ -#define WT_STAT_DSRC_TXN_RTS_SWEEP_HS_KEYS 2203 +#define WT_STAT_DSRC_TXN_RTS_SWEEP_HS_KEYS 2204 /*! transaction: rollback to stable updates removed from history store */ -#define WT_STAT_DSRC_TXN_RTS_HS_REMOVED 2204 +#define WT_STAT_DSRC_TXN_RTS_HS_REMOVED 2205 /*! transaction: transaction checkpoints due to obsolete pages */ -#define WT_STAT_DSRC_TXN_CHECKPOINT_OBSOLETE_APPLIED 2205 +#define WT_STAT_DSRC_TXN_CHECKPOINT_OBSOLETE_APPLIED 2206 /*! transaction: update conflicts */ -#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2206 +#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2207 /*! 
* @} diff --git a/src/third_party/wiredtiger/src/support/stat.c b/src/third_party/wiredtiger/src/support/stat.c index c143fdfe1b1..100fa7b3ed8 100644 --- a/src/third_party/wiredtiger/src/support/stat.c +++ b/src/third_party/wiredtiger/src/support/stat.c @@ -210,6 +210,7 @@ static const char *const __stats_dsrc_desc[] = { "transaction: rollback to stable keys removed", "transaction: rollback to stable keys restored", "transaction: rollback to stable restored tombstones from history store", + "transaction: rollback to stable restored updates from history store", "transaction: rollback to stable sweeping history store keys", "transaction: rollback to stable updates removed from history store", "transaction: transaction checkpoints due to obsolete pages", @@ -457,6 +458,7 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats) stats->txn_rts_keys_removed = 0; stats->txn_rts_keys_restored = 0; stats->txn_rts_hs_restore_tombstones = 0; + stats->txn_rts_hs_restore_updates = 0; stats->txn_rts_sweep_hs_keys = 0; stats->txn_rts_hs_removed = 0; stats->txn_checkpoint_obsolete_applied = 0; @@ -691,6 +693,7 @@ __wt_stat_dsrc_aggregate_single(WT_DSRC_STATS *from, WT_DSRC_STATS *to) to->txn_rts_keys_removed += from->txn_rts_keys_removed; to->txn_rts_keys_restored += from->txn_rts_keys_restored; to->txn_rts_hs_restore_tombstones += from->txn_rts_hs_restore_tombstones; + to->txn_rts_hs_restore_updates += from->txn_rts_hs_restore_updates; to->txn_rts_sweep_hs_keys += from->txn_rts_sweep_hs_keys; to->txn_rts_hs_removed += from->txn_rts_hs_removed; to->txn_checkpoint_obsolete_applied += from->txn_checkpoint_obsolete_applied; @@ -931,6 +934,7 @@ __wt_stat_dsrc_aggregate(WT_DSRC_STATS **from, WT_DSRC_STATS *to) to->txn_rts_keys_removed += WT_STAT_READ(from, txn_rts_keys_removed); to->txn_rts_keys_restored += WT_STAT_READ(from, txn_rts_keys_restored); to->txn_rts_hs_restore_tombstones += WT_STAT_READ(from, txn_rts_hs_restore_tombstones); + to->txn_rts_hs_restore_updates += WT_STAT_READ(from, 
txn_rts_hs_restore_updates); to->txn_rts_sweep_hs_keys += WT_STAT_READ(from, txn_rts_sweep_hs_keys); to->txn_rts_hs_removed += WT_STAT_READ(from, txn_rts_hs_removed); to->txn_checkpoint_obsolete_applied += WT_STAT_READ(from, txn_checkpoint_obsolete_applied); @@ -1420,6 +1424,7 @@ static const char *const __stats_connection_desc[] = { "transaction: rollback to stable keys removed", "transaction: rollback to stable keys restored", "transaction: rollback to stable restored tombstones from history store", + "transaction: rollback to stable restored updates from history store", "transaction: rollback to stable sweeping history store keys", "transaction: rollback to stable updates removed from history store", "transaction: transaction checkpoints due to obsolete pages", @@ -1934,6 +1939,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) stats->txn_rts_keys_removed = 0; stats->txn_rts_keys_restored = 0; stats->txn_rts_hs_restore_tombstones = 0; + stats->txn_rts_hs_restore_updates = 0; stats->txn_rts_sweep_hs_keys = 0; stats->txn_rts_hs_removed = 0; stats->txn_checkpoint_obsolete_applied = 0; @@ -2459,6 +2465,7 @@ __wt_stat_connection_aggregate(WT_CONNECTION_STATS **from, WT_CONNECTION_STATS * to->txn_rts_keys_removed += WT_STAT_READ(from, txn_rts_keys_removed); to->txn_rts_keys_restored += WT_STAT_READ(from, txn_rts_keys_restored); to->txn_rts_hs_restore_tombstones += WT_STAT_READ(from, txn_rts_hs_restore_tombstones); + to->txn_rts_hs_restore_updates += WT_STAT_READ(from, txn_rts_hs_restore_updates); to->txn_rts_sweep_hs_keys += WT_STAT_READ(from, txn_rts_sweep_hs_keys); to->txn_rts_hs_removed += WT_STAT_READ(from, txn_rts_hs_removed); to->txn_checkpoint_obsolete_applied += WT_STAT_READ(from, txn_checkpoint_obsolete_applied); diff --git a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c index b30cf03be69..9ad6b7abd6d 100644 --- 
a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c +++ b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c @@ -251,12 +251,22 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW WT_ERR(hs_cursor->get_value( hs_cursor, &hs_stop_durable_ts, &hs_durable_ts, &type_full, hs_value)); type = (uint8_t)type_full; - if (type == WT_UPDATE_MODIFY) - WT_ERR(__wt_modify_apply_item( - session, S2BT(session)->value_format, &full_value, hs_value->data)); - else { - WT_ASSERT(session, type == WT_UPDATE_STANDARD); - WT_ERR(__wt_buf_set(session, &full_value, hs_value->data, hs_value->size)); + + /* + * Do not include history store updates greater than on-disk data store version to construct + * a full update to restore. Comparing with timestamps here has no problem unlike in search + * flow where the timestamps may be reset during reconciliation. RTS detects an on-disk + * update is unstable based on the written proper timestamp, so comparing against it with + * history store shouldn't have any problem. 
+ */ + if (hs_start_ts <= unpack->tw.start_ts) { + if (type == WT_UPDATE_MODIFY) + WT_ERR(__wt_modify_apply_item( + session, S2BT(session)->value_format, &full_value, hs_value->data)); + else { + WT_ASSERT(session, type == WT_UPDATE_STANDARD); + WT_ERR(__wt_buf_set(session, &full_value, hs_value->data, hs_value->size)); + } } /* @@ -280,9 +290,10 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW */ if (!replace && hs_stop_durable_ts <= rollback_timestamp) { __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), - "history store update valid with stop timestamp: %s and stable timestamp: %s", + "history store update valid with stop timestamp: %s, stable timestamp: %s and type: " + "%" PRIu8, __wt_timestamp_to_string(hs_stop_durable_ts, ts_string[0]), - __wt_timestamp_to_string(rollback_timestamp, ts_string[1])); + __wt_timestamp_to_string(rollback_timestamp, ts_string[1]), type); break; } @@ -290,22 +301,23 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW if (hs_durable_ts <= rollback_timestamp) { __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), "history store update valid with start timestamp: %s, durable timestamp: %s, stop " - "timestamp: %s and stable timestamp: %s", + "timestamp: %s, stable timestamp: %s and type: %" PRIu8, __wt_timestamp_to_string(hs_start_ts, ts_string[0]), __wt_timestamp_to_string(hs_durable_ts, ts_string[1]), __wt_timestamp_to_string(hs_stop_durable_ts, ts_string[2]), - __wt_timestamp_to_string(rollback_timestamp, ts_string[3])); + __wt_timestamp_to_string(rollback_timestamp, ts_string[3]), type); + WT_ASSERT(session, cbt->upd_value->tw.start_ts < unpack->tw.start_ts); valid_update_found = true; break; } __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), "history store update aborted with start timestamp: %s, durable timestamp: %s, stop " - "timestamp: %s and stable timestamp: %s", + "timestamp: %s, stable timestamp: %s and type: %" PRIu8, 
__wt_timestamp_to_string(hs_start_ts, ts_string[0]), __wt_timestamp_to_string(hs_durable_ts, ts_string[1]), __wt_timestamp_to_string(hs_stop_durable_ts, ts_string[2]), - __wt_timestamp_to_string(rollback_timestamp, ts_string[3])); + __wt_timestamp_to_string(rollback_timestamp, ts_string[3]), type); /* * Start time point of the current record may be used as stop time point of the previous @@ -329,6 +341,7 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW * list. Otherwise remove the key by adding a tombstone. */ if (valid_update_found) { + WT_ASSERT(session, cbt->upd_value->tw.start_ts < unpack->tw.start_ts); WT_ERR(__wt_upd_alloc(session, &full_value, WT_UPDATE_STANDARD, &upd, NULL)); upd->txnid = cbt->upd_value->tw.start_txn; @@ -336,7 +349,7 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW upd->start_ts = cbt->upd_value->tw.start_ts; __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), "update restored from history store (txnid: %" PRIu64 - ", start_ts: %s, durable_ts: %s", + ", start_ts: %s and durable_ts: %s", upd->txnid, __wt_timestamp_to_string(upd->start_ts, ts_string[0]), __wt_timestamp_to_string(upd->durable_ts, ts_string[1])); @@ -345,6 +358,7 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW * the rollback to stable operation. */ F_SET(upd, WT_UPDATE_RESTORED_FROM_HS); + WT_STAT_CONN_DATA_INCR(session, txn_rts_hs_restore_updates); /* * We have a tombstone on the original update chain and it is behind the stable diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable14.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable14.py new file mode 100755 index 00000000000..ea88a33a066 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable14.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2020 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. 
+# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
import fnmatch, os, shutil, threading, time
import wttest
from helper import copy_wiredtiger_home
from test_rollback_to_stable01 import test_rollback_to_stable_base
from wiredtiger import stat, wiredtiger_strerror, WiredTigerError, WT_ROLLBACK
from wtdataset import SimpleDataSet
from wtscenario import make_scenarios
from wtthread import checkpoint_thread, op_thread
from time import sleep

def timestamp_str(t):
    """Return the integer timestamp t formatted as a lowercase hex string."""
    return '%x' % t

def mod_val(value, char, location, nbytes=1):
    """Return value with nbytes characters starting at location replaced by char."""
    return value[0:location] + char + value[location+nbytes:]

def retry_rollback(self, name, txn_session, code):
    """Run code(), retrying up to 100 times while it fails with WT_ROLLBACK.

    self is the test case (used for self.pr() logging), name labels the
    operation in the log output, and txn_session, when not None, has its
    transaction rolled back and restarted before every retry.

    Any WiredTigerError that is not a rollback error is re-raised at once.
    If the retry limit is exhausted, the last rollback error is raised.
    """
    retry_limit = 100
    retries = 0
    completed = False
    saved_exception = None
    while not completed and retries < retry_limit:
        if retries != 0:
            self.pr("Retrying operation for " + name)
            if txn_session:
                txn_session.rollback_transaction()
            sleep(0.1)
            if txn_session:
                txn_session.begin_transaction('isolation=snapshot')
                self.pr("Began new transaction for " + name)
        try:
            code()
            completed = True
        except WiredTigerError as e:
            rollback_str = wiredtiger_strerror(WT_ROLLBACK)
            if rollback_str not in str(e):
                # Not a rollback error: propagate with the original traceback.
                raise
            retries += 1
            saved_exception = e
    if not completed and saved_exception:
        raise saved_exception

# test_rollback_to_stable14.py
# Test the rollback to stable operation uses proper base update while restoring modifies from history store.
class test_rollback_to_stable14(test_rollback_to_stable_base):
    session_config = 'isolation=snapshot'

    prepare_values = [
        ('no_prepare', dict(prepare=False)),
        ('prepare', dict(prepare=True))
    ]

    scenarios = make_scenarios(prepare_values)

    def conn_config(self):
        """Return the connection configuration string used by this test."""
        config = ('cache_size=8MB,statistics=(all),'
            'statistics_log=(json,on_close,wait=1),log=(enabled=true),'
            'timing_stress_for_test=[history_store_checkpoint_delay]')
        return config

    def simulate_crash_restart(self, olddir, newdir):
        ''' Simulate a crash from olddir and restart in newdir. '''
        # with the connection still open, copy files to new directory
        shutil.rmtree(newdir, ignore_errors=True)
        os.mkdir(newdir)
        for fname in os.listdir(olddir):
            fullname = os.path.join(olddir, fname)
            # Skip lock file on Windows since it is locked
            if os.path.isfile(fullname) and \
                "WiredTiger.lock" not in fullname and \
                "Tmplog" not in fullname and \
                "Preplog" not in fullname:
                shutil.copy(fullname, newdir)
        #
        # close the original connection and open to new directory
        # NOTE:  This really cannot test the difference between the
        # write-no-sync (off) version of log_flush and the sync
        # version since we're not crashing the system itself.
        #
        self.close_conn()
        self.conn = self.setUpConnectionOpen(newdir)
        self.session = self.setUpSessionOpen(self.conn)

    def _create_table(self):
        """Create and populate the non-logged test table, then pin oldest and
        stable to timestamp 10. Returns the (uri, dataset) pair."""
        self.pr("create/populate table")
        uri = "table:rollback_to_stable14"
        ds = SimpleDataSet(
            self, uri, 0, key_format="i", value_format="S", config='log=(enabled=false)')
        ds.populate()

        # Pin oldest and stable to timestamp 10.
        self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(10) +
            ',stable_timestamp=' + timestamp_str(10))
        return uri, ds

    def _modify_during_checkpoint(self, uri, ds, nrows):
        """Apply modifies 'W', 'X', 'Y', 'Z' at timestamps 70-100 while a
        checkpoint thread is running."""
        # Create a checkpoint thread
        done = threading.Event()
        ckpt = checkpoint_thread(self.conn, done)
        try:
            self.pr("start checkpoint")
            ckpt.start()

            # Perform several modifies in parallel with checkpoint.
            # Rollbacks may occur when checkpoint is running, so retry as needed.
            self.pr("modifies")
            modifies = (('W', 4, 70), ('X', 5, 80), ('Y', 6, 90), ('Z', 7, 100))
            for char, location, commit_ts in modifies:
                # Bind the loop variables as defaults so the callable does not
                # depend on late-bound closure state.
                retry_rollback(self, 'modify ds1, ' + char, None,
                    lambda char=char, location=location, commit_ts=commit_ts:
                        self.large_modifies(uri, char, ds, location, 1, nrows, commit_ts))
        finally:
            # Always stop the checkpoint thread, even if a modify failed.
            done.set()
            ckpt.join()

    def _check_rts_stats(self, nrows, min_hs_removed):
        """Read the rollback-to-stable statistics and verify them.

        nrows is the expected number of updates restored from the history
        store; min_hs_removed is the lower bound on history store removals.
        """
        stat_cursor = self.session.open_cursor('statistics:', None, None)
        try:
            calls = stat_cursor[stat.conn.txn_rts][2]
            hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2]
            hs_restore_updates = stat_cursor[stat.conn.txn_rts_hs_restore_updates][2]
            hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2]
            keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
            keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
            pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2]
            upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2]
        finally:
            # Close the cursor even if a statistic lookup fails.
            stat_cursor.close()

        self.assertEqual(calls, 0)
        self.assertEqual(keys_removed, 0)
        self.assertEqual(hs_restore_updates, nrows)
        self.assertEqual(keys_restored, 0)
        self.assertEqual(upd_aborted, 0)
        self.assertGreater(pages_visited, 0)
        self.assertGreaterEqual(hs_removed, min_hs_removed)
        self.assertGreaterEqual(hs_sweep, 0)

    def test_rollback_to_stable(self):
        """Chain modifies at distinct timestamps, crash and restart during
        further modifies, then check the values at and before timestamp 50."""
        nrows = 1500

        # Create a table without logging.
        uri, ds = self._create_table()

        value_a = "aaaaa" * 100

        value_modQ = mod_val(value_a, 'Q', 0)
        value_modR = mod_val(value_modQ, 'R', 1)
        value_modS = mod_val(value_modR, 'S', 2)
        value_modT = mod_val(value_modS, 'T', 3)

        # Perform a combination of modifies and updates.
        self.pr("large updates and modifies")
        self.large_updates(uri, value_a, ds, nrows, 20)
        self.large_modifies(uri, 'Q', ds, 0, 1, nrows, 30)
        self.large_modifies(uri, 'R', ds, 1, 1, nrows, 40)
        self.large_modifies(uri, 'S', ds, 2, 1, nrows, 50)
        self.large_modifies(uri, 'T', ds, 3, 1, nrows, 60)

        # Verify data is visible and correct.
        self.check(value_a, uri, nrows, 20)
        self.check(value_modQ, uri, nrows, 30)
        self.check(value_modR, uri, nrows, 40)
        self.check(value_modS, uri, nrows, 50)
        self.check(value_modT, uri, nrows, 60)

        # Pin stable to timestamp 60 if prepare otherwise 50.
        if self.prepare:
            self.conn.set_timestamp('stable_timestamp=' + timestamp_str(60))
        else:
            self.conn.set_timestamp('stable_timestamp=' + timestamp_str(50))

        self._modify_during_checkpoint(uri, ds, nrows)

        # Simulate a server crash and restart.
        self.pr("restart")
        self.simulate_crash_restart(".", "RESTART")
        self.pr("restart complete")

        self._check_rts_stats(nrows, nrows)

        # Check that the correct data is seen at and after the stable timestamp.
        self.check(value_a, uri, nrows, 20)
        self.check(value_modQ, uri, nrows, 30)
        self.check(value_modR, uri, nrows, 40)
        self.check(value_modS, uri, nrows, 50)

        # The test may output the following message in eviction under cache pressure. Ignore that.
        self.ignoreStdoutPatternIfExists("oldest pinned transaction ID rolled back for eviction")

    def test_rollback_to_stable_same_ts(self):
        """Same scenario, but the modifies after 'Q' are committed past the
        stable timestamp (at one shared timestamp when not using prepare)."""
        nrows = 1500

        # Create a table without logging.
        uri, ds = self._create_table()

        value_a = "aaaaa" * 100

        value_modQ = mod_val(value_a, 'Q', 0)
        value_modR = mod_val(value_modQ, 'R', 1)
        value_modS = mod_val(value_modR, 'S', 2)
        value_modT = mod_val(value_modS, 'T', 3)

        # Perform a combination of modifies and updates.
        self.pr("large updates and modifies")
        self.large_updates(uri, value_a, ds, nrows, 20)
        self.large_modifies(uri, 'Q', ds, 0, 1, nrows, 30)
        # Prepare cannot always use the same timestamp, so use different
        # timestamps that are all aborted.
        if self.prepare:
            self.large_modifies(uri, 'R', ds, 1, 1, nrows, 51)
            self.large_modifies(uri, 'S', ds, 2, 1, nrows, 55)
            self.large_modifies(uri, 'T', ds, 3, 1, nrows, 60)
        else:
            self.large_modifies(uri, 'R', ds, 1, 1, nrows, 60)
            self.large_modifies(uri, 'S', ds, 2, 1, nrows, 60)
            self.large_modifies(uri, 'T', ds, 3, 1, nrows, 60)

        # Verify data is visible and correct.
        self.check(value_a, uri, nrows, 20)
        self.check(value_modQ, uri, nrows, 30)
        self.check(value_modT, uri, nrows, 60)

        self.conn.set_timestamp('stable_timestamp=' + timestamp_str(50))

        self._modify_during_checkpoint(uri, ds, nrows)

        # Simulate a server crash and restart.
        self.pr("restart")
        self.simulate_crash_restart(".", "RESTART")
        self.pr("restart complete")

        self._check_rts_stats(nrows, nrows * 3)

        # Check that the correct data is seen at and after the stable timestamp.
        self.check(value_a, uri, nrows, 20)
        self.check(value_modQ, uri, nrows, 30)

        # The test may output the following message in eviction under cache pressure. Ignore that.
        self.ignoreStdoutPatternIfExists("oldest pinned transaction ID rolled back for eviction")

if __name__ == '__main__':
    wttest.run()