diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-01-05 15:36:18 -0500 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-01-19 11:05:08 -0500 |
commit | 05a115fe4e87db76c16fab5d17e23ac45329994d (patch) | |
tree | 5b048cce174548a124a9c8fe4a3e4cfc03d702e0 | |
parent | d74c7ae40f82bd5bf6334c8114c0cf4948f60002 (diff) | |
download | mongo-05a115fe4e87db76c16fab5d17e23ac45329994d.tar.gz |
SERVER-21988: wait for applier before starting rollback
(cherry picked from commit 4846585c6a7f09a18ac8c313ca7b0cee405ad29c)
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 86 |
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 6 |
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 17 |
-rw-r--r-- | src/mongo/db/repl/rs_rollback.h | 3 |
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 43 |
5 files changed, 66 insertions, 89 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 6759bf85230..41b58658162 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -43,6 +43,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/oplogreader.h" @@ -160,7 +161,7 @@ BackgroundSync::BackgroundSync() _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<long long>::max()), _lastFetchedHash(0), - _pause(true), + _stopped(true), _replCoord(getGlobalReplicationCoordinator()), _initialSyncRequestedFlag(false), _indexPrefetchConfig(PREFETCH_ALL) {} @@ -179,7 +180,7 @@ void BackgroundSync::shutdown() { // Clear the buffer in case the producerThread is waiting in push() due to a full queue. invariant(inShutdown()); clearBuffer(); - _pause = true; + _stopped = true; } void BackgroundSync::producerThread() { @@ -209,14 +210,14 @@ void BackgroundSync::producerThread() { void BackgroundSync::_producerThread() { const MemberState state = _replCoord->getMemberState(); - // we want to pause when the state changes to primary + // Stop when the state changes to primary. if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { - if (!isPaused()) { + if (!isStopped()) { stop(); } if (_replCoord->isWaitingForApplierToDrain()) { - // Signal to consumers that we have entered the paused state if the signal isn't already - // in the queue. + // Signal to consumers that we have entered the stopped state + // if the signal isn't already in the queue. 
const boost::optional<BSONObj> lastObjectPushed = _buffer.lastObjectPushed(); if (!lastObjectPushed || !lastObjectPushed->isEmpty()) { const BSONObj sentinelDoc; @@ -241,10 +242,10 @@ void BackgroundSync::_producerThread() { sleepsecs(1); return; } - // we want to unpause when we're no longer primary + // we want to start when we're no longer primary // start() also loads _lastOpTimeFetched, which we know is set from the "if" OperationContextImpl txn; - if (isPaused()) { + if (isStopped()) { start(&txn); } @@ -295,7 +296,7 @@ void BackgroundSync::_produce(OperationContext* txn) { long long lastHashFetched; { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_pause) { + if (_stopped) { return; } lastOpTimeFetched = _lastOpTimeFetched; @@ -367,9 +368,9 @@ void BackgroundSync::_produce(OperationContext* txn) { fetcher.wait(); LOG(1) << "fetcher stopped reading remote oplog on " << source; - // If the background sync is paused after the fetcher is started, we need to + // If the background sync is stopped after the fetcher is started, we need to // re-evaluate our sync source and oplog common point. - if (isPaused()) { + if (isStopped()) { return; } @@ -397,7 +398,37 @@ void BackgroundSync::_produce(OperationContext* txn) { return connection->get(); }; - log() << "starting rollback: " << fetcherReturnStatus; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + lastOpTimeFetched = _lastOpTimeFetched; + } + + log() << "Starting rollback due to " << fetcherReturnStatus; + + // Wait till all buffered oplog entries have drained and been applied. 
+ auto lastApplied = _replCoord->getMyLastOptime(); + if (lastApplied != _lastOpTimeFetched) { + log() << "Waiting for all operations from " << lastApplied << " until " + << _lastOpTimeFetched << " to be applied before starting rollback."; + while (_lastOpTimeFetched > (lastApplied = _replCoord->getMyLastOptime())) { + sleepmillis(10); + if (isStopped() || inShutdown()) { + return; + } + } + } + // check that we are at minvalid, otherwise we cannot roll back as we may be in an + // inconsistent state + BatchBoundaries boundaries = getMinValid(txn); + if (!boundaries.start.isNull() || boundaries.end > lastApplied) { + fassertNoTrace(18750, + Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() + << "need to rollback, but in inconsistent state. " + << "minvalid: " << boundaries.end.toString() + << " > our last optime: " << lastApplied.toString())); + } + _rollback(txn, source, getConnection); stop(); } else if (!fetcherReturnStatus.isOK()) { @@ -422,8 +453,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return; } - // Check if we have been paused. - if (isPaused()) { + // Check if we have been stopped. + if (isStopped()) { return; } @@ -491,7 +522,15 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; - Timestamp lastTS = _lastOpTimeFetched.getTimestamp(); + Timestamp lastTS; + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + // If we are stopped then return without queueing this batch to apply. 
+ if (_stopped) { + return; + } + lastTS = _lastOpTimeFetched.getTimestamp(); + } int count = 0; for (auto&& doc : documents) { networkDocumentBytes += doc.objsize(); @@ -554,7 +593,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& stdx::unique_lock<stdx::mutex> lock(_mutex); _lastFetchedHash = lastDoc["h"].numberLong(); _lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromOplogEntry(lastDoc)); - LOG(3) << "batch lastOpTimeFetched: " << _lastOpTimeFetched; + LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched; } } @@ -581,7 +620,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& } // If we are transitioning to primary state, we need to leave - // this loop in order to go into bgsync-pause mode. + // this loop in order to go into bgsync-stop mode. if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) { return; } @@ -591,8 +630,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return; } - // Check if we have been paused. - if (isPaused()) { + // Check if we have been stopped. + if (isStopped()) { return; } @@ -647,7 +686,6 @@ void BackgroundSync::_rollback(OperationContext* txn, // Abort only when syncRollback detects we are in a unrecoverable state. // In other cases, we log the message contained in the error status and retry later. 
auto status = syncRollback(txn, - _replCoord->getMyLastOptime(), OplogInterfaceLocal(txn, rsOplogName), RollbackSourceImpl(getConnection, source, rsOplogName), _replCoord); @@ -677,7 +715,7 @@ void BackgroundSync::cancelFetcher() { void BackgroundSync::stop() { stdx::lock_guard<stdx::mutex> lock(_mutex); - _pause = true; + _stopped = true; _syncSourceHost = HostAndPort(); _lastOpTimeFetched = OpTime(); _lastFetchedHash = 0; @@ -688,7 +726,7 @@ void BackgroundSync::start(OperationContext* txn) { long long lastFetchedHash = _readLastAppliedHash(txn); stdx::lock_guard<stdx::mutex> lk(_mutex); - _pause = false; + _stopped = false; // reset _last fields with current oplog data _lastOpTimeFetched = _replCoord->getMyLastOptime(); @@ -697,9 +735,9 @@ void BackgroundSync::start(OperationContext* txn) { LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; } -bool BackgroundSync::isPaused() const { +bool BackgroundSync::isStopped() const { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _pause; + return _stopped; } void BackgroundSync::clearBuffer() { diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 0afe7728af9..5d6113a8efb 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -88,7 +88,7 @@ public: void shutdown(); - bool isPaused() const; + bool isStopped() const; virtual ~BackgroundSync() {} @@ -152,8 +152,8 @@ private: // a secondary. 
long long _lastFetchedHash; - // if produce thread should be running - bool _pause; + // if producer thread should not be running + bool _stopped; HostAndPort _syncSourceHost; diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 6b11660a44b..be9bf0e1d1c 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -878,7 +878,6 @@ Status _syncRollback(OperationContext* txn, } // namespace Status syncRollback(OperationContext* txn, - const OpTime& lastOpTimeApplied, const OplogInterface& localOplog, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord, @@ -886,20 +885,6 @@ Status syncRollback(OperationContext* txn, invariant(txn); invariant(replCoord); - // check that we are at minvalid, otherwise we cannot rollback as we may be in an - // inconsistent state - { - BatchBoundaries boundaries = getMinValid(txn); - if (!boundaries.start.isNull() || boundaries.end > lastOpTimeApplied) { - severe() << "need to rollback, but in inconsistent state" << endl; - return Status(ErrorCodes::UnrecoverableRollbackError, - str::stream() << "need to rollback, but in inconsistent state. " - << "minvalid: " << boundaries.end.toString() - << " > our last optime: " << lastOpTimeApplied.toString(), - 18750); - } - } - log() << "beginning rollback" << rsLog; DisableDocumentValidation validationDisabler(txn); @@ -911,12 +896,10 @@ Status syncRollback(OperationContext* txn, } Status syncRollback(OperationContext* txn, - const OpTime& lastOpTimeWritten, const OplogInterface& localOplog, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord) { return syncRollback(txn, - lastOpTimeWritten, localOplog, rollbackSource, replCoord, diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index 793521393a9..c88e9fd27c0 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -63,7 +63,6 @@ class RollbackSource; * while our rollback is in progress. 
* * @param txn Used to read and write from this node's databases - * @param lastOpTimeWritten The last OpTime applied by the applier * @param localOplog reads the oplog on this server. * @param rollbackSource interface for sync source: * provides oplog; and @@ -76,14 +75,12 @@ class RollbackSource; using SleepSecondsFn = stdx::function<void(Seconds)>; Status syncRollback(OperationContext* txn, - const OpTime& lastOpTimeWritten, const OplogInterface& localOplog, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord, const SleepSecondsFn& sleepSecondsFn); Status syncRollback(OperationContext* txn, - const OpTime& lastOpTimeWritten, const OplogInterface& localOplog, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 4296f9d62e7..fc3a1c40014 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -173,14 +173,13 @@ TEST_F(RSRollbackTest, InconsistentMinValid) { repl::setMinValid(_txn.get(), {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)}); auto status = syncRollback(_txn.get(), - OpTime(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr<OplogInterface>( new OplogInterfaceMock(kEmptyMockOperations))), _coordinator, noSleep); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); - ASSERT_EQUALS(18750, status.location()); + ASSERT_EQUALS(18752, status.location()); } TEST_F(RSRollbackTest, SetFollowerModeFailed) { @@ -200,7 +199,6 @@ TEST_F(RSRollbackTest, SetFollowerModeFailed) { ASSERT_EQUALS(ErrorCodes::OperationFailed, syncRollback(_txn.get(), - OpTime(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr<OplogInterface>( new OplogInterfaceMock(kEmptyMockOperations))), @@ -215,7 +213,6 @@ TEST_F(RSRollbackTest, OplogStartMissing) { ASSERT_EQUALS( ErrorCodes::OplogStartMissing, 
syncRollback(_txn.get(), - OpTime(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ operation, @@ -229,7 +226,6 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) { auto operation = std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); auto status = syncRollback(_txn.get(), - ts, OplogInterfaceMock({operation}), RollbackSourceMock(std::unique_ptr<OplogInterface>( new OplogInterfaceMock(kEmptyMockOperations))), @@ -252,7 +248,6 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) { } }; ASSERT_THROWS_CODE(syncRollback(_txn.get(), - ts, OplogInterfaceMock({operation}), RollbackSourceLocal(std::unique_ptr<OplogInterface>( new OplogInterfaceMock(kEmptyMockOperations))), @@ -269,7 +264,6 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) { std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1)); ASSERT_OK( syncRollback(_txn.get(), - ts, OplogInterfaceMock({operation}), RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ operation, @@ -338,9 +332,7 @@ int _testRollbackDelete(OperationContext* txn, std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(deleteOperation.first["ts"].timestamp(), deleteOperation.first["h"].Long()); ASSERT_OK(syncRollback(txn, - opTime, OplogInterfaceMock({deleteOperation, commonOperation}), rollbackSource, coordinator, @@ -413,11 +405,8 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) { RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(insertDocumentOperation.first["ts"].timestamp(), - insertDocumentOperation.first["h"].Long()); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), - opTime, OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, @@ -472,14 +461,11 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) { 
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(insertDocumentOperation.first["ts"].timestamp(), - insertDocumentOperation.first["h"].Long()); // Repeat index creation operation and confirm that rollback attempts to drop index just once. // This can happen when an index is re-created with different options. startCapturingLogMessages(); ASSERT_OK(syncRollback( _txn.get(), - opTime, OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, @@ -535,11 +521,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) { RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(insertDocumentOperation.first["ts"].timestamp(), - insertDocumentOperation.first["h"].Long()); startCapturingLogMessages(); ASSERT_OK(syncRollback(_txn.get(), - opTime, OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, @@ -585,11 +568,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) { RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(insertDocumentOperation.first["ts"].timestamp(), - insertDocumentOperation.first["h"].Long()); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), - opTime, OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, @@ -632,11 +612,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) { RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(insertDocumentOperation.first["ts"].timestamp(), - insertDocumentOperation.first["h"].Long()); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), - opTime, OplogInterfaceMock({insertDocumentOperation, 
commonOperation}), rollbackSource, _coordinator, @@ -678,11 +655,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) { RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); - OpTime opTime(insertDocumentOperation.first["ts"].timestamp(), - insertDocumentOperation.first["h"].Long()); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), - opTime, OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, @@ -714,11 +688,8 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) { ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t")); wuow.commit(); } - OpTime opTime(unknownCommandOperation.first["ts"].timestamp(), - unknownCommandOperation.first["h"].Long()); auto status = syncRollback(_txn.get(), - opTime, OplogInterfaceMock({unknownCommandOperation, commonOperation}), RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, @@ -755,10 +726,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); - OpTime opTime(dropCollectionOperation.first["ts"].timestamp(), - dropCollectionOperation.first["h"].Long()); ASSERT_OK(syncRollback(_txn.get(), - opTime, OplogInterfaceMock({dropCollectionOperation, commonOperation}), rollbackSource, _coordinator, @@ -782,10 +750,7 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) { commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); - OpTime opTime(createCollectionOperation.first["ts"].timestamp(), - createCollectionOperation.first["h"].Long()); ASSERT_OK(syncRollback(_txn.get(), - opTime, OplogInterfaceMock({createCollectionOperation, commonOperation}), rollbackSource, _coordinator, @@ -825,11 +790,8 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) { commonOperation, }))); _createCollection(_txn.get(), "test.t", 
CollectionOptions()); - OpTime opTime(collectionModificationOperation.first["ts"].timestamp(), - collectionModificationOperation.first["h"].Long()); startCapturingLogMessages(); ASSERT_OK(syncRollback(_txn.get(), - opTime, OplogInterfaceMock({collectionModificationOperation, commonOperation}), rollbackSource, _coordinator, @@ -867,11 +829,8 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); - OpTime opTime(collectionModificationOperation.first["ts"].timestamp(), - collectionModificationOperation.first["h"].Long()); auto status = syncRollback(_txn.get(), - opTime, OplogInterfaceMock({collectionModificationOperation, commonOperation}), rollbackSource, _coordinator, |