diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-03-08 19:43:36 -0500 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-03-14 21:39:16 -0400 |
commit | 231e760c744013fe68fe863a7d0315148c69047a (patch) | |
tree | c8f5ee02b99a9b33da3b3555aa33429db2728b11 /src/mongo/db/repl | |
parent | e2f02e5dc1dc96632f89504ec545818b2764fdd4 (diff) | |
download | mongo-231e760c744013fe68fe863a7d0315148c69047a.tar.gz |
SERVER-28181 Deadlock involving the mutexes of oplog fetcher and replication coordinator
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 2 |
2 files changed, 28 insertions, 21 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 052de5cba7f..b0db62af574 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -89,10 +89,9 @@ Milliseconds calculateAwaitDataTimeout(const ReplSetConfig& config) {
 /**
  * Returns find command object suitable for tailing remote oplog.
  */
-BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
-                              const NamespaceString& nss,
+BSONObj makeFindCommandObject(const NamespaceString& nss,
+                              long long currentTerm,
                               OpTime lastOpTimeFetched) {
-    invariant(dataReplicatorExternalState);
     BSONObjBuilder cmdBob;
     cmdBob.append("find", nss.coll());
     cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
@@ -100,9 +99,8 @@ BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExterna
     cmdBob.append("oplogReplay", true);
     cmdBob.append("awaitData", true);
     cmdBob.append("maxTimeMS", durationCount<Milliseconds>(kOplogInitialFindMaxTime));
-    auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
-    if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
-        cmdBob.append("term", opTimeWithTerm.value);
+    if (currentTerm != OpTime::kUninitializedTerm) {
+        cmdBob.append("term", currentTerm);
     }
     return cmdBob.obj();
 }
@@ -110,18 +108,17 @@ BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExterna
 /**
  * Returns getMore command object suitable for tailing remote oplog.
  */
-BSONObj makeGetMoreCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
-                                 const NamespaceString& nss,
+BSONObj makeGetMoreCommandObject(const NamespaceString& nss,
                                  CursorId cursorId,
+                                 OpTimeWithTerm lastCommittedWithCurrentTerm,
                                  Milliseconds fetcherMaxTimeMS) {
     BSONObjBuilder cmdBob;
     cmdBob.append("getMore", cursorId);
     cmdBob.append("collection", nss.coll());
     cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
-    auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
-    if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
-        cmdBob.append("term", opTimeWithTerm.value);
-        opTimeWithTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime");
+    if (lastCommittedWithCurrentTerm.value != OpTime::kUninitializedTerm) {
+        cmdBob.append("term", lastCommittedWithCurrentTerm.value);
+        lastCommittedWithCurrentTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime");
     }
     return cmdBob.obj();
 }
@@ -347,14 +344,16 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
       _enqueueDocumentsFn(enqueueDocumentsFn),
       _awaitDataTimeout(calculateAwaitDataTimeout(config)),
       _onShutdownCallbackFn(onShutdownCallbackFn),
-      _lastFetched(lastFetched),
-      _fetcher(_makeFetcher(_lastFetched.opTime)) {
+      _lastFetched(lastFetched) {
     uassert(ErrorCodes::BadValue, "null last optime fetched", !_lastFetched.opTime.isNull());
     uassert(ErrorCodes::InvalidReplicaSetConfig,
             "uninitialized replica set configuration",
             config.isInitialized());
     uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn);
     uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn);
+
+    auto currentTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value;
+    _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime);
 }
 
 OplogFetcher::~OplogFetcher() {
@@ -462,6 +461,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
     // example, because it stepped down) we might not have a cursor.
     if (!responseStatus.isOK()) {
         {
+            // We have to call into replication coordinator outside oplog fetcher's mutex.
+            // It is OK if the current term becomes stale after this line since requests
+            // to remote nodes are asynchronous anyway.
+            auto currentTerm =
+                _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value;
             stdx::lock_guard<stdx::mutex> lock(_mutex);
             if (_isShuttingDown_inlock()) {
                 log() << "Error returned from oplog query while canceling query: "
@@ -478,8 +482,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
             _shuttingDownFetcher.reset();
             // Move the old fetcher into the shutting down instance.
             _shuttingDownFetcher.swap(_fetcher);
-            // Create and start fetcher with new starting optime.
-            _fetcher = _makeFetcher(_lastFetched.opTime);
+            // Create and start fetcher with current term and new starting optime.
+            _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime);
             auto scheduleStatus = _scheduleFetcher_inlock();
             if (scheduleStatus.isOK()) {
                 log() << "Scheduled new oplog query " << _fetcher->toString();
@@ -641,9 +645,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
         return;
     }
 
-    getMoreBob->appendElements(makeGetMoreCommandObject(_dataReplicatorExternalState,
-                                                        queryResponse.nss,
+    auto lastCommittedWithCurrentTerm =
+        _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
+    getMoreBob->appendElements(makeGetMoreCommandObject(queryResponse.nss,
                                                         queryResponse.cursorId,
+                                                        lastCommittedWithCurrentTerm,
                                                         _awaitDataTimeout));
 }
 
@@ -669,12 +675,13 @@ void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash)
     std::swap(_onShutdownCallbackFn, onShutdownCallbackFn);
 }
 
-std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(OpTime lastFetchedOpTime) {
+std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(long long currentTerm,
+                                                    OpTime lastFetchedOpTime) {
     return stdx::make_unique<Fetcher>(
         _executor,
         _source,
         _nss.db().toString(),
-        makeFindCommandObject(_dataReplicatorExternalState, _nss, lastFetchedOpTime),
+        makeFindCommandObject(_nss, currentTerm, lastFetchedOpTime),
         stdx::bind(&OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3),
         _metadataObject,
         kOplogQueryNetworkTimeout);
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 5d739bfae0c..ad4fde296b1 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -237,7 +237,7 @@ private:
     /**
      * Creates a new instance of the fetcher to tail the remote oplog starting at the given optime.
      */
-    std::unique_ptr<Fetcher> _makeFetcher(OpTime lastFetchedOpTime);
+    std::unique_ptr<Fetcher> _makeFetcher(long long currentTerm, OpTime lastFetchedOpTime);
 
     /**
      * Returns whether the oplog fetcher is in shutdown.