diff options
Diffstat (limited to 'src/third_party/wiredtiger/src/txn/txn.c')
-rw-r--r-- | src/third_party/wiredtiger/src/txn/txn.c | 157 |
1 files changed, 91 insertions, 66 deletions
diff --git a/src/third_party/wiredtiger/src/txn/txn.c b/src/third_party/wiredtiger/src/txn/txn.c index cdc507ddfa5..8b77fcd00fa 100644 --- a/src/third_party/wiredtiger/src/txn/txn.c +++ b/src/third_party/wiredtiger/src/txn/txn.c @@ -772,7 +772,7 @@ __wt_txn_release(WT_SESSION_IMPL *session) static int __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE *page, WT_UPDATE *chain, bool commit, WT_UPDATE **fix_updp, bool *upd_appended, - WT_UPDATE *first_committed_upd, bool first_committed_upd_in_hs) + bool first_committed_upd_in_hs) { WT_DECL_ITEM(hs_value); WT_DECL_RET; @@ -785,7 +785,6 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * WT_ASSERT(session, chain != NULL); - hs_tw = NULL; *fix_updp = NULL; *upd_appended = false; size = total_size = 0; @@ -806,16 +805,12 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * if (hs_stop_durable_ts != WT_TS_MAX && commit) goto done; - if (!first_committed_upd_in_hs) { - __wt_hs_upd_time_window(hs_cursor, &hs_tw); - WT_ERR(__wt_upd_alloc(session, hs_value, WT_UPDATE_STANDARD, &upd, &size)); - upd->txnid = hs_tw->start_txn; - upd->durable_ts = hs_tw->durable_start_ts; - upd->start_ts = hs_tw->start_ts; - if (commit) - *fix_updp = upd; - } else if (commit) - *fix_updp = first_committed_upd; + __wt_hs_upd_time_window(hs_cursor, &hs_tw); + WT_ERR(__wt_upd_alloc(session, hs_value, WT_UPDATE_STANDARD, &upd, &size)); + upd->txnid = hs_tw->start_txn; + upd->durable_ts = hs_tw->durable_start_ts; + upd->start_ts = hs_tw->start_ts; + *fix_updp = upd; /* * When the prepared update is getting committed or the history store update is still on the @@ -828,7 +823,7 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * * Set the flag to indicate that this update has been restored from history store for the * rollback of a prepared transaction. */ - F_SET(upd, WT_UPDATE_RESTORED_FROM_HS | WT_UPDATE_TO_DELETE_FROM_HS); + F_SET(upd, WT_UPDATE_RESTORED_FROM_HS); total_size += size; __wt_verbose(session, WT_VERB_TRANSACTION, @@ -848,7 +843,7 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * * Set the flag to indicate that this update has been restored from history store for the * rollback of a prepared transaction. */ - F_SET(tombstone, WT_UPDATE_RESTORED_FROM_HS | WT_UPDATE_TO_DELETE_FROM_HS); + F_SET(tombstone, WT_UPDATE_RESTORED_FROM_HS); total_size += size; __wt_verbose(session, WT_VERB_TRANSACTION, @@ -872,6 +867,7 @@ __txn_locate_hs_record(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_PAGE * WT_PUBLISH(chain->next, upd); *upd_appended = true; + *fix_updp = upd; __wt_cache_page_inmem_incr(session, page, total_size); if (0) { @@ -994,15 +990,18 @@ __txn_commit_timestamps_usage_check(WT_SESSION_IMPL *session, WT_TXN_OP *op, WT_ /* * __txn_fixup_prepared_update -- - * Fix the history store record with the max stop time point if we commit the prepared update. + * Fix/restore the history store update of a prepared datastore update based on transaction + * status. */ static int -__txn_fixup_prepared_update(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_UPDATE *fix_upd) +__txn_fixup_prepared_update( + WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_UPDATE *fix_upd, bool commit) { WT_DECL_RET; WT_ITEM hs_value; WT_TIME_WINDOW tw; WT_TXN *txn; + WT_TXN_GLOBAL *txn_global; uint32_t txn_flags; #ifdef HAVE_DIAGNOSTIC uint64_t hs_upd_type; @@ -1010,14 +1009,12 @@ __txn_fixup_prepared_update(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_U #endif txn = session->txn; + txn_global = &S2C(session)->txn_global; WT_TIME_WINDOW_INIT(&tw); - /* We should not fix a history store record already with a valid tombstone. */ - WT_ASSERT(session, fix_upd->type != WT_UPDATE_TOMBSTONE); - /* - * Transaction error is cleared temporarily as cursor functions are not allowed after an error - * or a prepared transaction. + * Transaction error and prepare are cleared temporarily as cursor functions are not allowed + * after an error or a prepared transaction. */ txn_flags = FLD_MASK(txn->flags, WT_TXN_ERROR); F_CLR(txn, txn_flags); @@ -1031,28 +1028,75 @@ __txn_fixup_prepared_update(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, WT_U */ F_SET(txn, WT_TXN_PREPARE_IGNORE_API_CHECK); - tw.stop_ts = txn->commit_timestamp; - tw.durable_stop_ts = txn->durable_timestamp; - tw.stop_txn = txn->id; - WT_TIME_WINDOW_SET_START(&tw, fix_upd); + /* + * If the history update already has a stop time point and we are committing the prepared update + * there is no work to do. + */ + if (commit) { + tw.stop_ts = txn->commit_timestamp; + tw.durable_stop_ts = txn->durable_timestamp; + tw.stop_txn = txn->id; + WT_TIME_WINDOW_SET_START(&tw, fix_upd); #ifdef HAVE_DIAGNOSTIC - /* Retrieve the existing update value and stop timestamp. */ - WT_ERR(hs_cursor->get_value( - hs_cursor, &hs_stop_durable_ts, &hs_durable_ts, &hs_upd_type, &hs_value)); - WT_ASSERT(session, hs_stop_durable_ts == WT_TS_MAX); - WT_ASSERT(session, (uint8_t)hs_upd_type == WT_UPDATE_STANDARD); + /* Retrieve the existing update value and stop timestamp. */ + WT_ERR(hs_cursor->get_value( + hs_cursor, &hs_stop_durable_ts, &hs_durable_ts, &hs_upd_type, &hs_value)); + WT_ASSERT(session, hs_stop_durable_ts == WT_TS_MAX); + WT_ASSERT(session, (uint8_t)hs_upd_type == WT_UPDATE_STANDARD); #endif - /* - * We need to update the stop durable timestamp stored in the history store value. - * - * Pack the value using cursor api. - */ - hs_value.data = fix_upd->data; - hs_value.size = fix_upd->size; - hs_cursor->set_value(hs_cursor, &tw, tw.durable_stop_ts, tw.durable_start_ts, - (uint64_t)WT_UPDATE_STANDARD, &hs_value); - WT_ERR(hs_cursor->update(hs_cursor)); + /* + * We need to update the stop durable timestamp stored in the history store value. + * + * Pack the value using cursor api. + */ + hs_value.data = fix_upd->data; + hs_value.size = fix_upd->size; + hs_cursor->set_value(hs_cursor, &tw, tw.durable_stop_ts, tw.durable_start_ts, + (uint64_t)WT_UPDATE_STANDARD, &hs_value); + WT_ERR(hs_cursor->update(hs_cursor)); + } else { + /* + * Remove the history store entry if a checkpoint is not running, otherwise place a + * tombstone in front of the history store entry if it doesn't have a stop timestamp. + */ + if (txn_global->checkpoint_running) { + /* Don't update the history store entry if the entry already has a stop timestamp. */ + if (fix_upd->type != WT_UPDATE_TOMBSTONE) { + /* + * When the history store's update start transaction id is greater than the + * checkpoint's reserved transaction id, the durable timestamp of this update is + * guaranteed to be greater than the checkpoint timestamp, as such there is no need + * to save this unstable update in the history store. + */ + if (fix_upd->txnid > txn_global->checkpoint_reserved_txn_id) + WT_ERR(hs_cursor->remove(hs_cursor)); + else { + tw.durable_stop_ts = fix_upd->durable_ts; + tw.stop_ts = fix_upd->start_ts; + + /* + * Set the stop transaction id of the time window to the checkpoint reserved + * transaction id. As such the tombstone won't be visible to rollback to stable, + * additionally checkpoint garbage collection cannot clean it up as it greater + * than the globally visible transaction id. + */ + tw.stop_txn = txn_global->checkpoint_reserved_txn_id; + WT_TIME_WINDOW_SET_START(&tw, fix_upd); + + hs_value.data = fix_upd->data; + hs_value.size = fix_upd->size; + hs_cursor->set_value(hs_cursor, &tw, tw.durable_stop_ts, tw.durable_start_ts, + (uint64_t)WT_UPDATE_STANDARD, &hs_value); + WT_ERR(hs_cursor->update(hs_cursor)); + WT_STAT_CONN_INCR( + session, txn_prepare_rollback_fix_hs_update_with_ckpt_reserved_txnid); + } + } else + WT_STAT_CONN_INCR(session, txn_prepare_rollback_do_not_remove_hs_update); + } else + WT_ERR(hs_cursor->remove(hs_cursor)); + } err: F_SET(txn, txn_flags); @@ -1181,7 +1225,7 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, WT_PAGE *page; WT_TIME_WINDOW tw; WT_TXN *txn; - WT_UPDATE *first_committed_upd, *fix_upd, *upd, *upd_followed_tombstone; + WT_UPDATE *first_committed_upd, *fix_upd, *upd; #ifdef HAVE_DIAGNOSTIC WT_UPDATE *head_upd; #endif @@ -1266,27 +1310,6 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, upd->txnid == upd->next->txnid && upd->start_ts == upd->next->start_ts)); first_committed_upd_in_hs = first_committed_upd != NULL && F_ISSET(first_committed_upd, WT_UPDATE_HS); - - /* - * Marked the update older than the prepared update that is already in the history store to be - * deleted from the history store. - */ - if (first_committed_upd_in_hs && !commit) { - if (first_committed_upd->type == WT_UPDATE_TOMBSTONE) { - for (upd_followed_tombstone = first_committed_upd->next; upd_followed_tombstone != NULL; - upd_followed_tombstone = upd_followed_tombstone->next) - if (upd_followed_tombstone->txnid != WT_TXN_ABORTED) - break; - /* We may not find a full update following the tombstone if it is obsolete. */ - if (upd_followed_tombstone != NULL) { - WT_ASSERT(session, F_ISSET(upd_followed_tombstone, WT_UPDATE_HS)); - F_SET(first_committed_upd, WT_UPDATE_TO_DELETE_FROM_HS); - F_SET(upd_followed_tombstone, WT_UPDATE_TO_DELETE_FROM_HS); - } - } else - F_SET(first_committed_upd, WT_UPDATE_TO_DELETE_FROM_HS); - } - if (prepare_on_disk || first_committed_upd_in_hs) { btree = S2BT(session); @@ -1317,8 +1340,8 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, */ WT_ERR(__txn_append_tombstone(session, op, cbt)); } else if (ret == 0) - WT_ERR(__txn_locate_hs_record(session, hs_cursor, page, upd, commit, &fix_upd, - &upd_appended, first_committed_upd, first_committed_upd_in_hs)); + WT_ERR(__txn_locate_hs_record( + session, hs_cursor, page, upd, commit, &fix_upd, &upd_appended, first_committed_upd)); else ret = 0; } else if (F_ISSET(S2C(session), WT_CONN_IN_MEMORY) && !commit && first_committed_upd == NULL) { @@ -1403,8 +1426,10 @@ __txn_resolve_prepared_op(WT_SESSION_IMPL *session, WT_TXN_OP *op, bool commit, * will be only one uncommitted prepared update. */ if (fix_upd != NULL) { - WT_ASSERT(session, commit && fix_upd->type != WT_UPDATE_TOMBSTONE); - WT_ERR(__txn_fixup_prepared_update(session, hs_cursor, fix_upd)); + WT_ERR(__txn_fixup_prepared_update(session, hs_cursor, fix_upd, commit)); + /* Clear the WT_UPDATE_HS flag as we should have removed it from the history store. */ + if (first_committed_upd_in_hs && !commit) + F_CLR(first_committed_upd, WT_UPDATE_HS); } prepare_verify: |