summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2017-08-23 13:12:17 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2017-08-23 13:27:17 -0400
commit8097b23bb419708202b5306133cb57a545d0825b (patch)
tree1556dcbc854d7f11810803618d310ff52328355b /src/mongo/db/repl
parentce094a604ba1e11ea220a47c0e6626e18103601a (diff)
downloadmongo-8097b23bb419708202b5306133cb57a545d0825b.tar.gz
SERVER-29899 Call recovery logic in rollback
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp46
-rw-r--r--src/mongo/db/repl/rollback_impl.h29
-rw-r--r--src/mongo/db/repl/rollback_impl_test.cpp53
3 files changed, 112 insertions, 16 deletions
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 0d8de17e588..8a5f887fdac 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -114,16 +114,19 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
return status;
}
- // Recover to the stable timestamp while holding the global exclusive lock.
- auto serviceCtx = opCtx->getServiceContext();
- {
- Lock::GlobalWrite globalWrite(opCtx);
- status = _storageInterface->recoverToStableTimestamp(serviceCtx);
- if (!status.isOK()) {
- return status;
- }
+ // Recover to the stable timestamp.
+ status = _recoverToStableTimestamp(opCtx);
+ if (!status.isOK()) {
+ return status;
}
+ _listener->onRecoverToStableTimestamp();
+ // Run the oplog recovery logic.
+ status = _oplogRecovery(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+ _listener->onRecoverFromOplog();
// At this point these functions need to always be called before returning, even on failure.
// These functions fassert on failure.
@@ -133,7 +136,6 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
_transitionFromRollbackToSecondary(opCtx);
});
- // TODO: The rest of roll back.
return Status::OK();
}
@@ -192,6 +194,32 @@ StatusWith<Timestamp> RollbackImpl::_findCommonPoint() {
return commonPointSW.getValue().first.getTimestamp();
}
+Status RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
+ if (_isInShutdown()) {
+ return Status(ErrorCodes::ShutdownInProgress, "rollback shutting down");
+ }
+ // Recover to the stable timestamp while holding the global exclusive lock.
+ auto serviceCtx = opCtx->getServiceContext();
+ {
+ Lock::GlobalWrite globalWrite(opCtx);
+ try {
+ return _storageInterface->recoverToStableTimestamp(serviceCtx);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ }
+}
+
+Status RollbackImpl::_oplogRecovery(OperationContext* opCtx) {
+ if (_isInShutdown()) {
+ return Status(ErrorCodes::ShutdownInProgress, "rollback shutting down");
+ }
+ // Run the recovery process.
+ _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
+ return Status::OK();
+}
+
+
void RollbackImpl::_checkShardIdentityRollback(OperationContext* opCtx) {
invariant(opCtx);
diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h
index 46f4111ec95..b8a6b5b04cf 100644
--- a/src/mongo/db/repl/rollback_impl.h
+++ b/src/mongo/db/repl/rollback_impl.h
@@ -95,14 +95,24 @@ public:
virtual ~Listener() = default;
/**
- * Function called after we transition to ROLLBACK.
+ * Function called after we transition to ROLLBACK.
*/
- virtual void onTransitionToRollback() {}
+ virtual void onTransitionToRollback() noexcept {}
/**
- * Function called after we find the common point.
+ * Function called after we find the common point.
*/
- virtual void onCommonPointFound(Timestamp commonPoint) {}
+ virtual void onCommonPointFound(Timestamp commonPoint) noexcept {}
+
+ /**
+ * Function called after we recover to the stable timestamp.
+ */
+ virtual void onRecoverToStableTimestamp() noexcept {}
+
+ /**
+ * Function called after we recover from the oplog.
+ */
+ virtual void onRecoverFromOplog() noexcept {}
};
/**
@@ -159,6 +169,17 @@ private:
Status _transitionToRollback(OperationContext* opCtx);
/**
+ * Recovers to the stable timestamp while holding the global exclusive lock.
+ */
+ Status _recoverToStableTimestamp(OperationContext* opCtx);
+
+ /**
+ * Runs the oplog recovery logic. This involves applying oplog operations between the stable
+ * timestamp and the common point.
+ */
+ Status _oplogRecovery(OperationContext* opCtx);
+
+ /**
* If we detected that we rolled back the shardIdentity document as part of this rollback
* then we must shut down the server to clear the in-memory ShardingState associated with the
* shardIdentity document.
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
index 07db1736d3b..4b0b8640a53 100644
--- a/src/mongo/db/repl/rollback_impl_test.cpp
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -122,6 +122,14 @@ protected:
bool _transitionedToRollback = false;
stdx::function<void()> _onTransitionToRollbackFn = [this]() { _transitionedToRollback = true; };
+ bool _recoveredToStableTimestamp = false;
+ stdx::function<void()> _onRecoverToStableTimestampFn = [this]() {
+ _recoveredToStableTimestamp = true;
+ };
+
+ bool _recoveredFromOplog = false;
+ stdx::function<void()> _onRecoverFromOplogFn = [this]() { _recoveredFromOplog = true; };
+
Timestamp _commonPointFound;
stdx::function<void(Timestamp commonPoint)> _onCommonPointFoundFn =
[this](Timestamp commonPoint) { _commonPointFound = commonPoint; };
@@ -158,14 +166,22 @@ class RollbackImplTest::Listener : public RollbackImpl::Listener {
public:
Listener(RollbackImplTest* test) : _test(test) {}
- void onTransitionToRollback() {
+ void onTransitionToRollback() noexcept {
_test->_onTransitionToRollbackFn();
}
- void onCommonPointFound(Timestamp commonPoint) {
+ void onCommonPointFound(Timestamp commonPoint) noexcept {
_test->_onCommonPointFoundFn(commonPoint);
}
+ void onRecoverToStableTimestamp() noexcept {
+ _test->_onRecoverToStableTimestampFn();
+ }
+
+ void onRecoverFromOplog() noexcept {
+ _test->_onRecoverFromOplogFn();
+ }
+
private:
RollbackImplTest* _test;
};
@@ -236,7 +252,7 @@ TEST_F(RollbackImplTest, RollbackPersistsCommonPointToOplogTruncateAfterPoint) {
_remoteOplog->setOperations({makeOpAndRecordId(2)});
_localOplog->setOperations({makeOpAndRecordId(2)});
- ASSERT_EQUALS(Status::OK(), _rollback->runRollback(_opCtx.get()));
+ ASSERT_OK(_rollback->runRollback(_opCtx.get()));
// Check that the common point was saved.
auto truncateAfterPoint =
@@ -324,6 +340,37 @@ TEST_F(RollbackImplTest, RollbackReturnsBadStatusIfIncrementRollbackIDFails) {
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status.code());
}
+TEST_F(RollbackImplTest, RollbackCallsRecoverFromOplog) {
+ auto op = makeOpAndRecordId(1);
+ _remoteOplog->setOperations({op});
+ _localOplog->setOperations({op});
+
+ // Run rollback.
+ ASSERT_OK(_rollback->runRollback(_opCtx.get()));
+
+ // Make sure oplog recovery was executed.
+ ASSERT(_recoveredFromOplog);
+}
+
+TEST_F(RollbackImplTest, RollbackSkipsRecoverFromOplogWhenShutdownEarly) {
+ auto op = makeOpAndRecordId(1);
+ _remoteOplog->setOperations({op});
+ _localOplog->setOperations({op});
+
+ _onRecoverToStableTimestampFn = [this]() {
+ _recoveredToStableTimestamp = true;
+ _rollback->shutdown();
+ };
+
+ // Run rollback.
+ auto status = _rollback->runRollback(_opCtx.get());
+
+ // Make sure shutdown occurred before oplog recovery.
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _rollback->runRollback(_opCtx.get()));
+ ASSERT(_recoveredToStableTimestamp);
+ ASSERT_FALSE(_recoveredFromOplog);
+}
+
TEST_F(RollbackImplTest, RollbackSucceeds) {
auto op = makeOpAndRecordId(1);
_remoteOplog->setOperations({op});