summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-01-05 15:36:18 -0500
committerScott Hernandez <scotthernandez@gmail.com>2016-01-19 11:05:08 -0500
commit05a115fe4e87db76c16fab5d17e23ac45329994d (patch)
tree5b048cce174548a124a9c8fe4a3e4cfc03d702e0
parentd74c7ae40f82bd5bf6334c8114c0cf4948f60002 (diff)
downloadmongo-05a115fe4e87db76c16fab5d17e23ac45329994d.tar.gz
SERVER-21988: wait for applier before starting rollback
(cherry picked from commit 4846585c6a7f09a18ac8c313ca7b0cee405ad29c)
-rw-r--r--src/mongo/db/repl/bgsync.cpp86
-rw-r--r--src/mongo/db/repl/bgsync.h6
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp17
-rw-r--r--src/mongo/db/repl/rs_rollback.h3
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp43
5 files changed, 66 insertions, 89 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 6759bf85230..41b58658162 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/oplogreader.h"
@@ -160,7 +161,7 @@ BackgroundSync::BackgroundSync()
_lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
std::numeric_limits<long long>::max()),
_lastFetchedHash(0),
- _pause(true),
+ _stopped(true),
_replCoord(getGlobalReplicationCoordinator()),
_initialSyncRequestedFlag(false),
_indexPrefetchConfig(PREFETCH_ALL) {}
@@ -179,7 +180,7 @@ void BackgroundSync::shutdown() {
// Clear the buffer in case the producerThread is waiting in push() due to a full queue.
invariant(inShutdown());
clearBuffer();
- _pause = true;
+ _stopped = true;
}
void BackgroundSync::producerThread() {
@@ -209,14 +210,14 @@ void BackgroundSync::producerThread() {
void BackgroundSync::_producerThread() {
const MemberState state = _replCoord->getMemberState();
- // we want to pause when the state changes to primary
+ // Stop when the state changes to primary.
if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
- if (!isPaused()) {
+ if (!isStopped()) {
stop();
}
if (_replCoord->isWaitingForApplierToDrain()) {
- // Signal to consumers that we have entered the paused state if the signal isn't already
- // in the queue.
+ // Signal to consumers that we have entered the stopped state
+ // if the signal isn't already in the queue.
const boost::optional<BSONObj> lastObjectPushed = _buffer.lastObjectPushed();
if (!lastObjectPushed || !lastObjectPushed->isEmpty()) {
const BSONObj sentinelDoc;
@@ -241,10 +242,10 @@ void BackgroundSync::_producerThread() {
sleepsecs(1);
return;
}
- // we want to unpause when we're no longer primary
+ // we want to start when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
OperationContextImpl txn;
- if (isPaused()) {
+ if (isStopped()) {
start(&txn);
}
@@ -295,7 +296,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
long long lastHashFetched;
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_pause) {
+ if (_stopped) {
return;
}
lastOpTimeFetched = _lastOpTimeFetched;
@@ -367,9 +368,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
fetcher.wait();
LOG(1) << "fetcher stopped reading remote oplog on " << source;
- // If the background sync is paused after the fetcher is started, we need to
+ // If the background sync is stopped after the fetcher is started, we need to
// re-evaluate our sync source and oplog common point.
- if (isPaused()) {
+ if (isStopped()) {
return;
}
@@ -397,7 +398,37 @@ void BackgroundSync::_produce(OperationContext* txn) {
return connection->get();
};
- log() << "starting rollback: " << fetcherReturnStatus;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ lastOpTimeFetched = _lastOpTimeFetched;
+ }
+
+ log() << "Starting rollback due to " << fetcherReturnStatus;
+
+ // Wait till all buffered oplog entries have drained and been applied.
+ auto lastApplied = _replCoord->getMyLastOptime();
+ if (lastApplied != _lastOpTimeFetched) {
+ log() << "Waiting for all operations from " << lastApplied << " until "
+ << _lastOpTimeFetched << " to be applied before starting rollback.";
+ while (_lastOpTimeFetched > (lastApplied = _replCoord->getMyLastOptime())) {
+ sleepmillis(10);
+ if (isStopped() || inShutdown()) {
+ return;
+ }
+ }
+ }
+ // check that we are at minvalid, otherwise we cannot roll back as we may be in an
+ // inconsistent state
+ BatchBoundaries boundaries = getMinValid(txn);
+ if (!boundaries.start.isNull() || boundaries.end > lastApplied) {
+ fassertNoTrace(18750,
+ Status(ErrorCodes::UnrecoverableRollbackError,
+ str::stream()
+ << "need to rollback, but in inconsistent state. "
+ << "minvalid: " << boundaries.end.toString()
+ << " > our last optime: " << lastApplied.toString()));
+ }
+
_rollback(txn, source, getConnection);
stop();
} else if (!fetcherReturnStatus.isOK()) {
@@ -422,8 +453,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return;
}
- // Check if we have been paused.
- if (isPaused()) {
+ // Check if we have been stopped.
+ if (isStopped()) {
return;
}
@@ -491,7 +522,15 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
- Timestamp lastTS = _lastOpTimeFetched.getTimestamp();
+ Timestamp lastTS;
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ // If we are stopped then return without queueing this batch to apply.
+ if (_stopped) {
+ return;
+ }
+ lastTS = _lastOpTimeFetched.getTimestamp();
+ }
int count = 0;
for (auto&& doc : documents) {
networkDocumentBytes += doc.objsize();
@@ -554,7 +593,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
stdx::unique_lock<stdx::mutex> lock(_mutex);
_lastFetchedHash = lastDoc["h"].numberLong();
_lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromOplogEntry(lastDoc));
- LOG(3) << "batch lastOpTimeFetched: " << _lastOpTimeFetched;
+ LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched;
}
}
@@ -581,7 +620,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
}
// If we are transitioning to primary state, we need to leave
- // this loop in order to go into bgsync-pause mode.
+ // this loop in order to go into bgsync-stop mode.
if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) {
return;
}
@@ -591,8 +630,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return;
}
- // Check if we have been paused.
- if (isPaused()) {
+ // Check if we have been stopped.
+ if (isStopped()) {
return;
}
@@ -647,7 +686,6 @@ void BackgroundSync::_rollback(OperationContext* txn,
// Abort only when syncRollback detects we are in a unrecoverable state.
// In other cases, we log the message contained in the error status and retry later.
auto status = syncRollback(txn,
- _replCoord->getMyLastOptime(),
OplogInterfaceLocal(txn, rsOplogName),
RollbackSourceImpl(getConnection, source, rsOplogName),
_replCoord);
@@ -677,7 +715,7 @@ void BackgroundSync::cancelFetcher() {
void BackgroundSync::stop() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _pause = true;
+ _stopped = true;
_syncSourceHost = HostAndPort();
_lastOpTimeFetched = OpTime();
_lastFetchedHash = 0;
@@ -688,7 +726,7 @@ void BackgroundSync::start(OperationContext* txn) {
long long lastFetchedHash = _readLastAppliedHash(txn);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _pause = false;
+ _stopped = false;
// reset _last fields with current oplog data
_lastOpTimeFetched = _replCoord->getMyLastOptime();
@@ -697,9 +735,9 @@ void BackgroundSync::start(OperationContext* txn) {
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
-bool BackgroundSync::isPaused() const {
+bool BackgroundSync::isStopped() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _pause;
+ return _stopped;
}
void BackgroundSync::clearBuffer() {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 0afe7728af9..5d6113a8efb 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -88,7 +88,7 @@ public:
void shutdown();
- bool isPaused() const;
+ bool isStopped() const;
virtual ~BackgroundSync() {}
@@ -152,8 +152,8 @@ private:
// a secondary.
long long _lastFetchedHash;
- // if produce thread should be running
- bool _pause;
+ // if producer thread should not be running
+ bool _stopped;
HostAndPort _syncSourceHost;
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 6b11660a44b..be9bf0e1d1c 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -878,7 +878,6 @@ Status _syncRollback(OperationContext* txn,
} // namespace
Status syncRollback(OperationContext* txn,
- const OpTime& lastOpTimeApplied,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord,
@@ -886,20 +885,6 @@ Status syncRollback(OperationContext* txn,
invariant(txn);
invariant(replCoord);
- // check that we are at minvalid, otherwise we cannot rollback as we may be in an
- // inconsistent state
- {
- BatchBoundaries boundaries = getMinValid(txn);
- if (!boundaries.start.isNull() || boundaries.end > lastOpTimeApplied) {
- severe() << "need to rollback, but in inconsistent state" << endl;
- return Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream() << "need to rollback, but in inconsistent state. "
- << "minvalid: " << boundaries.end.toString()
- << " > our last optime: " << lastOpTimeApplied.toString(),
- 18750);
- }
- }
-
log() << "beginning rollback" << rsLog;
DisableDocumentValidation validationDisabler(txn);
@@ -911,12 +896,10 @@ Status syncRollback(OperationContext* txn,
}
Status syncRollback(OperationContext* txn,
- const OpTime& lastOpTimeWritten,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord) {
return syncRollback(txn,
- lastOpTimeWritten,
localOplog,
rollbackSource,
replCoord,
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index 793521393a9..c88e9fd27c0 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -63,7 +63,6 @@ class RollbackSource;
* while our rollback is in progress.
*
* @param txn Used to read and write from this node's databases
- * @param lastOpTimeWritten The last OpTime applied by the applier
* @param localOplog reads the oplog on this server.
* @param rollbackSource interface for sync source:
* provides oplog; and
@@ -76,14 +75,12 @@ class RollbackSource;
using SleepSecondsFn = stdx::function<void(Seconds)>;
Status syncRollback(OperationContext* txn,
- const OpTime& lastOpTimeWritten,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord,
const SleepSecondsFn& sleepSecondsFn);
Status syncRollback(OperationContext* txn,
- const OpTime& lastOpTimeWritten,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord);
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 4296f9d62e7..fc3a1c40014 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -173,14 +173,13 @@ TEST_F(RSRollbackTest, InconsistentMinValid) {
repl::setMinValid(_txn.get(),
{OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)});
auto status = syncRollback(_txn.get(),
- OpTime(),
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
_coordinator,
noSleep);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
- ASSERT_EQUALS(18750, status.location());
+ ASSERT_EQUALS(18752, status.location());
}
TEST_F(RSRollbackTest, SetFollowerModeFailed) {
@@ -200,7 +199,6 @@ TEST_F(RSRollbackTest, SetFollowerModeFailed) {
ASSERT_EQUALS(ErrorCodes::OperationFailed,
syncRollback(_txn.get(),
- OpTime(),
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
@@ -215,7 +213,6 @@ TEST_F(RSRollbackTest, OplogStartMissing) {
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
syncRollback(_txn.get(),
- OpTime(),
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
operation,
@@ -229,7 +226,6 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) {
auto operation =
std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
auto status = syncRollback(_txn.get(),
- ts,
OplogInterfaceMock({operation}),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
@@ -252,7 +248,6 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
}
};
ASSERT_THROWS_CODE(syncRollback(_txn.get(),
- ts,
OplogInterfaceMock({operation}),
RollbackSourceLocal(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
@@ -269,7 +264,6 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1));
ASSERT_OK(
syncRollback(_txn.get(),
- ts,
OplogInterfaceMock({operation}),
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
operation,
@@ -338,9 +332,7 @@ int _testRollbackDelete(OperationContext* txn,
std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(deleteOperation.first["ts"].timestamp(), deleteOperation.first["h"].Long());
ASSERT_OK(syncRollback(txn,
- opTime,
OplogInterfaceMock({deleteOperation, commonOperation}),
rollbackSource,
coordinator,
@@ -413,11 +405,8 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(insertDocumentOperation.first["ts"].timestamp(),
- insertDocumentOperation.first["h"].Long());
startCapturingLogMessages();
auto status = syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -472,14 +461,11 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) {
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(insertDocumentOperation.first["ts"].timestamp(),
- insertDocumentOperation.first["h"].Long());
// Repeat index creation operation and confirm that rollback attempts to drop index just once.
// This can happen when an index is re-created with different options.
startCapturingLogMessages();
ASSERT_OK(syncRollback(
_txn.get(),
- opTime,
OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -535,11 +521,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) {
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(insertDocumentOperation.first["ts"].timestamp(),
- insertDocumentOperation.first["h"].Long());
startCapturingLogMessages();
ASSERT_OK(syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -585,11 +568,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) {
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(insertDocumentOperation.first["ts"].timestamp(),
- insertDocumentOperation.first["h"].Long());
startCapturingLogMessages();
auto status = syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -632,11 +612,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) {
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(insertDocumentOperation.first["ts"].timestamp(),
- insertDocumentOperation.first["h"].Long());
startCapturingLogMessages();
auto status = syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -678,11 +655,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) {
RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
})));
- OpTime opTime(insertDocumentOperation.first["ts"].timestamp(),
- insertDocumentOperation.first["h"].Long());
startCapturingLogMessages();
auto status = syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -714,11 +688,8 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) {
ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t"));
wuow.commit();
}
- OpTime opTime(unknownCommandOperation.first["ts"].timestamp(),
- unknownCommandOperation.first["h"].Long());
auto status =
syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({unknownCommandOperation, commonOperation}),
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
@@ -755,10 +726,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
commonOperation,
})));
_createCollection(_txn.get(), "test.t", CollectionOptions());
- OpTime opTime(dropCollectionOperation.first["ts"].timestamp(),
- dropCollectionOperation.first["h"].Long());
ASSERT_OK(syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({dropCollectionOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -782,10 +750,7 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
commonOperation,
})));
_createCollection(_txn.get(), "test.t", CollectionOptions());
- OpTime opTime(createCollectionOperation.first["ts"].timestamp(),
- createCollectionOperation.first["h"].Long());
ASSERT_OK(syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({createCollectionOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -825,11 +790,8 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
commonOperation,
})));
_createCollection(_txn.get(), "test.t", CollectionOptions());
- OpTime opTime(collectionModificationOperation.first["ts"].timestamp(),
- collectionModificationOperation.first["h"].Long());
startCapturingLogMessages();
ASSERT_OK(syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({collectionModificationOperation, commonOperation}),
rollbackSource,
_coordinator,
@@ -867,11 +829,8 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt
commonOperation,
})));
_createCollection(_txn.get(), "test.t", CollectionOptions());
- OpTime opTime(collectionModificationOperation.first["ts"].timestamp(),
- collectionModificationOperation.first["h"].Long());
auto status =
syncRollback(_txn.get(),
- opTime,
OplogInterfaceMock({collectionModificationOperation, commonOperation}),
rollbackSource,
_coordinator,