summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2016-11-15 15:24:22 -0500
committerMathias Stearn <mathias@10gen.com>2017-01-03 16:02:19 -0500
commit0b76764eac7651ddba4c82c504aa7e8d785087c2 (patch)
treef90fce58d2781a48afaee696ee3fb9e6f8fefedc /src/mongo/db/repl/bgsync.cpp
parent506c8af1269c76fcd730e121e37b82a18347ac70 (diff)
downloadmongo-0b76764eac7651ddba4c82c504aa7e8d785087c2.tar.gz
SERVER-27050 Ensure upstream node doesn't roll back after checking MinValid
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp109
1 files changed, 64 insertions, 45 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d5fdd001921..84676fd60c8 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
@@ -287,39 +288,32 @@ void BackgroundSync::_produce(OperationContext* txn) {
OpTime lastOpTimeFetched;
HostAndPort source;
SyncSourceResolverResponse syncSourceResp;
- SyncSourceResolver* syncSourceResolver;
- OpTime minValid;
- OpTime minValidSaved;
- if (_replCoord->getMemberState().recovering()) {
- minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
- }
{
+ const OpTime minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
+
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (minValidSaved > _lastOpTimeFetched) {
- minValid = minValidSaved;
- }
+ const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
lastOpTimeFetched = _lastOpTimeFetched;
_syncSourceHost = HostAndPort();
_syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
- minValid,
+ requiredOpTime,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
- syncSourceResolver = _syncSourceResolver.get();
}
// This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls
// ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to
// be acquired before BackgroundSync's.
// It is safe to call startup() outside the mutex on this instance of SyncSourceResolver because
- // we do not destroy this instance outside of this function.
+ // we do not destroy this instance outside of this function which is only called from a single
+ // thread.
auto status = _syncSourceResolver->startup();
if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
return;
}
fassertStatusOK(40349, status);
- syncSourceResolver->join();
- syncSourceResolver = nullptr;
+ _syncSourceResolver->join();
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
_syncSourceResolver.reset();
@@ -388,6 +382,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
+ auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this.
OplogFetcher* oplogFetcher;
try {
auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
@@ -410,7 +405,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
- stdx::placeholders::_3),
+ stdx::placeholders::_3,
+ &rbidCopyForFetcher),
onOplogFetcherShutdownCallbackFn);
oplogFetcher = _oplogFetcher.get();
} catch (const mongo::DBException& ex) {
@@ -484,20 +480,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
}
- // check that we are at minvalid, otherwise we cannot roll back as we may be in an
- // inconsistent state
- const auto minValid = StorageInterface::get(txn)->getMinValid(txn);
- if (lastApplied < minValid) {
- fassertNoTrace(18750,
- Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream() << "need to rollback, but in inconsistent state. "
- << "minvalid: "
- << minValid.toString()
- << " > our last optime: "
- << lastApplied.toString()));
- }
- _rollback(txn, source, getConnection);
+ _rollback(txn, source, syncSourceResp.rbid, getConnection);
stop();
} else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
Seconds blacklistDuration(60);
@@ -510,14 +494,57 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
-void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info) {
+Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ boost::optional<int>* requiredRBID) {
+ // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+ // since that could cause it to not have our required minValid point. The cursor will be killed
+ // if the upstream node rolls back so we don't need to keep checking. This must be blocking
+ // since the Fetcher doesn't give us a way to defer sending the getmores after we return.
+ if (*requiredRBID) {
+ auto rbidStatus = Status(ErrorCodes::InternalError, "");
+ auto handle =
+ _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand(
+ {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr},
+ [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+ rbidStatus = rbidReply.response.status;
+ if (!rbidStatus.isOK())
+ return;
+
+ rbidStatus = getStatusFromCommandResult(rbidReply.response.data);
+ if (!rbidStatus.isOK())
+ return;
+
+ const auto rbidElem = rbidReply.response.data["rbid"];
+ if (rbidElem.type() != NumberInt) {
+ rbidStatus = Status(ErrorCodes::BadValue,
+ str::stream() << "Upstream node returned an "
+ << "rbid with invalid type "
+ << rbidElem.type());
+ return;
+ }
+ if (rbidElem.Int() != **requiredRBID) {
+ rbidStatus = Status(ErrorCodes::BadValue,
+ "Upstream node rolled back after verifying "
+ "that it had our MinValid point. Retrying.");
+ }
+ });
+ if (!handle.isOK())
+ return handle.getStatus();
+
+ _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue());
+ if (!rbidStatus.isOK())
+ return rbidStatus;
+
+ requiredRBID->reset(); // Don't come back to this block while on this cursor.
+ }
+
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
if (info.toApplyDocumentCount == 0) {
- return; // Nothing to do.
+ return Status::OK(); // Nothing to do.
}
auto txn = cc().makeOperationContext();
@@ -532,7 +559,7 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
// buffer.
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (_inShutdown) {
- return;
+ return Status::OK();
}
OCCASIONALLY {
@@ -561,6 +588,8 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
// The inference here is basically if the batch is really small, we are "caught up".
sleepmillis(kSleepToAllowBatchingMillis);
}
+
+ return Status::OK();
}
bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) {
@@ -589,26 +618,16 @@ void BackgroundSync::consume(OperationContext* txn) {
void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
+ boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
// Abort only when syncRollback detects we are in an unrecoverable state.
// In other cases, we log the message contained in the error status and retry later.
auto status = syncRollback(txn,
OplogInterfaceLocal(txn, rsOplogName),
RollbackSourceImpl(getConnection, source, rsOplogName),
+ requiredRBID,
_replCoord);
if (status.isOK()) {
- // When the syncTail thread sees there is no new data by adding something to the buffer.
- _signalNoNewDataForApplier(txn);
- // Wait until the buffer is empty.
- // This is an indication that syncTail has removed the sentinal marker from the buffer
- // and reset its local lastAppliedOpTime via the replCoord.
- while (!_oplogBuffer->isEmpty()) {
- sleepmillis(10);
- if (inShutdown()) {
- return;
- }
- }
-
// At this point we are about to leave rollback. Before we do, wait for any writes done
// as part of rollback to be durable, and then do any necessary checks that we didn't
// wind up rolling back something illegal. We must wait for the rollback to be durable