author     Nathan Myers <nathan.myers@10gen.com>  2017-09-25 15:53:59 -0400
committer  Nathan Myers <nathan.myers@10gen.com>  2017-09-25 15:53:59 -0400
commit     d7aca6435e8ccc89005a97dc585dfbe429a17dec
tree       8f42ebe1db5e80b71d5e97730747971acea7fd02
parent     829c1d6afe8177433192e6af84bf4536c330caee
SERVER-30838 Remove _inlock names in sharding subsystem
 src/mongo/s/async_requests_sender.cpp        |  10
 src/mongo/s/async_requests_sender.h          |   6
 src/mongo/s/catalog_cache.cpp                |  31
 src/mongo/s/catalog_cache.h                  |  10
 src/mongo/s/client/shard_registry.cpp        |  28
 src/mongo/s/client/shard_registry.h          |  16
 src/mongo/s/query/async_results_merger.cpp   | 207
 src/mongo/s/query/async_results_merger.h     |  59
 src/mongo/s/query/cluster_cursor_manager.cpp |  30
 src/mongo/s/query/cluster_cursor_manager.h   |   8
 10 files changed, 208 insertions(+), 197 deletions(-)
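
The change replaces the informal '_inlock' suffix convention with an explicit WithLock parameter (from mongo/util/concurrency/with_lock.h): each private helper that requires the caller to hold a mutex now takes a lock witness as its first argument, so the requirement is enforced at compile time instead of being a naming hint. Below is a minimal, self-contained sketch of the idiom; the WithLock class here is a simplified stand-in for the real header, and Counter is a made-up example class, not code from this commit.

    #include <mutex>

    // Simplified stand-in for mongo/util/concurrency/with_lock.h. A WithLock can
    // only be constructed from a held lock_guard (or explicitly via withoutLock()),
    // so a function that takes a WithLock parameter can only be reached by a caller
    // that already owns the mutex -- the property the old "_inlock" suffix merely
    // hinted at.
    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}

        // For contexts where locking is provably unnecessary, e.g. a constructor
        // running before 'this' is visible to any other thread.
        static WithLock withoutLock() {
            return WithLock{};
        }

    private:
        WithLock() = default;
    };

    // Made-up example class illustrating the calling convention.
    class Counter {
    public:
        Counter() {
            _reset(WithLock::withoutLock());  // no concurrent access is possible yet
        }

        void reset() {
            std::lock_guard<std::mutex> lk(_mutex);
            _reset(lk);  // previously this would have been _reset_inlock()
        }

    private:
        void _reset(WithLock) {  // caller must hold _mutex (or be single-threaded)
            _value = 0;
        }

        std::mutex _mutex;
        int _value = 0;
    };
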
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 4821fb0f504..0dc955fe119 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -77,7 +77,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
// We lock so that no callbacks signal the notification until after we are done scheduling
// requests, to prevent signaling the notification twice, which is illegal.
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _scheduleRequests_inlock();
+ _scheduleRequests(lk);
}
AsyncRequestsSender::~AsyncRequestsSender() {
_cancelPendingRequests();
@@ -142,7 +142,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
_notification.emplace();
if (!_stopRetrying) {
- _scheduleRequests_inlock();
+ _scheduleRequests(lk);
}
// Check if any remote is ready.
@@ -171,7 +171,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
return boost::none;
}
-void AsyncRequestsSender::_scheduleRequests_inlock() {
+void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
invariant(!_stopRetrying);
// Schedule remote work on hosts for which we have not sent a request or need to retry.
for (size_t i = 0; i < _remotes.size(); ++i) {
@@ -212,7 +212,7 @@ void AsyncRequestsSender::_scheduleRequests_inlock() {
// If the remote does not have a response or pending request, schedule remote work for it.
if (!remote.swResponse && !remote.cbHandle.isValid()) {
- auto scheduleStatus = _scheduleRequest_inlock(i);
+ auto scheduleStatus = _scheduleRequest(lk, i);
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
// Signal the notification indicating the remote had an error (we need to do this
@@ -226,7 +226,7 @@ void AsyncRequestsSender::_scheduleRequests_inlock() {
}
}
-Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 8dfc013c380..70d574a8ef7 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -41,6 +41,7 @@
#include "mongo/s/shard_id.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -227,7 +228,7 @@ private:
*
* On failure to schedule a request, signals the notification.
*/
- void _scheduleRequests_inlock();
+ void _scheduleRequests(WithLock);
/**
* Helper to schedule a command to a remote.
@@ -237,7 +238,7 @@ private:
*
* Returns success if the command was scheduled successfully.
*/
- Status _scheduleRequest_inlock(size_t remoteIndex);
+ Status _scheduleRequest(WithLock, size_t remoteIndex);
/**
* The callback for a remote command.
@@ -276,7 +277,6 @@ private:
Status _interruptStatus = Status::OK();
// Must be acquired before accessing the below data members.
- // Must also be held when calling any of the '_inlock()' helper functions.
stdx::mutex _mutex;
// Data tracking the state of our communication with each of the remote nodes.
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 59602925efc..5681d0afa05 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -42,6 +42,7 @@
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
@@ -160,8 +161,7 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
if (!refreshNotification) {
refreshNotification = (collEntry.refreshCompletionNotification =
std::make_shared<Notification<Status>>());
- _scheduleCollectionRefresh_inlock(
- dbEntry, std::move(collEntry.routingInfo), nss, 1);
+ _scheduleCollectionRefresh(ul, dbEntry, std::move(collEntry.routingInfo), nss, 1);
}
// Wait on the notification outside of the mutex
@@ -313,18 +313,18 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(Oper
dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)});
}
-void CatalogCache::_scheduleCollectionRefresh_inlock(
- std::shared_ptr<DatabaseInfoEntry> dbEntry,
- std::shared_ptr<ChunkManager> existingRoutingInfo,
- const NamespaceString& nss,
- int refreshAttempt) {
+void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
+ std::shared_ptr<DatabaseInfoEntry> dbEntry,
+ std::shared_ptr<ChunkManager> existingRoutingInfo,
+ NamespaceString const& nss,
+ int refreshAttempt) {
Timer t;
const ChunkVersion startingCollectionVersion =
(existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
- const auto refreshFailed_inlock =
- [ this, t, dbEntry, nss, refreshAttempt ](const Status& status) noexcept {
+ const auto refreshFailed =
+ [ this, t, dbEntry, nss, refreshAttempt ](WithLock lk, const Status& status) noexcept {
log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and failed"
<< causedBy(redact(status));
@@ -337,7 +337,7 @@ void CatalogCache::_scheduleCollectionRefresh_inlock(
// refresh again
if (status == ErrorCodes::ConflictingOperationInProgress &&
refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
- _scheduleCollectionRefresh_inlock(dbEntry, nullptr, nss, refreshAttempt + 1);
+ _scheduleCollectionRefresh(lk, dbEntry, nullptr, nss, refreshAttempt + 1);
} else {
// Leave needsRefresh to true so that any subsequent get attempts will kick off
// another round of refresh
@@ -346,17 +346,16 @@ void CatalogCache::_scheduleCollectionRefresh_inlock(
}
};
- const auto refreshCallback =
- [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed_inlock ](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+ const auto refreshCallback = [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed ](
+ OperationContext * opCtx,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
std::shared_ptr<ChunkManager> newRoutingInfo;
try {
newRoutingInfo = refreshCollectionRoutingInfo(
opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
} catch (const DBException& ex) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- refreshFailed_inlock(ex.toStatus());
+ refreshFailed(lg, ex.toStatus());
return;
}
@@ -397,7 +396,7 @@ void CatalogCache::_scheduleCollectionRefresh_inlock(
invariant(status != ErrorCodes::ConflictingOperationInProgress);
stdx::lock_guard<stdx::mutex> lg(_mutex);
- refreshFailed_inlock(status);
+ refreshFailed(lg, status);
}
}
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 00f3465d4c3..a2506e1c217 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -37,6 +37,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -161,10 +162,11 @@ private:
* Non-blocking call which schedules an asynchronous refresh for the specified namespace. The
* namespace must be in the 'needRefresh' state.
*/
- void _scheduleCollectionRefresh_inlock(std::shared_ptr<DatabaseInfoEntry> dbEntry,
- std::shared_ptr<ChunkManager> existingRoutingInfo,
- const NamespaceString& nss,
- int refreshAttempt);
+ void _scheduleCollectionRefresh(WithLock,
+ std::shared_ptr<DatabaseInfoEntry> dbEntry,
+ std::shared_ptr<ChunkManager> existingRoutingInfo,
+ NamespaceString const& nss,
+ int refreshAttempt);
// Interface from which chunks will be retrieved
CatalogCacheLoader& _cacheLoader;
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 6fc44deefb2..4c581f18475 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -57,6 +57,7 @@
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/log.h"
#include "mongo/util/map_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -367,10 +368,6 @@ void ShardRegistry::replicaSetChangeConfigServerUpdateHook(const std::string& se
////////////// ShardRegistryData //////////////////
ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) {
- _init(opCtx, shardFactory);
-}
-
-void ShardRegistryData::_init(OperationContext* opCtx, ShardFactory* shardFactory) {
auto shardsStatus =
grid.catalogClient()->getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
@@ -412,7 +409,7 @@ void ShardRegistryData::_init(OperationContext* opCtx, ShardFactory* shardFactor
auto shard = shardFactory->createShard(std::move(std::get<0>(shardInfo)),
std::move(std::get<1>(shardInfo)));
- _addShard_inlock(std::move(shard), false);
+ _addShard(WithLock::withoutLock(), std::move(shard), false);
}
}
@@ -432,7 +429,7 @@ shared_ptr<Shard> ShardRegistryData::getConfigShard() const {
void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_configShard = shard;
- _addShard_inlock(shard, true);
+ _addShard(lk, shard, true);
}
shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const {
@@ -448,10 +445,10 @@ shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAn
shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _findByShardId_inlock(shardId);
+ return _findByShardId(lk, shardId);
}
-shared_ptr<Shard> ShardRegistryData::_findByShardId_inlock(const ShardId& shardId) const {
+shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const {
auto i = _lookup.find(shardId);
return (i != _lookup.end()) ? i->second : nullptr;
}
@@ -505,28 +502,31 @@ void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnStri
return;
}
- _rebuildShard_inlock(newConnString, factory);
+ _rebuildShard(updateConnStringLock, newConnString, factory);
}
-void ShardRegistryData::_rebuildShard_inlock(const ConnectionString& newConnString,
- ShardFactory* factory) {
+void ShardRegistryData::_rebuildShard(WithLock lk,
+ ConnectionString const& newConnString,
+ ShardFactory* factory) {
auto it = _rsLookup.find(newConnString.getSetName());
invariant(it->second);
auto shard = factory->createShard(it->second->getId(), newConnString);
- _addShard_inlock(shard, true);
+ _addShard(lk, shard, true);
if (shard->isConfig()) {
_configShard = shard;
}
}
-void ShardRegistryData::_addShard_inlock(const std::shared_ptr<Shard>& shard, bool useOriginalCS) {
+void ShardRegistryData::_addShard(WithLock lk,
+ std::shared_ptr<Shard> const& shard,
+ bool useOriginalCS) {
const ShardId shardId = shard->getId();
const ConnectionString connString =
useOriginalCS ? shard->originalConnString() : shard->getConnString();
- auto currentShard = _findByShardId_inlock(shardId);
+ auto currentShard = _findByShardId(lk, shardId);
if (currentShard) {
auto oldConnString = currentShard->originalConnString();
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 37ad27ba5b5..a1d6b333cf7 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -40,6 +40,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
@@ -54,6 +55,9 @@ class ShardType;
class ShardRegistryData {
public:
+ /**
+ * Reads shards docs from the catalog client and fills in maps.
+ */
ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory);
ShardRegistryData() = default;
~ShardRegistryData() = default;
@@ -100,22 +104,18 @@ public:
private:
/**
- * Reads shards docs from the catalog client and fills in maps.
- */
- void _init(OperationContext* opCtx, ShardFactory* factory);
-
- /**
* Creates a shard based on the specified information and puts it into the lookup maps.
     * If useOriginalCS = true it will use the ConnectionString used for shard creation to update
* lookup maps. Otherwise the current connection string from the Shard's RemoteCommandTargeter
* will be used.
*/
- void _addShard_inlock(const std::shared_ptr<Shard>&, bool useOriginalCS);
- std::shared_ptr<Shard> _findByShardId_inlock(const ShardId&) const;
- void _rebuildShard_inlock(const ConnectionString& newConnString, ShardFactory* factory);
+ void _addShard(WithLock, std::shared_ptr<Shard> const&, bool useOriginalCS);
+ auto _findByShardId(WithLock, ShardId const&) const -> std::shared_ptr<Shard>;
+ void _rebuildShard(WithLock, ConnectionString const& newConnString, ShardFactory* factory);
// Protects the lookup maps below.
mutable stdx::mutex _mutex;
+
using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>;
// Map of both shardName -> Shard and hostName -> Shard
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index b867c7b43df..9d05cacfa57 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -42,7 +42,6 @@
#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -65,9 +64,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
remote.cursorResponse.getNSS(),
remote.cursorResponse.getCursorId());
- // We don't check the return value of addBatchToBuffer here; if there was an error,
+ // We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- addBatchToBuffer(remoteIndex, remote.cursorResponse.getBatch());
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse.getBatch());
++remoteIndex;
}
@@ -81,15 +80,15 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
AsyncResultsMerger::~AsyncResultsMerger() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(remotesExhausted_inlock() || _lifecycleState == kKillComplete);
+ invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete);
}
bool AsyncResultsMerger::remotesExhausted() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return remotesExhausted_inlock();
+ return _remotesExhausted(lk);
}
-bool AsyncResultsMerger::remotesExhausted_inlock() {
+bool AsyncResultsMerger::_remotesExhausted(WithLock) {
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
return false;
@@ -113,7 +112,7 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return ready_inlock();
+ return _ready(lk);
}
void AsyncResultsMerger::detachFromOperationContext() {
@@ -130,7 +129,7 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-bool AsyncResultsMerger::ready_inlock() {
+bool AsyncResultsMerger::_ready(WithLock lk) {
if (_lifecycleState != kAlive) {
return true;
}
@@ -150,10 +149,10 @@ bool AsyncResultsMerger::ready_inlock() {
}
const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? readySorted_inlock() : readyUnsorted_inlock();
+ return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
}
-bool AsyncResultsMerger::readySorted_inlock() {
+bool AsyncResultsMerger::_readySorted(WithLock) {
// Tailable cursors cannot have a sort.
invariant(_params->tailableMode == TailableMode::kNormal);
@@ -166,7 +165,7 @@ bool AsyncResultsMerger::readySorted_inlock() {
return true;
}
-bool AsyncResultsMerger::readyUnsorted_inlock() {
+bool AsyncResultsMerger::_readyUnsorted(WithLock) {
bool allExhausted = true;
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
@@ -183,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() {
StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- dassert(ready_inlock());
+ dassert(_ready(lk));
if (_lifecycleState != kAlive) {
return Status(ErrorCodes::IllegalOperation, "AsyncResultsMerger killed");
}
@@ -198,10 +197,10 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
}
const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? nextReadySorted() : nextReadyUnsorted();
+ return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
-ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
+ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable cursors cannot have a sort.
invariant(_params->tailableMode == TailableMode::kNormal);
@@ -227,7 +226,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
return front;
}
-ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
+ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
size_t remotesAttempted = 0;
while (remotesAttempted < _remotes.size()) {
// It is illegal to call this method if there is an error received from any shard.
@@ -258,7 +257,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
return {};
}
-Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
+Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -283,10 +282,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
executor::RemoteCommandRequest request(
remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, _opCtx);
- auto callbackStatus = _executor->scheduleRemoteCommand(
- request,
- stdx::bind(
- &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex));
+ auto callbackStatus =
+ _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
+ stdx::lock_guard<stdx::mutex> lk(this->_mutex);
+ this->_handleBatchResponse(lk, cbData, remoteIndex);
+ });
+
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -330,7 +331,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
// If this remote is not exhausted and there is no outstanding request for it, schedule
// work to retrieve the next batch.
- auto nextBatchStatus = askForNextBatch_inlock(i);
+ auto nextBatchStatus = _askForNextBatch(lk, i);
if (!nextBatchStatus.isOK()) {
return nextBatchStatus;
}
@@ -348,12 +349,13 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
// _currentEvent with a new event, new results became available. In this case we have to signal
// the new event right away to propagate the fact that the previous event had been signaled to
// the new event.
- signalCurrentEventIfReady_inlock();
+ _signalCurrentEventIfReady(lk);
return eventToReturn;
}
-StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj& responseObj,
- const RemoteCursorData& remote) {
+StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
+ const BSONObj& responseObj, const RemoteCursorData& remote) {
+
auto getMoreParseStatus = CursorResponse::parseFromBSON(responseObj);
if (!getMoreParseStatus.isOK()) {
return getMoreParseStatus.getStatus();
@@ -372,76 +374,81 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj
return std::move(cursorResponse);
}
-void AsyncResultsMerger::handleBatchResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- auto& remote = _remotes[remoteIndex];
+void AsyncResultsMerger::_handleBatchResponse(WithLock lk,
+ CbData const& cbData,
+ size_t remoteIndex) {
+ // Got a response from remote, so indicate we are no longer waiting for one.
+ _remotes[remoteIndex].cbHandle = executor::TaskExecutor::CallbackHandle();
- // Clear the callback handle. This indicates that we are no longer waiting on a response from
- // 'remote'.
- remote.cbHandle = executor::TaskExecutor::CallbackHandle();
-
- // If we're in the process of shutting down then there's no need to process the batch.
+ // On shutdown, there is no need to process the response.
if (_lifecycleState != kAlive) {
- invariant(_lifecycleState == kKillStarted);
-
- // Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down.
- signalCurrentEventIfReady_inlock();
-
- // If we're killed and we're not waiting on any more batches to come back, then we are ready
- // to kill the cursors on the remote hosts and clean up this cursor. Schedule the
- // killCursors command and signal that this cursor is safe now safe to destroy. We have to
- // promise not to touch any members of this class because 'this' could become invalid as
- // soon as we signal the event.
- if (!haveOutstandingBatchRequests_inlock()) {
- // If the event handle is invalid, then the executor is in the middle of shutting down,
- // and we can't schedule any more work for it to complete.
- if (_killCursorsScheduledEvent.isValid()) {
- scheduleKillCursors_inlock(_opCtx);
- _executor->signalEvent(_killCursorsScheduledEvent);
- }
+ _signalCurrentEventIfReady(lk); // First, wake up anyone waiting on '_currentEvent'.
+ _cleanUpKilledBatch(lk);
+ return;
+ }
+ try {
+ _processBatchResults(lk, cbData.response, remoteIndex);
+ } catch (DBException const& e) {
+ _remotes[remoteIndex].status = e.toStatus();
+ }
+ _signalCurrentEventIfReady(lk); // Wake up anyone waiting on '_currentEvent'.
+}
- _lifecycleState = kKillComplete;
+void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) {
+ invariant(_lifecycleState == kKillStarted);
+
+ // If we're killed and we're not waiting on any more batches to come back, then we are ready
+ // to kill the cursors on the remote hosts and clean up this cursor. Schedule the killCursors
+ // command and signal that this cursor is now safe to destroy. We must not touch this object
+ // again after dropping the lock, because 'this' could become invalid immediately.
+ if (!_haveOutstandingBatchRequests(lk)) {
+ // If the event handle is invalid, then the executor is in the middle of shutting down,
+ // and we can't schedule any more work for it to complete.
+ if (_killCursorsScheduledEvent.isValid()) {
+ _scheduleKillCursors(lk, _opCtx);
+ _executor->signalEvent(_killCursorsScheduledEvent);
}
- return;
+ _lifecycleState = kKillComplete;
+ }
+}
+
+void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remoteIndex) {
+ auto& remote = _remotes[remoteIndex];
+ remote.status = std::move(status);
+ // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
+ // remove the unreachable host entirely from consideration by marking it as exhausted.
+ if (_params->isAllowPartialResults) {
+ remote.status = Status::OK();
+
+ // Clear the results buffer and cursor id.
+ std::queue<ClusterQueryResult> emptyBuffer;
+ std::swap(remote.docBuffer, emptyBuffer);
+ remote.cursorId = 0;
}
+}
- // Early return from this point on signal anyone waiting on an event, if ready() is true.
- ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
- StatusWith<CursorResponse> cursorResponseStatus(
- cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
- : cbData.response.status);
+void AsyncResultsMerger::_processBatchResults(WithLock lk,
+ CbResponse const& response,
+ size_t remoteIndex) {
+ auto& remote = _remotes[remoteIndex];
+ if (!response.isOK()) {
+ _cleanUpFailedBatch(lk, response.status, remoteIndex);
+ return;
+ }
+ auto cursorResponseStatus = _parseCursorResponse(response.data, remote);
if (!cursorResponseStatus.isOK()) {
- if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit &&
- _params->tailableMode != TailableMode::kNormal) {
- // We timed out before hearing back from the shard,
- }
- remote.status = cursorResponseStatus.getStatus();
- // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
- // remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params->isAllowPartialResults) {
- remote.status = Status::OK();
-
- // Clear the results buffer and cursor id.
- std::queue<ClusterQueryResult> emptyBuffer;
- std::swap(remote.docBuffer, emptyBuffer);
- remote.cursorId = 0;
- }
-
+ _cleanUpFailedBatch(lk, cursorResponseStatus.getStatus(), remoteIndex);
return;
}
- // Response successfully received.
-
- auto cursorResponse = std::move(cursorResponseStatus.getValue());
+ CursorResponse cursorResponse = std::move(cursorResponseStatus.getValue());
// Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
remote.cursorId = cursorResponse.getCursorId();
// Save the batch in the remote's buffer.
- if (!addBatchToBuffer(remoteIndex, cursorResponse.getBatch())) {
+ if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse.getBatch())) {
return;
}
@@ -458,21 +465,16 @@ void AsyncResultsMerger::handleBatchResponse(
//
// We do not ask for the next batch if the cursor is tailable, as batches received from remote
// tailable cursors should be passed through to the client without asking for more batches.
- if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() &&
- !remote.exhausted()) {
- remote.status = askForNextBatch_inlock(remoteIndex);
- if (!remote.status.isOK()) {
- return;
- }
+ if (_params->tailableMode != TailableMode::kNormal || remote.hasNext() || remote.exhausted()) {
+ return;
}
- // ScopeGuard requires dismiss on success, but we want waiter to be signalled on success as
- // well as failure.
- signaller.Dismiss();
- signalCurrentEventIfReady_inlock();
+ remote.status = _askForNextBatch(lk, remoteIndex);
}
-bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch) {
+bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
+ size_t remoteIndex,
+ std::vector<BSONObj> const& batch) {
auto& remote = _remotes[remoteIndex];
for (const auto& obj : batch) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
@@ -499,8 +501,8 @@ bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<
return true;
}
-void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
- if (ready_inlock() && _currentEvent.isValid()) {
+void AsyncResultsMerger::_signalCurrentEventIfReady(WithLock lk) {
+ if (_ready(lk) && _currentEvent.isValid()) {
// To prevent ourselves from signalling the event twice, we set '_currentEvent' as
// invalid after signalling it.
_executor->signalEvent(_currentEvent);
@@ -508,7 +510,7 @@ void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
}
}
-bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
+bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) {
for (const auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
return true;
@@ -518,7 +520,7 @@ bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
return false;
}
-void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
+void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) {
invariant(_lifecycleState == kKillStarted);
invariant(_killCursorsScheduledEvent.isValid());
@@ -531,20 +533,13 @@ void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
executor::RemoteCommandRequest request(
remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
- _executor
- ->scheduleRemoteCommand(request,
- stdx::bind(&AsyncResultsMerger::handleKillCursorsResponse,
- stdx::placeholders::_1))
- .status_with_transitional_ignore();
+ // Send kill request; discard callback handle, if any, or failure report, if not.
+ Status s = _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus();
+ std::move(s).ignore();
}
}
}
-void AsyncResultsMerger::handleKillCursorsResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- // We just ignore any killCursors command responses.
-}
-
executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_killCursorsScheduledEvent.isValid()) {
@@ -559,7 +554,7 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
auto statusWithEvent = _executor->makeEvent();
if (ErrorCodes::isShutdownError(statusWithEvent.getStatus().code())) {
// The underlying task executor is shutting down.
- if (!haveOutstandingBatchRequests_inlock()) {
+ if (!_haveOutstandingBatchRequests(lk)) {
_lifecycleState = kKillComplete;
}
return executor::TaskExecutor::EventHandle();
@@ -570,8 +565,8 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
// If we're not waiting for responses from remotes, we can schedule killCursors commands on the
// remotes now. Otherwise, we have to wait until all responses are back, and then we can kill
// the remote cursors.
- if (!haveOutstandingBatchRequests_inlock()) {
- scheduleKillCursors_inlock(opCtx);
+ if (!_haveOutstandingBatchRequests(lk)) {
+ _scheduleKillCursors(lk, opCtx);
_lifecycleState = kKillComplete;
_executor->signalEvent(_killCursorsScheduledEvent);
} else {
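
One more shape worth noting in async_results_merger.cpp above: where a handler was previously bound with stdx::bind and acquired the mutex itself, the scheduling site now passes a lambda that takes the mutex and forwards the lock witness to the private handler. Here is a rough sketch of that pattern using plain std:: types and a hypothetical scheduleWork() in place of the executor's scheduleRemoteCommand(); Merger, CallbackData, and _handleResponse are illustrative names, not code from this commit.

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <mutex>

    // Minimal WithLock stand-in (see the fuller sketch after the diffstat above).
    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}
    };

    // Hypothetical executor stand-in: runs the callback inline rather than on a
    // thread pool, which is all this sketch needs.
    struct CallbackData {
        bool ok = true;
    };
    inline void scheduleWork(const std::function<void(const CallbackData&)>& cb) {
        cb(CallbackData{});
    }

    class Merger {
    public:
        void scheduleNext(std::size_t remoteIndex) {
            // The lambda acquires the mutex; the private handler only receives
            // proof (WithLock) that it is already held.
            scheduleWork([this, remoteIndex](const CallbackData& cbData) {
                std::lock_guard<std::mutex> lk(_mutex);
                _handleResponse(lk, cbData, remoteIndex);
            });
        }

    private:
        void _handleResponse(WithLock, const CallbackData& cbData, std::size_t remoteIndex) {
            std::cout << "remote " << remoteIndex
                      << (cbData.ok ? " responded" : " failed") << '\n';
        }

        std::mutex _mutex;
    };
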
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index c6ec2a26052..6a1ebb4ee8c 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -40,6 +40,7 @@
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_query_result.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -270,18 +271,12 @@ private:
enum LifecycleState { kAlive, kKillStarted, kKillComplete };
/**
- * Callback run to handle a response from a killCursors command.
- */
- static void handleKillCursorsResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData);
-
- /**
* Parses the find or getMore command response object to a CursorResponse.
*
* Returns a non-OK response if the response fails to parse or if there is a cursor id mismatch.
*/
- static StatusWith<CursorResponse> parseCursorResponse(const BSONObj& responseObj,
- const RemoteCursorData& remote);
+ static StatusWith<CursorResponse> _parseCursorResponse(const BSONObj& responseObj,
+ const RemoteCursorData& remote);
/**
* Helper to schedule a command asking the remote node for another batch of results.
@@ -291,43 +286,60 @@ private:
*
* Returns success if the command to retrieve the next batch was scheduled successfully.
*/
- Status askForNextBatch_inlock(size_t remoteIndex);
+ Status _askForNextBatch(WithLock, size_t remoteIndex);
/**
* Checks whether or not the remote cursors are all exhausted.
*/
- bool remotesExhausted_inlock();
+ bool _remotesExhausted(WithLock);
//
// Helpers for ready().
//
- bool ready_inlock();
- bool readySorted_inlock();
- bool readyUnsorted_inlock();
+ bool _ready(WithLock);
+ bool _readySorted(WithLock);
+ bool _readyUnsorted(WithLock);
//
// Helpers for nextReady().
//
- ClusterQueryResult nextReadySorted();
- ClusterQueryResult nextReadyUnsorted();
+ ClusterQueryResult _nextReadySorted(WithLock);
+ ClusterQueryResult _nextReadyUnsorted(WithLock);
+
+ using CbData = executor::TaskExecutor::RemoteCommandCallbackArgs;
+ using CbResponse = executor::TaskExecutor::ResponseStatus;
/**
- * When nextEvent() schedules remote work, it passes this method as a callback. The TaskExecutor
- * will call this function, passing the response from the remote.
+ * When nextEvent() schedules remote work, the callback uses this function to process results.
*
* 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
* indicates which node the response came from and where the new result documents should be
* buffered.
*/
- void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- size_t remoteIndex);
+ void _handleBatchResponse(WithLock, CbData const&, size_t remoteIndex);
+
+ /**
+ * Cleans up if the remote cursor was killed while waiting for a response.
+ */
+ void _cleanUpKilledBatch(WithLock);
+
+ /**
+ * Cleans up after remote query failure.
+ */
+ void _cleanUpFailedBatch(WithLock lk, Status status, size_t remoteIndex);
+
+ /**
+ * Processes results from a remote query.
+ */
+ void _processBatchResults(WithLock, CbResponse const&, size_t remoteIndex);
+
/**
* Adds the batch of results to the RemoteCursorData. Returns false if there was an error
* parsing the batch.
*/
- bool addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch);
+ bool _addBatchToBuffer(WithLock, size_t remoteIndex, std::vector<BSONObj> const& batch);
/**
* If there is a valid unsignaled event that has been requested via nextReady() and there are
@@ -336,17 +348,17 @@ private:
* Invalidates the current event, as we must signal the event exactly once and we only keep a
* handle to a valid event if it is unsignaled.
*/
- void signalCurrentEventIfReady_inlock();
+ void _signalCurrentEventIfReady(WithLock);
/**
* Returns true if this async cursor is waiting to receive another batch from a remote.
*/
- bool haveOutstandingBatchRequests_inlock();
+ bool _haveOutstandingBatchRequests(WithLock);
/**
* Schedules a killCursors command to be run on all remote hosts that have open cursors.
*/
- void scheduleKillCursors_inlock(OperationContext* opCtx);
+ void _scheduleKillCursors(WithLock, OperationContext* opCtx);
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
@@ -357,7 +369,6 @@ private:
BSONObj _metadataObj;
// Must be acquired before accessing any data members (other than _params, which is read-only).
- // Must also be held when calling any of the '_inlock()' helper functions.
stdx::mutex _mutex;
// Data tracking the state of our communication with each of the remote nodes.
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 5e9b8872289..5683071689b 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -196,10 +196,10 @@ ClusterCursorManager::~ClusterCursorManager() {
}
void ClusterCursorManager::shutdown(OperationContext* opCtx) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- lk.unlock();
-
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ }
killAllCursors();
reapZombieCursors(opCtx);
}
@@ -272,7 +272,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
"Cannot check out cursor as we are in the process of shutting down");
}
- CursorEntry* entry = getEntry_inlock(nss, cursorId);
+ CursorEntry* entry = _getEntry(lk, nss, cursorId);
if (!entry) {
return cursorNotFoundStatus(nss, cursorId);
}
@@ -317,7 +317,7 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
const bool remotesExhausted = cursor->remotesExhausted();
- CursorEntry* entry = getEntry_inlock(nss, cursorId);
+ CursorEntry* entry = _getEntry(lk, nss, cursorId);
invariant(entry);
entry->setLastActive(now);
@@ -336,7 +336,7 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
// The cursor is exhausted, is not already scheduled for deletion, and does not have any
// remote cursor state left to clean up. We can delete the cursor right away.
- auto detachedCursor = detachCursor_inlock(nss, cursorId);
+ auto detachedCursor = _detachCursor(lk, nss, cursorId);
invariantOK(detachedCursor.getStatus());
// Deletion of the cursor can happen out of the lock.
@@ -347,7 +347,7 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
Status ClusterCursorManager::killCursor(const NamespaceString& nss, CursorId cursorId) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- CursorEntry* entry = getEntry_inlock(nss, cursorId);
+ CursorEntry* entry = _getEntry(lk, nss, cursorId);
if (!entry) {
return cursorNotFoundStatus(nss, cursorId);
}
@@ -416,7 +416,7 @@ std::size_t ClusterCursorManager::reapZombieCursors(OperationContext* opCtx) {
for (auto& cursorDescriptor : zombieCursorDescriptors) {
StatusWith<std::unique_ptr<ClusterClientCursor>> zombieCursor =
- detachCursor_inlock(cursorDescriptor.ns, cursorDescriptor.cursorId);
+ _detachCursor(lk, cursorDescriptor.ns, cursorDescriptor.cursorId);
if (!zombieCursor.isOK()) {
// Cursor in use, or has already been deleted.
continue;
@@ -536,8 +536,9 @@ boost::optional<NamespaceString> ClusterCursorManager::getNamespaceForCursorId(
return it->second;
}
-ClusterCursorManager::CursorEntry* ClusterCursorManager::getEntry_inlock(const NamespaceString& nss,
- CursorId cursorId) {
+auto ClusterCursorManager::_getEntry(WithLock, NamespaceString const& nss, CursorId cursorId)
+ -> CursorEntry* {
+
auto nsToContainerIt = _namespaceToContainerMap.find(nss);
if (nsToContainerIt == _namespaceToContainerMap.end()) {
return nullptr;
@@ -551,9 +552,10 @@ ClusterCursorManager::CursorEntry* ClusterCursorManager::getEntry_inlock(const N
return &entryMapIt->second;
}
-StatusWith<std::unique_ptr<ClusterClientCursor>> ClusterCursorManager::detachCursor_inlock(
- const NamespaceString& nss, CursorId cursorId) {
- CursorEntry* entry = getEntry_inlock(nss, cursorId);
+StatusWith<std::unique_ptr<ClusterClientCursor>> ClusterCursorManager::_detachCursor(
+ WithLock lk, NamespaceString const& nss, CursorId cursorId) {
+
+ CursorEntry* entry = _getEntry(lk, nss, cursorId);
if (!entry) {
return cursorNotFoundStatus(nss, cursorId);
}
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index d6eb8f47abf..6754624583d 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -39,6 +39,7 @@
#include "mongo/s/query/cluster_client_cursor.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -413,7 +414,7 @@ private:
*
* Not thread-safe.
*/
- CursorEntry* getEntry_inlock(const NamespaceString& nss, CursorId cursorId);
+ CursorEntry* _getEntry(WithLock, NamespaceString const& nss, CursorId cursorId);
/**
* De-registers the given cursor, and returns an owned pointer to the underlying
@@ -424,8 +425,9 @@ private:
*
* Not thread-safe.
*/
- StatusWith<std::unique_ptr<ClusterClientCursor>> detachCursor_inlock(const NamespaceString& nss,
- CursorId cursorId);
+ StatusWith<std::unique_ptr<ClusterClientCursor>> _detachCursor(WithLock,
+ NamespaceString const& nss,
+ CursorId cursorId);
/**
* CursorEntry is a moveable, non-copyable container for a single cursor.