diff options
author | Samy Lanka <samy.lanka@mongodb.com> | 2018-10-12 11:28:51 -0400 |
---|---|---|
committer | Samy Lanka <samy.lanka@mongodb.com> | 2018-10-23 15:29:46 -0400 |
commit | 1fea1df1ee21cac90a2217f219f1ba12244fc4fa (patch) | |
tree | 5ee156668667be375c0d551485e348736a0c9832 /src/mongo/db | |
parent | 485491d5839e11f47c6696d1bcf3a449bfbc56cf (diff) | |
download | mongo-1fea1df1ee21cac90a2217f219f1ba12244fc4fa.tar.gz |
SERVER-35879 Deal with commitTransaction oplog entries during startup recovery
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery_test.cpp | 161 |
7 files changed, 227 insertions, 3 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 43ea709440a..bd41e12bd1e 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -86,6 +86,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_options.h" +#include "mongo/db/transaction_history_iterator.h" #include "mongo/db/transaction_participant.h" #include "mongo/platform/random.h" #include "mongo/scripting/engine.h" @@ -1010,7 +1011,34 @@ std::map<std::string, ApplyOpMetadata> opsMap = { BSONObj& cmd, const OpTime& opTime, const OplogEntry& entry, - OplogApplication::Mode mode) -> Status { return Status::OK(); }}}, + OplogApplication::Mode mode) -> Status { + if (mode == OplogApplication::Mode::kRecovering) { + const auto replCoord = ReplicationCoordinator::get(opCtx); + const auto recoveryTimestamp = replCoord->getRecoveryTimestamp(); + invariant(recoveryTimestamp); + + // If the commitTimestamp is before the recoveryTimestamp, then the data already + // reflects the operations from the transaction. + const auto commitTimestamp = cmd["commitTimestamp"].timestamp(); + if (recoveryTimestamp.get() > commitTimestamp) { + return Status::OK(); + } + + // Get the corresponding prepareTransaction oplog entry. + TransactionHistoryIterator iter(opTime); + invariant(iter.hasNext()); + const auto commitOplogEntry = iter.next(opCtx); + invariant(iter.hasNext()); + const auto prepareOplogEntry = iter.next(opCtx); + + // Transform prepare command into a normal applyOps command. + const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare"); + + BSONObjBuilder resultWeDontCareAbout; + return applyOps(opCtx, nsToDatabase(ns), prepareCmd, mode, &resultWeDontCareAbout); + } + return Status::OK(); + }}}, {"abortTransaction", {[](OperationContext* opCtx, const char* ns, diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 31073457042..873b982022e 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -783,6 +783,12 @@ public: */ bool isOplogDisabledFor(OperationContext* opCtx, const NamespaceString& nss); + /** + * Returns the stable timestamp that the storage engine recovered to on startup. If the + * recovery point was not stable, returns "none". + */ + virtual boost::optional<Timestamp> getRecoveryTimestamp() = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f031144e5b9..99129b4c14d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -94,6 +94,7 @@ namespace repl { MONGO_FAIL_POINT_DEFINE(stepdownHangBeforePerformingPostMemberStateUpdateActions); MONGO_FAIL_POINT_DEFINE(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock); +MONGO_FAIL_POINT_DEFINE(holdStableTimestampAtSpecificTimestamp); using CallbackArgs = executor::TaskExecutor::CallbackArgs; using CallbackFn = executor::TaskExecutor::CallbackFn; @@ -2776,6 +2777,10 @@ void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage( _wakeReadyWaiters_inlock(); } +boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() { + return _storage->getRecoveryTimestamp(getServiceContext()); +} + void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); @@ -3056,6 +3061,13 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_calculateStableOpTime_inloc maximumStableTimestamp = std::min(_storage->getAllCommittedTimestamp(_service), commitPoint.getTimestamp()); } + + MONGO_FAIL_POINT_BLOCK(holdStableTimestampAtSpecificTimestamp, data) { + const BSONObj& dataObj = data.getData(); + const auto holdStableTimestamp = dataObj["timestamp"].timestamp(); + maximumStableTimestamp = std::min(maximumStableTimestamp, holdStableTimestamp); + } + const auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm()); // Find the greatest optime candidate that is less than or equal to the commit point. diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 059b04b576e..91721015755 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -299,6 +299,8 @@ public: void signalDropPendingCollectionsRemovedFromStorage() final; + virtual boost::optional<Timestamp> getRecoveryTimestamp() override; + // ================== Test support API =================== /** diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 51a13bfe741..d00d15301a4 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -62,6 +62,12 @@ ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service, const ReplSettings& settings) : _service(service), _settings(settings) {} +ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service, + StorageInterface* storage) + : ReplicationCoordinatorMock(service, createReplSettingsForSingleNodeReplSet()) { + _storage = storage; +} + ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service) : ReplicationCoordinatorMock(service, createReplSettingsForSingleNodeReplSet()) {} @@ -469,5 +475,12 @@ Status ReplicationCoordinatorMock::abortCatchupIfNeeded() { void ReplicationCoordinatorMock::signalDropPendingCollectionsRemovedFromStorage() {} +boost::optional<Timestamp> ReplicationCoordinatorMock::getRecoveryTimestamp() { + if (_storage) { + return _storage->getRecoveryTimestamp(getServiceContext()); + } + return boost::none; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index f908fe11265..79be7c94918 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -34,6 +34,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/functional.h" @@ -55,6 +56,8 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator { public: ReplicationCoordinatorMock(ServiceContext* service, const ReplSettings& settings); + ReplicationCoordinatorMock(ServiceContext* service, StorageInterface* storage); + /** * Creates a ReplicationCoordinatorMock with ReplSettings for a one-node replica set. */ @@ -279,10 +282,13 @@ public: void signalDropPendingCollectionsRemovedFromStorage() final; + virtual boost::optional<Timestamp> getRecoveryTimestamp() override; + private: AtomicUInt64 _snapshotNameGenerator; ServiceContext* const _service; ReplSettings _settings; + StorageInterface* _storage; MemberState _memberState; OpTime _myLastDurableOpTime; OpTime _myLastAppliedOpTime; diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp index bacab9f72e7..48bb3ac9761 100644 --- a/src/mongo/db/repl/replication_recovery_test.cpp +++ b/src/mongo/db/repl/replication_recovery_test.cpp @@ -139,8 +139,8 @@ private: _consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>(); auto service = getServiceContext(); - ReplicationCoordinator::set(service, - stdx::make_unique<ReplicationCoordinatorMock>(service)); + ReplicationCoordinator::set( + service, stdx::make_unique<ReplicationCoordinatorMock>(service, getStorageInterface())); ASSERT_OK( ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); @@ -997,4 +997,161 @@ DEATH_TEST_F(ReplicationRecoveryTest, recovery.recoverFromOplog(opCtx, boost::none); } +TEST_F(ReplicationRecoveryTest, CommitTransactionOplogEntryCorrectlyUpdatesConfigTransactions) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + const auto appliedThrough = OpTime(Timestamp(1, 1), 1); + getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(true); + getStorageInterfaceRecovery()->setRecoveryTimestamp(appliedThrough.getTimestamp()); + getConsistencyMarkers()->setAppliedThrough(opCtx, appliedThrough); + _setUpOplog(opCtx, getStorageInterface(), {1}); + + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + + const auto txnOperations = BSON_ARRAY(BSON("op" + << "i" + << "ns" + << testNs.toString() + << "o" + << BSON("_id" << 1))); + const auto prepareDate = Date_t::now(); + const auto prepareOp = + _makeTransactionOplogEntry({Timestamp(2, 0), 1}, + repl::OpTypeEnum::kCommand, + BSON("applyOps" << txnOperations << "prepare" << true), + OpTime(Timestamp(0, 0), -1), + true, // prepare + 0, + sessionInfo, + prepareDate); + + ASSERT_OK(getStorageInterface()->insertDocument( + opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + + const auto commitDate = Date_t::now(); + const auto commitOp = _makeTransactionOplogEntry( + {Timestamp(3, 0), 1}, + repl::OpTypeEnum::kCommand, + BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(2, 1)), + OpTime(Timestamp(2, 0), 1), + false, // prepare + 1, + sessionInfo, + commitDate); + + ASSERT_OK(getStorageInterface()->insertDocument( + opCtx, oplogNs, {commitOp.toBSON(), Timestamp(3, 0)}, 1)); + + recovery.recoverFromOplog(opCtx, boost::none); + + SessionTxnRecord expectedTxnRecord; + expectedTxnRecord.setSessionId(*sessionInfo.getSessionId()); + expectedTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); + expectedTxnRecord.setLastWriteOpTime({Timestamp(3, 0), 1}); + expectedTxnRecord.setLastWriteDate(commitDate); + expectedTxnRecord.setState(DurableTxnStateEnum::kCommitted); + + std::vector<BSONObj> expectedTxnColl{expectedTxnRecord.toBSON()}; + + // Make sure that the transaction table shows that the transaction is commited. + _assertDocumentsInCollectionEquals( + opCtx, NamespaceString::kSessionTransactionsTableNamespace, expectedTxnColl); + + // Make sure the data from the transaction is applied. + std::vector<BSONObj> expectedColl{BSON("_id" << 1)}; + _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl); + + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 0), 1)); +} + +TEST_F(ReplicationRecoveryTest, + CommitTransactionBeforeRecoveryTimestampCorrectlyUpdatesConfigTransactions) { + ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers()); + auto opCtx = getOperationContext(); + + // Make the appliedThrough optime to be after the commit timestamp but before the + // commitTransaction oplog entry. This way we can check that there are no idempotency concerns + // when updating the transactions table during startup recovery when the table already reflects + // the committed transaction. + const auto appliedThrough = OpTime(Timestamp(2, 2), 1); + getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(true); + getStorageInterfaceRecovery()->setRecoveryTimestamp(appliedThrough.getTimestamp()); + getConsistencyMarkers()->setAppliedThrough(opCtx, appliedThrough); + _setUpOplog(opCtx, getStorageInterface(), {1}); + + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + + const auto txnOperations = BSON_ARRAY(BSON("op" + << "i" + << "ns" + << testNs.toString() + << "o" + << BSON("_id" << 1))); + const auto prepareDate = Date_t::now(); + const auto prepareOp = + _makeTransactionOplogEntry({Timestamp(2, 0), 1}, + repl::OpTypeEnum::kCommand, + BSON("applyOps" << txnOperations << "prepare" << true), + OpTime(Timestamp(0, 0), -1), + true, // prepare + 0, + sessionInfo, + prepareDate); + + ASSERT_OK(getStorageInterface()->insertDocument( + opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + + // Add an operation here so that we can have the appliedThrough time be in-between the commit + // timestamp and the commitTransaction oplog entry. + const auto insertOp = _makeOplogEntry({Timestamp(2, 2), 1}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none, + sessionInfo, + Date_t::now()); + + ASSERT_OK(getStorageInterface()->insertDocument( + opCtx, oplogNs, {insertOp.toBSON(), Timestamp(2, 2)}, 1)); + + const auto commitDate = Date_t::now(); + const auto commitOp = _makeTransactionOplogEntry( + {Timestamp(3, 0), 1}, + repl::OpTypeEnum::kCommand, + BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(2, 1)), + OpTime(Timestamp(2, 0), 1), + false, // prepare + 1, + sessionInfo, + commitDate); + + ASSERT_OK(getStorageInterface()->insertDocument( + opCtx, oplogNs, {commitOp.toBSON(), Timestamp(3, 0)}, 1)); + + recovery.recoverFromOplog(opCtx, boost::none); + + SessionTxnRecord expectedTxnRecord; + expectedTxnRecord.setSessionId(*sessionInfo.getSessionId()); + expectedTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); + expectedTxnRecord.setLastWriteOpTime({Timestamp(3, 0), 1}); + expectedTxnRecord.setLastWriteDate(commitDate); + expectedTxnRecord.setState(DurableTxnStateEnum::kCommitted); + + std::vector<BSONObj> expectedTxnColl{expectedTxnRecord.toBSON()}; + + // Make sure that the transaction table shows that the transaction is commited. + _assertDocumentsInCollectionEquals( + opCtx, NamespaceString::kSessionTransactionsTableNamespace, expectedTxnColl); + + ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp()); + ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 0), 1)); +} + } // namespace |