summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2017-03-08 19:43:36 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2017-03-14 21:39:16 -0400
commit231e760c744013fe68fe863a7d0315148c69047a (patch)
treec8f5ee02b99a9b33da3b3555aa33429db2728b11 /src/mongo/db/repl
parente2f02e5dc1dc96632f89504ec545818b2764fdd4 (diff)
downloadmongo-231e760c744013fe68fe863a7d0315148c69047a.tar.gz
SERVER-28181 Deadlock involving the mutexes of oplog fetcher and replication coordinator
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp47
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h2
2 files changed, 28 insertions, 21 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 052de5cba7f..b0db62af574 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -89,10 +89,9 @@ Milliseconds calculateAwaitDataTimeout(const ReplSetConfig& config) {
/**
* Returns find command object suitable for tailing remote oplog.
*/
-BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
- const NamespaceString& nss,
+BSONObj makeFindCommandObject(const NamespaceString& nss,
+ long long currentTerm,
OpTime lastOpTimeFetched) {
- invariant(dataReplicatorExternalState);
BSONObjBuilder cmdBob;
cmdBob.append("find", nss.coll());
cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
@@ -100,9 +99,8 @@ BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExterna
cmdBob.append("oplogReplay", true);
cmdBob.append("awaitData", true);
cmdBob.append("maxTimeMS", durationCount<Milliseconds>(kOplogInitialFindMaxTime));
- auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
- if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
- cmdBob.append("term", opTimeWithTerm.value);
+ if (currentTerm != OpTime::kUninitializedTerm) {
+ cmdBob.append("term", currentTerm);
}
return cmdBob.obj();
}
@@ -110,18 +108,17 @@ BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExterna
/**
* Returns getMore command object suitable for tailing remote oplog.
*/
-BSONObj makeGetMoreCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
- const NamespaceString& nss,
+BSONObj makeGetMoreCommandObject(const NamespaceString& nss,
CursorId cursorId,
+ OpTimeWithTerm lastCommittedWithCurrentTerm,
Milliseconds fetcherMaxTimeMS) {
BSONObjBuilder cmdBob;
cmdBob.append("getMore", cursorId);
cmdBob.append("collection", nss.coll());
cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
- auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
- if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
- cmdBob.append("term", opTimeWithTerm.value);
- opTimeWithTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime");
+ if (lastCommittedWithCurrentTerm.value != OpTime::kUninitializedTerm) {
+ cmdBob.append("term", lastCommittedWithCurrentTerm.value);
+ lastCommittedWithCurrentTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime");
}
return cmdBob.obj();
}
@@ -347,14 +344,16 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
_onShutdownCallbackFn(onShutdownCallbackFn),
- _lastFetched(lastFetched),
- _fetcher(_makeFetcher(_lastFetched.opTime)) {
+ _lastFetched(lastFetched) {
uassert(ErrorCodes::BadValue, "null last optime fetched", !_lastFetched.opTime.isNull());
uassert(ErrorCodes::InvalidReplicaSetConfig,
"uninitialized replica set configuration",
config.isInitialized());
uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn);
uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn);
+
+ auto currentTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value;
+ _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime);
}
OplogFetcher::~OplogFetcher() {
@@ -462,6 +461,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
// example, because it stepped down) we might not have a cursor.
if (!responseStatus.isOK()) {
{
+ // We have to call into replication coordinator outside oplog fetcher's mutex.
+ // It is OK if the current term becomes stale after this line since requests
+ // to remote nodes are asynchronous anyway.
+ auto currentTerm =
+ _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value;
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_isShuttingDown_inlock()) {
log() << "Error returned from oplog query while canceling query: "
@@ -478,8 +482,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_shuttingDownFetcher.reset();
// Move the old fetcher into the shutting down instance.
_shuttingDownFetcher.swap(_fetcher);
- // Create and start fetcher with new starting optime.
- _fetcher = _makeFetcher(_lastFetched.opTime);
+ // Create and start fetcher with current term and new starting optime.
+ _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime);
auto scheduleStatus = _scheduleFetcher_inlock();
if (scheduleStatus.isOK()) {
log() << "Scheduled new oplog query " << _fetcher->toString();
@@ -641,9 +645,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
return;
}
- getMoreBob->appendElements(makeGetMoreCommandObject(_dataReplicatorExternalState,
- queryResponse.nss,
+ auto lastCommittedWithCurrentTerm =
+ _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
+ getMoreBob->appendElements(makeGetMoreCommandObject(queryResponse.nss,
queryResponse.cursorId,
+ lastCommittedWithCurrentTerm,
_awaitDataTimeout));
}
@@ -669,12 +675,13 @@ void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash)
std::swap(_onShutdownCallbackFn, onShutdownCallbackFn);
}
-std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(OpTime lastFetchedOpTime) {
+std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(long long currentTerm,
+ OpTime lastFetchedOpTime) {
return stdx::make_unique<Fetcher>(
_executor,
_source,
_nss.db().toString(),
- makeFindCommandObject(_dataReplicatorExternalState, _nss, lastFetchedOpTime),
+ makeFindCommandObject(_nss, currentTerm, lastFetchedOpTime),
stdx::bind(&OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3),
_metadataObject,
kOplogQueryNetworkTimeout);
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 5d739bfae0c..ad4fde296b1 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -237,7 +237,7 @@ private:
/**
* Creates a new instance of the fetcher to tail the remote oplog starting at the given optime.
*/
- std::unique_ptr<Fetcher> _makeFetcher(OpTime lastFetchedOpTime);
+ std::unique_ptr<Fetcher> _makeFetcher(long long currentTerm, OpTime lastFetchedOpTime);
/**
* Returns whether the oplog fetcher is in shutdown.