path: root/src/mongo/db/repl/bgsync.cpp
diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
1 files changed, 118 insertions, 44 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 7e2587b4561..37ba22bf1a7 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -54,6 +54,8 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/executor/network_interface_factory.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -242,6 +244,8 @@ void BackgroundSync::_producerThread() {
+ invariant(!state.rollback());
// We need to wait until initial sync has started.
if (_replCoord->getMyLastAppliedOpTime().isNull()) {
@@ -296,7 +300,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
minValid = minValidSaved;
- syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord);
+ int rbid;
+ syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord, &rbid);
// no server found
if (syncSourceReader.getHost().empty()) {
@@ -348,7 +354,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
- &fetcherReturnStatus);
+ &fetcherReturnStatus,
+ rbid);
BSONObjBuilder cmdBob;
@@ -436,19 +443,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
- // check that we are at minvalid, otherwise we cannot roll back as we may be in an
- // inconsistent state
- const auto minValid = getMinValid(txn);
- if (lastApplied < minValid) {
- fassertNoTrace(18750,
- Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream()
- << "need to rollback, but in inconsistent state. "
- << "minvalid: " << minValid.toString()
- << " > our last optime: " << lastApplied.toString()));
- }
- _rollback(txn, source, getConnection);
+ _rollback(txn, source, rbid, getConnection);
} else if (!fetcherReturnStatus.isOK()) {
warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString();
@@ -461,7 +457,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* returnStatus) {
+ Status* returnStatus,
+ int rbid) {
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
if (!result.isOK()) {
@@ -515,6 +512,46 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
+ // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+ // since that could cause it to not have our required minValid point. The cursor will be
+ // killed if the upstream node rolls back so we don't need to keep checking. This must be
+ // blocking since the Fetcher doesn't give us a way to defer sending the getmores after we
+ // return.
+ auto handle = _threadPoolTaskExecutor.scheduleRemoteCommand(
+ {source, "admin", BSON("replSetGetRBID" << 1)},
+ [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+ *returnStatus = rbidReply.response.getStatus();
+ if (!returnStatus->isOK())
+ return;
+ const auto& rbidReplyObj = rbidReply.response.getValue().data;
+ *returnStatus = getStatusFromCommandResult(rbidReplyObj);
+ if (!returnStatus->isOK())
+ return;
+ const auto rbidElem = rbidReplyObj["rbid"];
+ if (rbidElem.type() != NumberInt) {
+ *returnStatus =
+ Status(ErrorCodes::BadValue,
+ str::stream() << "Upstream node returned an "
+ << "rbid with invalid type " << rbidElem.type());
+ return;
+ }
+ if (rbidElem.Int() != rbid) {
+ *returnStatus = Status(ErrorCodes::BadValue,
+ "Upstream node rolled back after verifying "
+ "that it had our MinValid point. Retrying.");
+ }
+ });
+ if (!handle.isOK()) {
+ *returnStatus = handle.getStatus();
+ return;
+ }
+ _threadPoolTaskExecutor.wait(handle.getValue());
+ if (!returnStatus->isOK())
+ return;
auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> {
if (firstDocToApply == lastDocToApply) {
return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing");
@@ -702,40 +739,77 @@ void BackgroundSync::consume() {
void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
+ boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
- // Abort only when syncRollback detects we are in a unrecoverable state.
- // In other cases, we log the message contained in the error status and retry later.
- auto status = syncRollback(txn,
- OplogInterfaceLocal(txn, rsOplogName),
- RollbackSourceImpl(getConnection, source, rsOplogName),
- _replCoord);
- if (status.isOK()) {
- // When the syncTail thread sees there is no new data by adding something to the buffer.
- _signalNoNewDataForApplier();
- // Wait until the buffer is empty.
- // This is an indication that syncTail has removed the sentinal marker from the buffer
- // and reset its local lastAppliedOpTime via the replCoord.
- while (!_buffer.empty()) {
- sleepmillis(10);
- if (inShutdown()) {
- return;
- }
+ // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
+ // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
+ // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or
+ // after transitioning to RECOVERING. We always transition to RECOVERING regardless of success
+ // or (recoverable) failure since we may be in an inconsistent state. If rollback failed before
+ // writing anything, SyncTail will quickly take us to SECONDARY since we are still at our
+ // original MinValid, which is fine because we may choose a sync source that doesn't require
+ // rollback. If it failed after we wrote to MinValid, then we will pick a sync source that will
+ // cause us to roll back to the same common point, which is fine. If we succeeded, we will be
+ // consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY
+ // then.
+ {
+ log() << "rollback 0";
+ Lock::GlobalWrite globalWrite(txn->lockState());
+ if (!_replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
+ log() << "Cannot transition from " << _replCoord->getMemberState().toString() << " to "
+ << MemberState(MemberState::RS_ROLLBACK).toString();
+ return;
+ }
- // It is now safe to clear the ROLLBACK state, which may result in the applier thread
- // transitioning to SECONDARY. This is safe because the applier thread has now reloaded
- // the new rollback minValid from the database.
- if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
- warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
- << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK)
- << " but found self in " << _replCoord->getMemberState();
+ try {
+ auto status = syncRollback(txn,
+ OplogInterfaceLocal(txn, rsOplogName),
+ RollbackSourceImpl(getConnection, source, rsOplogName),
+ requiredRBID,
+ _replCoord);
+ // Abort only when syncRollback detects we are in an unrecoverable state.
+ // WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK
+ // so we need to check here first.
+ if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
+ severe() << "Unable to complete rollback. A full resync may be needed: " << status;
+ fassertFailedNoTrace(28723);
- return;
- }
- if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
- fassertNoTrace(28723, status);
+ // In other cases, we log the message contained in the error status and retry later.
+ uassertStatusOK(status);
+ } catch (const DBException& ex) {
+ // UnrecoverableRollbackError should only come from a returned status which is handled
+ // above.
+ invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError);
+ warning() << "rollback cannot complete at this time (retrying later): " << ex
+ << " appliedThrough=" << _replCoord->getMyLastAppliedOpTime()
+ << " minvalid=" << getMinValid(txn);
+ // Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If
+ // we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this
+ // will also prevent us from hot-looping and putting too much load on upstream nodes.
+ sleepsecs(5); // 5 seconds was chosen as a completely arbitrary amount of time.
+ } catch (...) {
+ std::terminate();
+ }
+ // At this point we are about to leave rollback. Before we do, wait for any writes done
+ // as part of rollback to be durable, and then do any necessary checks that we didn't
+ // wind up rolling back something illegal. We must wait for the rollback to be durable
+ // so that if we wind up shutting down uncleanly in response to something we rolled back
+ // we know that we won't wind up right back in the same situation when we start back up
+ // because the rollback wasn't durable.
+ txn->recoveryUnit()->waitUntilDurable();
+ if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
+ severe() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
+ << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK)
+ << " but found self in " << _replCoord->getMemberState();
+ fassertFailedNoTrace(40364);
- warning() << "rollback cannot proceed at this time (retrying later): " << status;
HostAndPort BackgroundSync::getSyncTarget() {