diff options
Diffstat (limited to 'src/third_party/wiredtiger/src/txn/txn.c')
-rw-r--r-- | src/third_party/wiredtiger/src/txn/txn.c | 157 |
1 files changed, 66 insertions, 91 deletions
diff --git a/src/third_party/wiredtiger/src/txn/txn.c b/src/third_party/wiredtiger/src/txn/txn.c index 5beb059c12b..6de0205e967 100644 --- a/src/third_party/wiredtiger/src/txn/txn.c +++ b/src/third_party/wiredtiger/src/txn/txn.c @@ -719,7 +719,7 @@ __wt_txn_release(WT_SESSION_IMPL *session) static int __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE *page, WT_UPDATE *chain, bool commit, WT_UPDATE **fix_updp, bool *upd_appended, - bool first_committed_upd_in_hs) + WT_UPDATE *first_committed_upd, bool first_committed_upd_in_hs) { WT_DECL_ITEM(hs_value); WT_DECL_RET; @@ -732,6 +732,7 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * WT_ASSERT(session, chain != NULL); + hs_tw = NULL; *fix_updp = NULL; *upd_appended = false; size = total_size = 0; @@ -752,12 +753,16 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * if (hs_stop_durable_ts != WT_TS_MAX && commit) goto done; - __wt_hs_upd_time_window(hs_cursor, &hs_tw); - WT_ERR(__wt_upd_alloc(session, hs_value, WT_UPDATE_STANDARD, &upd, &size)); - upd->txnid = hs_tw->start_txn; - upd->durable_ts = hs_tw->durable_start_ts; - upd->start_ts = hs_tw->start_ts; - *fix_updp = upd; + if (!first_committed_upd_in_hs) { + __wt_hs_upd_time_window(hs_cursor, &hs_tw); + WT_ERR(__wt_upd_alloc(session, hs_value, WT_UPDATE_STANDARD, &upd, &size)); + upd->txnid = hs_tw->start_txn; + upd->durable_ts = hs_tw->durable_start_ts; + upd->start_ts = hs_tw->start_ts; + if (commit) + *fix_updp = upd; + } else if (commit) + *fix_updp = first_committed_upd; /* * When the prepared update is getting committed or the history store update is still on the @@ -770,7 +775,7 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * * Set the flag to indicate that this update has been restored from history store for the * rollback of a prepared transaction. */ - F_SET(upd, WT_UPDATE_RESTORED_FROM_HS); + F_SET(upd, WT_UPDATE_RESTORED_FROM_HS | WT_UPDATE_TO_DELETE_FROM_HS); total_size += size; __wt_verbose(session, WT_VERB_TRANSACTION, @@ -790,7 +795,7 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * * Set the flag to indicate that this update has been restored from history store for the * rollback of a prepared transaction. */ - F_SET(tombstone, WT_UPDATE_RESTORED_FROM_HS); + F_SET(tombstone, WT_UPDATE_RESTORED_FROM_HS | WT_UPDATE_TO_DELETE_FROM_HS); total_size += size; __wt_verbose(session, WT_VERB_TRANSACTION, @@ -814,7 +819,6 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * WT_PUBLISH(chain->next, upd); *upd_appended = true; - *fix_updp = upd; __wt_cache_page_inmem_incr(session, page, total_size); if (0) { @@ -909,18 +913,15 @@ __txn_timestamp_usage_check(WT_SESSION_IMPL *session, WT_TXN_OP *op, WT_UPDATE * /* * __txn_fixup_prepared_update -- - * Fix/restore the history store update of a prepared datastore update based on transaction - * status. + * Fix the history store record with the max stop time point if we commit the prepared update. */ static int -__txn_fixup_prepared_update( - WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_UPDATE *fix_upd, bool commit) +__txn_fixup_prepared_update(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_UPDATE *fix_upd) { WT_DECL_RET; WT_ITEM hs_value; WT_TIME_WINDOW tw; WT_TXN *txn; - WT_TXN_GLOBAL *txn_global; uint32_t txn_flags; #ifdef HAVE_DIAGNOSTIC uint64_t hs_upd_type; @@ -928,12 +929,14 @@ __txn_fixup_prepared_update( #endif txn = session->txn; - txn_global = &S2C(session)->txn_global; WT_TIME_WINDOW_INIT(&tw); + /* We should not fix a history store record already with a valid tombstone. */ + WT_ASSERT(session, fix_upd->type != WT_UPDATE_TOMBSTONE); + /* - * Transaction error and prepare are cleared temporarily as cursor functions are not allowed - * after an error or a prepared transaction. + * Transaction error is cleared temporarily as cursor functions are not allowed after an error + * or a prepared transaction. */ txn_flags = FLD_MASK(txn->flags, WT_TXN_ERROR); F_CLR(txn, txn_flags); @@ -947,75 +950,28 @@ __txn_fixup_prepared_update( */ F_SET(txn, WT_TXN_PREPARE_IGNORE_API_CHECK); - /* - * If the history update already has a stop time point and we are committing the prepared update - * there is no work to do. - */ - if (commit) { - tw.stop_ts = txn->commit_timestamp; - tw.durable_stop_ts = txn->durable_timestamp; - tw.stop_txn = txn->id; - WT_TIME_WINDOW_SET_START(&tw, fix_upd); + tw.stop_ts = txn->commit_timestamp; + tw.durable_stop_ts = txn->durable_timestamp; + tw.stop_txn = txn->id; + WT_TIME_WINDOW_SET_START(&tw, fix_upd); #ifdef HAVE_DIAGNOSTIC - /* Retrieve the existing update value and stop timestamp. */ - WT_ERR(hs_cursor->get_value( - hs_cursor, &hs_stop_durable_ts, &hs_durable_ts, &hs_upd_type, &hs_value)); - WT_ASSERT(session, hs_stop_durable_ts == WT_TS_MAX); - WT_ASSERT(session, (uint8_t)hs_upd_type == WT_UPDATE_STANDARD); + /* Retrieve the existing update value and stop timestamp. */ + WT_ERR(hs_cursor->get_value( + hs_cursor, &hs_stop_durable_ts, &hs_durable_ts, &hs_upd_type, &hs_value)); + WT_ASSERT(session, hs_stop_durable_ts == WT_TS_MAX); + WT_ASSERT(session, (uint8_t)hs_upd_type == WT_UPDATE_STANDARD); #endif - /* - * We need to update the stop durable timestamp stored in the history store value. - * - * Pack the value using cursor api. - */ - hs_value.data = fix_upd->data; - hs_value.size = fix_upd->size; - hs_cursor->set_value(hs_cursor, &tw, tw.durable_stop_ts, tw.durable_start_ts, - (uint64_t)WT_UPDATE_STANDARD, &hs_value); - WT_ERR(hs_cursor->update(hs_cursor)); - } else { - /* - * Remove the history store entry if a checkpoint is not running, otherwise place a - * tombstone in front of the history store entry if it doesn't have a stop timestamp. - */ - if (txn_global->checkpoint_running) { - /* Don't update the history store entry if the entry already has a stop timestamp. */ - if (fix_upd->type != WT_UPDATE_TOMBSTONE) { - /* - * When the history store's update start transaction id is greater than the - * checkpoint's reserved transaction id, the durable timestamp of this update is - * guaranteed to be greater than the checkpoint timestamp, as such there is no need - * to save this unstable update in the history store. - */ - if (fix_upd->txnid > txn_global->checkpoint_reserved_txn_id) - WT_ERR(hs_cursor->remove(hs_cursor)); - else { - tw.durable_stop_ts = fix_upd->durable_ts; - tw.stop_ts = fix_upd->start_ts; - - /* - * Set the stop transaction id of the time window to the checkpoint reserved - * transaction id. As such the tombstone won't be visible to rollback to stable, - * additionally checkpoint garbage collection cannot clean it up as it greater - * than the globally visible transaction id. - */ - tw.stop_txn = txn_global->checkpoint_reserved_txn_id; - WT_TIME_WINDOW_SET_START(&tw, fix_upd); - - hs_value.data = fix_upd->data; - hs_value.size = fix_upd->size; - hs_cursor->set_value(hs_cursor, &tw, tw.durable_stop_ts, tw.durable_start_ts, - (uint64_t)WT_UPDATE_STANDARD, &hs_value); - WT_ERR(hs_cursor->update(hs_cursor)); - WT_STAT_CONN_INCR( - session, txn_prepare_rollback_fix_hs_update_with_ckpt_reserved_txnid); - } - } else - WT_STAT_CONN_INCR(session, txn_prepare_rollback_do_not_remove_hs_update); - } else - WT_ERR(hs_cursor->remove(hs_cursor)); - } + /* + * We need to update the stop durable timestamp stored in the history store value. + * + * Pack the value using cursor api. + */ + hs_value.data = fix_upd->data; + hs_value.size = fix_upd->size; + hs_cursor->set_value(hs_cursor, &tw, tw.durable_stop_ts, tw.durable_start_ts, + (uint64_t)WT_UPDATE_STANDARD, &hs_value); + WT_ERR(hs_cursor->update(hs_cursor)); err: F_SET(txn, txn_flags); @@ -1197,7 +1153,7 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, WT_PAGE *page; WT_TIME_WINDOW tw; WT_TXN *txn; - WT_UPDATE *first_committed_upd, *fix_upd, *upd; + WT_UPDATE *first_committed_upd, *fix_upd, *upd, *upd_followed_tombstone; #ifdef HAVE_DIAGNOSTIC WT_UPDATE *head_upd; #endif @@ -1282,6 +1238,27 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, upd->txnid == upd->next->txnid && upd->start_ts == upd->next->start_ts)); first_committed_upd_in_hs = first_committed_upd != NULL && F_ISSET(first_committed_upd, WT_UPDATE_HS); + + /* + * Marked the update older than the prepared update that is already in the history store to be + * deleted from the history store. + */ + if (first_committed_upd_in_hs && !commit) { + if (first_committed_upd->type == WT_UPDATE_TOMBSTONE) { + for (upd_followed_tombstone = first_committed_upd->next; upd_followed_tombstone != NULL; + upd_followed_tombstone = upd_followed_tombstone->next) + if (upd_followed_tombstone->txnid != WT_TXN_ABORTED) + break; + /* We may not find a full update following the tombstone if it is obsolete. */ + if (upd_followed_tombstone != NULL) { + WT_ASSERT(session, F_ISSET(upd_followed_tombstone, WT_UPDATE_HS)); + F_SET(first_committed_upd, WT_UPDATE_TO_DELETE_FROM_HS); + F_SET(upd_followed_tombstone, WT_UPDATE_TO_DELETE_FROM_HS); + } + } else + F_SET(first_committed_upd, WT_UPDATE_TO_DELETE_FROM_HS); + } + if (prepare_on_disk || first_committed_upd_in_hs) { btree = S2BT(session); @@ -1312,8 +1289,8 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, */ WT_ERR(__txn_append_tombstone(session, op, cbt)); } else if (ret == 0) - WT_ERR(__txn_locate_hs_record( - session, hs_cursor, page, upd, commit, &fix_upd, &upd_appended, first_committed_upd)); + WT_ERR(__txn_locate_hs_record(session, hs_cursor, page, upd, commit, &fix_upd, + &upd_appended, first_committed_upd, first_committed_upd_in_hs)); else ret = 0; } else if (F_ISSET(S2C(session), WT_CONN_IN_MEMORY) && !commit && first_committed_upd == NULL) { @@ -1365,10 +1342,8 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, * will be only one uncommitted prepared update. */ if (fix_upd != NULL) { - WT_ERR(__txn_fixup_prepared_update(session, hs_cursor, fix_upd, commit)); - /* Clear the WT_UPDATE_HS flag as we should have removed it from the history store. */ - if (first_committed_upd_in_hs && !commit) - F_CLR(first_committed_upd, WT_UPDATE_HS); + WT_ASSERT(session, commit && fix_upd->type != WT_UPDATE_TOMBSTONE); + WT_ERR(__txn_fixup_prepared_update(session, hs_cursor, fix_upd)); } prepare_verify: |