summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
author Mathias Stearn <mathias@10gen.com> 2016-11-15 15:24:22 -0500
committer Mathias Stearn <mathias@10gen.com> 2017-01-03 16:02:19 -0500
commit 0b76764eac7651ddba4c82c504aa7e8d785087c2 (patch)
tree f90fce58d2781a48afaee696ee3fb9e6f8fefedc /src/mongo
parent 506c8af1269c76fcd730e121e37b82a18347ac70 (diff)
download mongo-0b76764eac7651ddba4c82c504aa7e8d785087c2.tar.gz
SERVER-27050 Ensure upstream node doesn't roll back after checking MinValid
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/repl/bgsync.cpp | 109
-rw-r--r-- src/mongo/db/repl/bgsync.h | 14
-rw-r--r-- src/mongo/db/repl/data_replicator.cpp | 11
-rw-r--r-- src/mongo/db/repl/data_replicator.h | 9
-rw-r--r-- src/mongo/db/repl/oplog_fetcher.cpp | 6
-rw-r--r-- src/mongo/db/repl/oplog_fetcher.h | 6
-rw-r--r-- src/mongo/db/repl/oplog_fetcher_test.cpp | 20
-rw-r--r-- src/mongo/db/repl/rs_rollback.cpp | 21
-rw-r--r-- src/mongo/db/repl/rs_rollback.h | 2
-rw-r--r-- src/mongo/db/repl/rs_rollback_test.cpp | 19
-rw-r--r-- src/mongo/db/repl/sync_source_resolver.cpp | 66
-rw-r--r-- src/mongo/db/repl/sync_source_resolver.h | 20
-rw-r--r-- src/mongo/db/repl/sync_source_resolver_test.cpp | 101
13 files changed, 331 insertions, 73 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d5fdd001921..84676fd60c8 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
@@ -287,39 +288,32 @@ void BackgroundSync::_produce(OperationContext* txn) {
OpTime lastOpTimeFetched;
HostAndPort source;
SyncSourceResolverResponse syncSourceResp;
- SyncSourceResolver* syncSourceResolver;
- OpTime minValid;
- OpTime minValidSaved;
- if (_replCoord->getMemberState().recovering()) {
- minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
- }
{
+ const OpTime minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
+
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (minValidSaved > _lastOpTimeFetched) {
- minValid = minValidSaved;
- }
+ const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
lastOpTimeFetched = _lastOpTimeFetched;
_syncSourceHost = HostAndPort();
_syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
- minValid,
+ requiredOpTime,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
- syncSourceResolver = _syncSourceResolver.get();
}
// This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls
// ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to be
// acquired before BackgroundSync's.
// It is safe to call startup() outside the mutex on this instance of SyncSourceResolver because
- // we do not destroy this instance outside of this function.
+ // we do not destroy this instance outside of this function which is only called from a single
+ // thread.
auto status = _syncSourceResolver->startup();
if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
return;
}
fassertStatusOK(40349, status);
- syncSourceResolver->join();
- syncSourceResolver = nullptr;
+ _syncSourceResolver->join();
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
_syncSourceResolver.reset();
@@ -388,6 +382,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
+ auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this.
OplogFetcher* oplogFetcher;
try {
auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
@@ -410,7 +405,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
- stdx::placeholders::_3),
+ stdx::placeholders::_3,
+ &rbidCopyForFetcher),
onOplogFetcherShutdownCallbackFn);
oplogFetcher = _oplogFetcher.get();
} catch (const mongo::DBException& ex) {
@@ -484,20 +480,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
}
- // check that we are at minvalid, otherwise we cannot roll back as we may be in an
- // inconsistent state
- const auto minValid = StorageInterface::get(txn)->getMinValid(txn);
- if (lastApplied < minValid) {
- fassertNoTrace(18750,
- Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream() << "need to rollback, but in inconsistent state. "
- << "minvalid: "
- << minValid.toString()
- << " > our last optime: "
- << lastApplied.toString()));
- }
- _rollback(txn, source, getConnection);
+ _rollback(txn, source, syncSourceResp.rbid, getConnection);
stop();
} else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
Seconds blacklistDuration(60);
@@ -510,14 +494,57 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
-void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info) {
+Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ boost::optional<int>* requiredRBID) {
+ // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+ // since that could cause it to not have our required minValid point. The cursor will be killed
+ // if the upstream node rolls back so we don't need to keep checking. This must be blocking
+ // since the Fetcher doesn't give us a way to defer sending the getmores after we return.
+ if (*requiredRBID) {
+ auto rbidStatus = Status(ErrorCodes::InternalError, "");
+ auto handle =
+ _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand(
+ {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr},
+ [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+ rbidStatus = rbidReply.response.status;
+ if (!rbidStatus.isOK())
+ return;
+
+ rbidStatus = getStatusFromCommandResult(rbidReply.response.data);
+ if (!rbidStatus.isOK())
+ return;
+
+ const auto rbidElem = rbidReply.response.data["rbid"];
+ if (rbidElem.type() != NumberInt) {
+ rbidStatus = Status(ErrorCodes::BadValue,
+ str::stream() << "Upstream node returned an "
+ << "rbid with invalid type "
+ << rbidElem.type());
+ return;
+ }
+ if (rbidElem.Int() != **requiredRBID) {
+ rbidStatus = Status(ErrorCodes::BadValue,
+ "Upstream node rolled back after verifying "
+ "that it had our MinValid point. Retrying.");
+ }
+ });
+ if (!handle.isOK())
+ return handle.getStatus();
+
+ _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue());
+ if (!rbidStatus.isOK())
+ return rbidStatus;
+
+ requiredRBID->reset(); // Don't come back to this block while on this cursor.
+ }
+
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
if (info.toApplyDocumentCount == 0) {
- return; // Nothing to do.
+ return Status::OK(); // Nothing to do.
}
auto txn = cc().makeOperationContext();
@@ -532,7 +559,7 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
// buffer.
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (_inShutdown) {
- return;
+ return Status::OK();
}
OCCASIONALLY {
@@ -561,6 +588,8 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
// The inference here is basically if the batch is really small, we are "caught up".
sleepmillis(kSleepToAllowBatchingMillis);
}
+
+ return Status::OK();
}
bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) {
@@ -589,26 +618,16 @@ void BackgroundSync::consume(OperationContext* txn) {
void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
+ boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
// Abort only when syncRollback detects we are in an unrecoverable state.
// In other cases, we log the message contained in the error status and retry later.
auto status = syncRollback(txn,
OplogInterfaceLocal(txn, rsOplogName),
RollbackSourceImpl(getConnection, source, rsOplogName),
+ requiredRBID,
_replCoord);
if (status.isOK()) {
- // When the syncTail thread sees there is no new data by adding something to the buffer.
- _signalNoNewDataForApplier(txn);
- // Wait until the buffer is empty.
- // This is an indication that syncTail has removed the sentinal marker from the buffer
- // and reset its local lastAppliedOpTime via the replCoord.
- while (!_oplogBuffer->isEmpty()) {
- sleepmillis(10);
- if (inShutdown()) {
- return;
- }
- }
-
// At this point we are about to leave rollback. Before we do, wait for any writes done
// as part of rollback to be durable, and then do any necessary checks that we didn't
// wind up rolling back something illegal. We must wait for the rollback to be durable
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 312acb15bc7..8177509e049 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -147,10 +147,13 @@ private:
/**
* Checks current background sync state before pushing operations into blocking queue and
* updating metrics. If the queue is full, might block.
+ *
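+ * If requiredRBID is set, the upstream node's rbid is checked against it first; requiredRBID is
+ * reset to empty once that check succeeds, so the check is not repeated on later calls.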
*/
- void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info);
+ Status _enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ boost::optional<int>* requiredRBID);
/**
* Executes a rollback.
@@ -158,6 +161,7 @@ private:
*/
void _rollback(OperationContext* txn,
const HostAndPort& source,
+ boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection);
// restart syncing
@@ -195,11 +199,11 @@ private:
HostAndPort _syncSourceHost;
// Current sync source resolver validating sync source candidates.
- // Owned by us.
+ // Pointer may be read on any thread that locks _mutex or unlocked on the BGSync thread. It can
+ // only be written to by the BGSync thread while holding _mutex.
std::unique_ptr<SyncSourceResolver> _syncSourceResolver;
// Current oplog fetcher tailing the oplog on the sync source.
- // Owned by us.
std::unique_ptr<OplogFetcher> _oplogFetcher;
};
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index ad5a157b10f..0fb71dacd75 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1467,15 +1467,15 @@ StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
return syncSource;
}
-void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info) {
+Status DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info) {
if (info.toApplyDocumentCount == 0) {
- return;
+ return Status::OK();
}
if (_isShuttingDown()) {
- return;
+ return Status::OK();
}
invariant(_oplogBuffer);
@@ -1494,6 +1494,7 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
_lastFetched = info.lastDocument;
// TODO: updates metrics with "info".
+ return Status::OK();
}
DataReplicator::OnCompletionGuard::OnCompletionGuard(
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index adf3098bd87..4ee9c3082e9 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -510,10 +510,13 @@ private:
/**
* Pushes documents from oplog fetcher to blocking queue for
* applier to consume.
+ *
+ * Returns a status even though it always returns OK, to conform to the interface OplogFetcher
+ * expects for the EnqueueDocumentsFn.
*/
- void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info);
+ Status _enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info);
BSONObj _getInitialSyncProgress_inlock() const;
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 2893a9099d6..7e31eb598d5 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -495,7 +495,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
getmoreReplStats.recordMillis(durationCount<Milliseconds>(queryResponse.elapsedMillis));
// TODO: back pressure handling will be added in SERVER-23499.
- _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
+ auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
+ if (!status.isOK()) {
+ _finishCallback(status);
+ return;
+ }
// Update last fetched info.
if (firstDocToApply != documents.cend()) {
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 3d44a50fb67..998df06a07e 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -108,9 +108,9 @@ public:
* Additional information on the operations is provided in a DocumentsInfo
* struct.
*/
- using EnqueueDocumentsFn = stdx::function<void(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const DocumentsInfo& info)>;
+ using EnqueueDocumentsFn = stdx::function<Status(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const DocumentsInfo& info)>;
/**
* Validates documents in current batch of results returned from tailing the remote oplog.
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 0c3e41e575f..f3ea0760d35 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -140,9 +140,10 @@ void OplogFetcherTest::setUp() {
enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info) {
+ const OplogFetcher::DocumentsInfo& info) -> Status {
lastEnqueuedDocuments = {begin, end};
lastEnqueuedDocumentsInfo = info;
+ return Status::OK();
};
}
@@ -652,6 +653,23 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE
shutdownState->getLastFetched());
}
+TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
+ auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
+ Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
+
+ enqueueDocumentsFn = [](Fetcher::Documents::const_iterator,
+ Fetcher::Documents::const_iterator,
+ const OplogFetcher::DocumentsInfo&) -> Status {
+ return Status(ErrorCodes::InternalError, "my custom error");
+ };
+
+ auto shutdownState = processSingleBatch(
+ {makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), Milliseconds(0)});
+ ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error"));
+}
+
void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index ecca00a539b..7cb229e064f 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -828,6 +828,7 @@ void syncFixUp(OperationContext* txn,
Status _syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
+ boost::optional<int> requiredRBID,
ReplicationCoordinator* replCoord,
const SleepSecondsFn& sleepSecondsFn) {
invariant(!txn->lockState()->isLocked());
@@ -854,6 +855,14 @@ Status _syncRollback(OperationContext* txn,
FixUpInfo how;
log() << "rollback 1";
how.rbid = rollbackSource.getRollbackId();
+ if (requiredRBID && how.rbid != requiredRBID) {
+ log() << "Upstream node rolled back. Need to retry our rollback.";
+ // Currently the transitive callers of this function require that we return Status::OK() for
+ // all recoverable errors such as this. Even though we aren't able to roll back, we are still
+ // "OK" because the system is still in a consistent state.
+ return Status::OK();
+ }
+
{
log() << "rollback 2 FindCommonPoint";
try {
@@ -928,6 +937,7 @@ Status _syncRollback(OperationContext* txn,
Status syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
+ boost::optional<int> requiredRBID,
ReplicationCoordinator* replCoord,
const SleepSecondsFn& sleepSecondsFn) {
invariant(txn);
@@ -937,7 +947,8 @@ Status syncRollback(OperationContext* txn,
DisableDocumentValidation validationDisabler(txn);
txn->setReplicatedWrites(false);
- Status status = _syncRollback(txn, localOplog, rollbackSource, replCoord, sleepSecondsFn);
+ Status status =
+ _syncRollback(txn, localOplog, rollbackSource, requiredRBID, replCoord, sleepSecondsFn);
log() << "rollback finished" << rsLog;
return status;
@@ -946,10 +957,12 @@ Status syncRollback(OperationContext* txn,
Status syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
+ boost::optional<int> requiredRBID,
ReplicationCoordinator* replCoord) {
- return syncRollback(txn, localOplog, rollbackSource, replCoord, [](Seconds seconds) {
- sleepsecs(durationCount<Seconds>(seconds));
- });
+ return syncRollback(
+ txn, localOplog, rollbackSource, requiredRBID, replCoord, [](Seconds seconds) {
+ sleepsecs(durationCount<Seconds>(seconds));
+ });
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index c88e9fd27c0..2c54ed6ba73 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -77,12 +77,14 @@ using SleepSecondsFn = stdx::function<void(Seconds)>;
Status syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
+ boost::optional<int> requiredRBID,
ReplicationCoordinator* replCoord,
const SleepSecondsFn& sleepSecondsFn);
Status syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
+ boost::optional<int> requiredRBID,
ReplicationCoordinator* replCoord);
} // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index edcf8668c16..852ce117712 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -173,6 +173,7 @@ TEST_F(RSRollbackTest, InconsistentMinValid) {
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
+ {},
_coordinator,
noSleep);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
@@ -199,6 +200,7 @@ TEST_F(RSRollbackTest, SetFollowerModeFailed) {
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
+ {},
_coordinator,
noSleep)
.code());
@@ -215,6 +217,7 @@ TEST_F(RSRollbackTest, OplogStartMissing) {
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
operation,
}))),
+ {},
_coordinator,
noSleep)
.code());
@@ -228,6 +231,7 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) {
OplogInterfaceMock({operation}),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
+ {},
_coordinator,
noSleep);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
@@ -250,6 +254,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
OplogInterfaceMock({operation}),
RollbackSourceLocal(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
+ {},
_coordinator,
noSleep),
UserException,
@@ -267,6 +272,7 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
operation,
}))),
+ {},
_coordinator,
noSleep));
}
@@ -335,6 +341,7 @@ int _testRollbackDelete(OperationContext* txn,
ASSERT_OK(syncRollback(txn,
OplogInterfaceMock({deleteOperation, commonOperation}),
rollbackSource,
+ {},
coordinator,
noSleep));
ASSERT_TRUE(rollbackSource.called);
@@ -410,6 +417,7 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep);
stopCapturingLogMessages();
@@ -474,6 +482,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) {
_txn.get(),
OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep));
stopCapturingLogMessages();
@@ -534,6 +543,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep));
stopCapturingLogMessages();
@@ -582,6 +592,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep);
stopCapturingLogMessages();
@@ -629,6 +640,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep);
stopCapturingLogMessages();
@@ -674,6 +686,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep);
stopCapturingLogMessages();
@@ -710,6 +723,7 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) {
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
}))),
+ {},
_coordinator,
noSleep);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
@@ -746,6 +760,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({dropCollectionOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep));
ASSERT_TRUE(rollbackSource.called);
@@ -868,6 +883,7 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({applyOpsOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep));
ASSERT_EQUALS(4U, rollbackSource.searchedIds.size());
@@ -908,6 +924,7 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({createCollectionOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep));
{
@@ -951,6 +968,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({collectionModificationOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep));
stopCapturingLogMessages();
@@ -992,6 +1010,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt
syncRollback(_txn.get(),
OplogInterfaceMock({collectionModificationOperation, commonOperation}),
rollbackSource,
+ {},
_coordinator,
noSleep);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index 881206a7d5e..33818e14c50 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
@@ -129,6 +130,9 @@ void SyncSourceResolver::shutdown() {
if (_fetcher) {
_fetcher->shutdown();
}
+ if (_rbidCommandHandle) {
+ _taskExecutor->cancel(_rbidCommandHandle);
+ }
}
void SyncSourceResolver::join() {
@@ -196,6 +200,7 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP
Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // TODO SERVER-27499 need to check if _state is kShuttingDown inside the mutex.
// Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task
// executor.
auto status = fetcher->schedule();
@@ -306,18 +311,65 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
return;
}
- // Schedules fetcher to look for '_requiredOpTime' in the remote oplog.
if (!_requiredOpTime.isNull()) {
- auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
- if (!status.isOK()) {
- _finishCallback(status);
- }
+ _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
return;
}
_finishCallback(candidate);
}
+void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
+ auto handle = _taskExecutor->scheduleRemoteCommand(
+ {candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout},
+ stdx::bind(&SyncSourceResolver::_rbidRequestCallback,
+ this,
+ candidate,
+ earliestOpTimeSeen,
+ stdx::placeholders::_1));
+
+ if (!handle.isOK()) {
+ _finishCallback(handle.getStatus());
+ return;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _rbidCommandHandle = std::move(handle.getValue());
+ if (_state == State::kShuttingDown) {
+ _taskExecutor->cancel(_rbidCommandHandle);
+ }
+}
+
+void SyncSourceResolver::_rbidRequestCallback(
+ HostAndPort candidate,
+ OpTime earliestOpTimeSeen,
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+ if (rbidReply.response.status == ErrorCodes::CallbackCanceled) {
+ _finishCallback(rbidReply.response.status);
+ return;
+ }
+
+ try {
+ uassertStatusOK(rbidReply.response.status);
+ uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data));
+ _rbid = rbidReply.response.data["rbid"].Int();
+ } catch (const DBException& ex) {
+ const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
+ log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for "
+ << kFetcherErrorBlacklistDuration << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+
+ // Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
+ // Unittest requires that this kind of failure be handled specially.
+ auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+ if (!status.isOK()) {
+ _finishCallback(status);
+ }
+}
+
Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
const Fetcher::QueryResponse& queryResponse) {
if (queryResponse.documents.empty()) {
@@ -425,6 +477,10 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe
Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
SyncSourceResolverResponse response;
response.syncSourceStatus = std::move(result);
+ if (response.isOK() && !response.getSyncSource().empty()) {
+ invariant(_requiredOpTime.isNull() || _rbid);
+ response.rbid = _rbid;
+ }
return _finishCallback(response);
}
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index 099b56994e2..48f7e968a8d 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -70,6 +70,11 @@ struct SyncSourceResolverResponse {
// Contains the new MinValid boundary if syncSourceStatus is ErrorCodes::OplogStartMissing.
OpTime earliestOpTimeSeen;
+ // Rollback ID of the selected sync source. Only filled in when there is a required optime.
+ // The rbid is fetched before the required optime so callers can be sure that as long as the
+ // rbid is the same, the required optime is still present.
+ boost::optional<int> rbid;
+
bool isOK() {
return syncSourceStatus.isOK();
}
@@ -172,6 +177,14 @@ private:
OpTime earliestOpTimeSeen);
/**
+ * Schedules a replSetGetRBID command against the candidate to fetch its current rollback id.
+ */
+ void _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen);
+ void _rbidRequestCallback(HostAndPort candidate,
+ OpTime earliestOpTimeSeen,
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply);
+
+ /**
* Checks query response for required optime.
*/
Status _compareRequiredOpTimeWithQueryResponse(const Fetcher::QueryResponse& queryResponse);
@@ -216,7 +229,10 @@ private:
// resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes.
const OnCompletionFn _onCompletion;
- // Protects members of this sync source resolver.
+ // The rbid we will return to our caller.
+ boost::optional<int> _rbid;
+
+ // Protects members of this sync source resolver defined below.
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;
@@ -233,6 +249,8 @@ private:
// Holds reference to fetcher in the process of shutting down.
std::unique_ptr<Fetcher> _shuttingDownFetcher;
+
+ executor::TaskExecutor::CallbackHandle _rbidCommandHandle;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 82ae8ab528c..fb27a2cc681 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -614,6 +614,19 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
{BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())});
}
+void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net,
+ HostAndPort currentSyncSource,
+ const BSONObj& reply = BSON("ok" << 1 << "rbid" << 1)) {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto request = net->scheduleSuccessfulResponse(reply);
+ ASSERT_EQUALS(currentSyncSource, request.target);
+ ASSERT_EQUALS("admin", request.dbname);
+ ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
+ ASSERT_BSONOBJ_EQ(BSON("replSetGetRBID" << 1), request.cmdObj);
+ net->runReadyNetworkOperations();
+}
+
const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
TEST_F(
@@ -631,11 +644,14 @@ TEST_F(
ASSERT_TRUE(_resolver->isActive());
+ _scheduleRBIDResponse(getNet(), candidate1, BSON("ok" << 1 << "rbid" << 7));
_scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+ ASSERT(_response.rbid);
+ ASSERT_EQ(*_response.rbid, 7);
}
TEST_F(SyncSourceResolverTest,
@@ -653,6 +669,7 @@ TEST_F(SyncSourceResolverTest,
ASSERT_TRUE(_resolver->isActive());
+ _scheduleRBIDResponse(getNet(), candidate1);
_scheduleRequiredOpTimeFetcherResponse(
getNet(),
_selector.get(),
@@ -667,6 +684,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRBIDResponse(getNet(), candidate2);
_scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
_resolver->join();
@@ -692,6 +710,7 @@ TEST_F(
ASSERT_TRUE(_resolver->isActive());
+ _scheduleRBIDResponse(getNet(), candidate1);
_scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);
ASSERT_TRUE(_resolver->isActive());
@@ -701,6 +720,7 @@ TEST_F(
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRBIDResponse(getNet(), candidate2);
_scheduleRequiredOpTimeFetcherResponse(
getNet(), _selector.get(), candidate2, requireOpTimeWithUninitializedTerm);
@@ -724,6 +744,7 @@ TEST_F(SyncSourceResolverTest,
ASSERT_TRUE(_resolver->isActive());
+ _scheduleRBIDResponse(getNet(), candidate1);
_scheduleRequiredOpTimeFetcherResponse(
getNet(), _selector.get(), candidate1, requiredOpTime, {});
@@ -734,6 +755,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRBIDResponse(getNet(), candidate2);
_scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
_resolver->join();
@@ -756,6 +778,7 @@ TEST_F(SyncSourceResolverTest,
ASSERT_TRUE(_resolver->isActive());
+ _scheduleRBIDResponse(getNet(), candidate1);
_scheduleRequiredOpTimeFetcherResponse(
getNet(),
_selector.get(),
@@ -770,6 +793,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRBIDResponse(getNet(), candidate2);
_scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
_resolver->join();
@@ -778,6 +802,27 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(SyncSourceResolverTest,  // Expects OperationFailed when the replSetGetRBID request is made to fail.
+ SyncSourceResolverReturnsScheduleErrorWhenSchedulingRBIDCommandFails) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ _shouldFailRequest = [](const executor::RemoteCommandRequest& request) {  // Predicate matching only replSetGetRBID requests.
+ return request.cmdObj.firstElementFieldName() == "replSetGetRBID"_sd;
+ };  // NOTE(review): presumably the fixture rejects scheduling of matching requests -- confirm.
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->setChooseNewSyncSourceResult_forTest(candidate1);  // candidate1 is what the selector will offer.
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(  // Answer the first-oplog-entry fetch; no RBID reply is scheduled after it.
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);  // The resolver finishes with an error status.
+}
+
+TEST_F(SyncSourceResolverTest,
SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) {
_resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
@@ -792,6 +837,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+ _scheduleRBIDResponse(getNet(), candidate1);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -843,6 +889,60 @@ TEST_F(
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
}
+TEST_F(SyncSourceResolverTest,  // Per the test name: a network error on the RBID command blacklists the candidate.
+ SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRBIDCommand) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(  // Answer the first-oplog-entry fetch for candidate1.
+ getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+
+ _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());  // No RBID reply was scheduled, so this hits the next request.
+
+ ASSERT_FALSE(_resolver->isActive());  // Already inactive before join() is called.
+ ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());  // candidate1 was blacklisted...
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,  // ...for the fetcher-error duration.
+ _selector->getLastBlacklistExpiration_forTest());
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));  // OK status holding an empty host.
+}
+
+TEST_F(SyncSourceResolverTest,  // A not-ok replSetGetRBID reply blacklists the candidate and yields an empty host.
+ SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterRBIDCommandNotOk) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(  // Answer the first-oplog-entry fetch for candidate1.
+ getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+
+ // Set by hand here; tests that call _scheduleNetworkErrorForFirstNode get this done for them.
+ _selector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+
+ _scheduleRBIDResponse(getNet(),  // Reply to the RBID request with a command error instead of an rbid.
+ candidate1,
+ BSON("ok" << 0 << "code" << 9000 << "errmsg"
+ << "I'm sorry Dave, I'm afraid I can't do that."));
+
+ ASSERT_FALSE(_resolver->isActive());  // Already inactive before join() is called.
+ ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());  // candidate1 was blacklisted...
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,  // ...for the fetcher-error duration.
+ _selector->getLastBlacklistExpiration_forTest());
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));  // OK status holding an empty host.
+}
+
TEST_F(
SyncSourceResolverTest,
SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) {
@@ -856,6 +956,7 @@ TEST_F(
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+ _scheduleRBIDResponse(getNet(), candidate1);
_scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());
ASSERT_FALSE(_resolver->isActive());