summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSamy Lanka <samy.lanka@mongodb.com>2018-10-12 11:28:51 -0400
committerSamy Lanka <samy.lanka@mongodb.com>2018-10-23 15:29:46 -0400
commit1fea1df1ee21cac90a2217f219f1ba12244fc4fa (patch)
tree5ee156668667be375c0d551485e348736a0c9832 /src
parent485491d5839e11f47c6696d1bcf3a449bfbc56cf (diff)
downloadmongo-1fea1df1ee21cac90a2217f219f1ba12244fc4fa.tar.gz
SERVER-35879 Deal with commitTransaction oplog entries during startup recovery
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/oplog.cpp30
-rw-r--r--src/mongo/db/repl/replication_coordinator.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp12
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h6
-rw-r--r--src/mongo/db/repl/replication_recovery_test.cpp161
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.cpp4
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.h2
9 files changed, 233 insertions, 3 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 43ea709440a..bd41e12bd1e 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -86,6 +86,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/storage_options.h"
+#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/platform/random.h"
#include "mongo/scripting/engine.h"
@@ -1010,7 +1011,34 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
BSONObj& cmd,
const OpTime& opTime,
const OplogEntry& entry,
- OplogApplication::Mode mode) -> Status { return Status::OK(); }}},
+ OplogApplication::Mode mode) -> Status {
+ if (mode == OplogApplication::Mode::kRecovering) {
+ const auto replCoord = ReplicationCoordinator::get(opCtx);
+ const auto recoveryTimestamp = replCoord->getRecoveryTimestamp();
+ invariant(recoveryTimestamp);
+
+ // If the commitTimestamp is before the recoveryTimestamp, then the data already
+ // reflects the operations from the transaction.
+ const auto commitTimestamp = cmd["commitTimestamp"].timestamp();
+ if (recoveryTimestamp.get() > commitTimestamp) {
+ return Status::OK();
+ }
+
+ // Get the corresponding prepareTransaction oplog entry.
+ TransactionHistoryIterator iter(opTime);
+ invariant(iter.hasNext());
+ const auto commitOplogEntry = iter.next(opCtx);
+ invariant(iter.hasNext());
+ const auto prepareOplogEntry = iter.next(opCtx);
+
+ // Transform prepare command into a normal applyOps command.
+ const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare");
+
+ BSONObjBuilder resultWeDontCareAbout;
+ return applyOps(opCtx, nsToDatabase(ns), prepareCmd, mode, &resultWeDontCareAbout);
+ }
+ return Status::OK();
+ }}},
{"abortTransaction",
{[](OperationContext* opCtx,
const char* ns,
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 31073457042..873b982022e 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -783,6 +783,12 @@ public:
*/
bool isOplogDisabledFor(OperationContext* opCtx, const NamespaceString& nss);
+ /**
+ * Returns the stable timestamp that the storage engine recovered to on startup. If the
+ * recovery point was not stable, returns "none".
+ */
+ virtual boost::optional<Timestamp> getRecoveryTimestamp() = 0;
+
protected:
ReplicationCoordinator();
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f031144e5b9..99129b4c14d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -94,6 +94,7 @@ namespace repl {
MONGO_FAIL_POINT_DEFINE(stepdownHangBeforePerformingPostMemberStateUpdateActions);
MONGO_FAIL_POINT_DEFINE(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock);
+MONGO_FAIL_POINT_DEFINE(holdStableTimestampAtSpecificTimestamp);
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using CallbackFn = executor::TaskExecutor::CallbackFn;
@@ -2776,6 +2777,10 @@ void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage(
_wakeReadyWaiters_inlock();
}
+boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() {
+ return _storage->getRecoveryTimestamp(getServiceContext());
+}
+
void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_applierState = ApplierState::Draining;
_externalState->stopProducer();
@@ -3056,6 +3061,13 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_calculateStableOpTime_inloc
maximumStableTimestamp =
std::min(_storage->getAllCommittedTimestamp(_service), commitPoint.getTimestamp());
}
+
+ MONGO_FAIL_POINT_BLOCK(holdStableTimestampAtSpecificTimestamp, data) {
+ const BSONObj& dataObj = data.getData();
+ const auto holdStableTimestamp = dataObj["timestamp"].timestamp();
+ maximumStableTimestamp = std::min(maximumStableTimestamp, holdStableTimestamp);
+ }
+
const auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm());
// Find the greatest optime candidate that is less than or equal to the commit point.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 059b04b576e..91721015755 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -299,6 +299,8 @@ public:
void signalDropPendingCollectionsRemovedFromStorage() final;
+ virtual boost::optional<Timestamp> getRecoveryTimestamp() override;
+
// ================== Test support API ===================
/**
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 51a13bfe741..d00d15301a4 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -62,6 +62,12 @@ ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service,
const ReplSettings& settings)
: _service(service), _settings(settings) {}
+ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service,
+ StorageInterface* storage)
+ : ReplicationCoordinatorMock(service, createReplSettingsForSingleNodeReplSet()) {
+ _storage = storage;
+}
+
ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service)
: ReplicationCoordinatorMock(service, createReplSettingsForSingleNodeReplSet()) {}
@@ -469,5 +475,12 @@ Status ReplicationCoordinatorMock::abortCatchupIfNeeded() {
void ReplicationCoordinatorMock::signalDropPendingCollectionsRemovedFromStorage() {}
+boost::optional<Timestamp> ReplicationCoordinatorMock::getRecoveryTimestamp() {
+ if (_storage) {
+ return _storage->getRecoveryTimestamp(getServiceContext());
+ }
+ return boost::none;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index f908fe11265..79be7c94918 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -34,6 +34,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/functional.h"
@@ -55,6 +56,8 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator {
public:
ReplicationCoordinatorMock(ServiceContext* service, const ReplSettings& settings);
+ ReplicationCoordinatorMock(ServiceContext* service, StorageInterface* storage);
+
/**
* Creates a ReplicationCoordinatorMock with ReplSettings for a one-node replica set.
*/
@@ -279,10 +282,13 @@ public:
void signalDropPendingCollectionsRemovedFromStorage() final;
+ virtual boost::optional<Timestamp> getRecoveryTimestamp() override;
+
private:
AtomicUInt64 _snapshotNameGenerator;
ServiceContext* const _service;
ReplSettings _settings;
+ StorageInterface* _storage;
MemberState _memberState;
OpTime _myLastDurableOpTime;
OpTime _myLastAppliedOpTime;
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
index bacab9f72e7..48bb3ac9761 100644
--- a/src/mongo/db/repl/replication_recovery_test.cpp
+++ b/src/mongo/db/repl/replication_recovery_test.cpp
@@ -139,8 +139,8 @@ private:
_consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>();
auto service = getServiceContext();
- ReplicationCoordinator::set(service,
- stdx::make_unique<ReplicationCoordinatorMock>(service));
+ ReplicationCoordinator::set(
+ service, stdx::make_unique<ReplicationCoordinatorMock>(service, getStorageInterface()));
ASSERT_OK(
ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
@@ -997,4 +997,161 @@ DEATH_TEST_F(ReplicationRecoveryTest,
recovery.recoverFromOplog(opCtx, boost::none);
}
+TEST_F(ReplicationRecoveryTest, CommitTransactionOplogEntryCorrectlyUpdatesConfigTransactions) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ const auto appliedThrough = OpTime(Timestamp(1, 1), 1);
+ getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(true);
+ getStorageInterfaceRecovery()->setRecoveryTimestamp(appliedThrough.getTimestamp());
+ getConsistencyMarkers()->setAppliedThrough(opCtx, appliedThrough);
+ _setUpOplog(opCtx, getStorageInterface(), {1});
+
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+
+ const auto txnOperations = BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << testNs.toString()
+ << "o"
+ << BSON("_id" << 1)));
+ const auto prepareDate = Date_t::now();
+ const auto prepareOp =
+ _makeTransactionOplogEntry({Timestamp(2, 0), 1},
+ repl::OpTypeEnum::kCommand,
+ BSON("applyOps" << txnOperations << "prepare" << true),
+ OpTime(Timestamp(0, 0), -1),
+ true, // prepare
+ 0,
+ sessionInfo,
+ prepareDate);
+
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1));
+
+ const auto commitDate = Date_t::now();
+ const auto commitOp = _makeTransactionOplogEntry(
+ {Timestamp(3, 0), 1},
+ repl::OpTypeEnum::kCommand,
+ BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(2, 1)),
+ OpTime(Timestamp(2, 0), 1),
+ false, // prepare
+ 1,
+ sessionInfo,
+ commitDate);
+
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ opCtx, oplogNs, {commitOp.toBSON(), Timestamp(3, 0)}, 1));
+
+ recovery.recoverFromOplog(opCtx, boost::none);
+
+ SessionTxnRecord expectedTxnRecord;
+ expectedTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ expectedTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ expectedTxnRecord.setLastWriteOpTime({Timestamp(3, 0), 1});
+ expectedTxnRecord.setLastWriteDate(commitDate);
+ expectedTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+
+ std::vector<BSONObj> expectedTxnColl{expectedTxnRecord.toBSON()};
+
+ // Make sure that the transaction table shows that the transaction is commited.
+ _assertDocumentsInCollectionEquals(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, expectedTxnColl);
+
+ // Make sure the data from the transaction is applied.
+ std::vector<BSONObj> expectedColl{BSON("_id" << 1)};
+ _assertDocumentsInCollectionEquals(opCtx, testNs, expectedColl);
+
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 0), 1));
+}
+
+TEST_F(ReplicationRecoveryTest,
+ CommitTransactionBeforeRecoveryTimestampCorrectlyUpdatesConfigTransactions) {
+ ReplicationRecoveryImpl recovery(getStorageInterface(), getConsistencyMarkers());
+ auto opCtx = getOperationContext();
+
+ // Make the appliedThrough optime to be after the commit timestamp but before the
+ // commitTransaction oplog entry. This way we can check that there are no idempotency concerns
+ // when updating the transactions table during startup recovery when the table already reflects
+ // the committed transaction.
+ const auto appliedThrough = OpTime(Timestamp(2, 2), 1);
+ getStorageInterfaceRecovery()->setSupportsRecoverToStableTimestamp(true);
+ getStorageInterfaceRecovery()->setRecoveryTimestamp(appliedThrough.getTimestamp());
+ getConsistencyMarkers()->setAppliedThrough(opCtx, appliedThrough);
+ _setUpOplog(opCtx, getStorageInterface(), {1});
+
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+
+ const auto txnOperations = BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << testNs.toString()
+ << "o"
+ << BSON("_id" << 1)));
+ const auto prepareDate = Date_t::now();
+ const auto prepareOp =
+ _makeTransactionOplogEntry({Timestamp(2, 0), 1},
+ repl::OpTypeEnum::kCommand,
+ BSON("applyOps" << txnOperations << "prepare" << true),
+ OpTime(Timestamp(0, 0), -1),
+ true, // prepare
+ 0,
+ sessionInfo,
+ prepareDate);
+
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1));
+
+ // Add an operation here so that we can have the appliedThrough time be in-between the commit
+ // timestamp and the commitTransaction oplog entry.
+ const auto insertOp = _makeOplogEntry({Timestamp(2, 2), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none,
+ sessionInfo,
+ Date_t::now());
+
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ opCtx, oplogNs, {insertOp.toBSON(), Timestamp(2, 2)}, 1));
+
+ const auto commitDate = Date_t::now();
+ const auto commitOp = _makeTransactionOplogEntry(
+ {Timestamp(3, 0), 1},
+ repl::OpTypeEnum::kCommand,
+ BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(2, 1)),
+ OpTime(Timestamp(2, 0), 1),
+ false, // prepare
+ 1,
+ sessionInfo,
+ commitDate);
+
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ opCtx, oplogNs, {commitOp.toBSON(), Timestamp(3, 0)}, 1));
+
+ recovery.recoverFromOplog(opCtx, boost::none);
+
+ SessionTxnRecord expectedTxnRecord;
+ expectedTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ expectedTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ expectedTxnRecord.setLastWriteOpTime({Timestamp(3, 0), 1});
+ expectedTxnRecord.setLastWriteDate(commitDate);
+ expectedTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+
+ std::vector<BSONObj> expectedTxnColl{expectedTxnRecord.toBSON()};
+
+ // Make sure that the transaction table shows that the transaction is commited.
+ _assertDocumentsInCollectionEquals(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, expectedTxnColl);
+
+ ASSERT_EQ(getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx), Timestamp());
+ ASSERT_EQ(getConsistencyMarkers()->getAppliedThrough(opCtx), OpTime(Timestamp(3, 0), 1));
+}
+
} // namespace
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 09ff73599cd..28ceb137eef 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -427,5 +427,9 @@ void ReplicationCoordinatorEmbedded::signalDropPendingCollectionsRemovedFromStor
UASSERT_NOT_IMPLEMENTED;
}
+boost::optional<Timestamp> ReplicationCoordinatorEmbedded::getRecoveryTimestamp() {
+ UASSERT_NOT_IMPLEMENTED;
+}
+
} // namespace embedded
} // namespace mongo
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index e7ca5504b83..971132f9e33 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -227,6 +227,8 @@ public:
void signalDropPendingCollectionsRemovedFromStorage() final;
+ boost::optional<Timestamp> getRecoveryTimestamp() override;
+
private:
// Back pointer to the ServiceContext that has started the instance.
ServiceContext* const _service;