author     Nathan Myers <nathan.myers@10gen.com>   2017-09-25 15:53:59 -0400
committer  Nathan Myers <nathan.myers@10gen.com>   2017-09-25 15:53:59 -0400
commit     d7aca6435e8ccc89005a97dc585dfbe429a17dec (patch)
tree       8f42ebe1db5e80b71d5e97730747971acea7fd02
parent     829c1d6afe8177433192e6af84bf4536c330caee (diff)
download   mongo-d7aca6435e8ccc89005a97dc585dfbe429a17dec.tar.gz
SERVER-30838 Remove _inlock names in sharding subsystem
-rw-r--r--   src/mongo/s/async_requests_sender.cpp          10
-rw-r--r--   src/mongo/s/async_requests_sender.h             6
-rw-r--r--   src/mongo/s/catalog_cache.cpp                  31
-rw-r--r--   src/mongo/s/catalog_cache.h                    10
-rw-r--r--   src/mongo/s/client/shard_registry.cpp          28
-rw-r--r--   src/mongo/s/client/shard_registry.h            16
-rw-r--r--   src/mongo/s/query/async_results_merger.cpp    207
-rw-r--r--   src/mongo/s/query/async_results_merger.h       59
-rw-r--r--   src/mongo/s/query/cluster_cursor_manager.cpp   30
-rw-r--r--   src/mongo/s/query/cluster_cursor_manager.h      8
10 files changed, 208 insertions, 197 deletions
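
A note on the pattern this commit applies throughout: every private helper that used to carry an _inlock suffix now takes a WithLock argument instead, so "the caller holds the mutex" is enforced by the type system at each call site rather than by naming convention. The sketch below is a minimal, self-contained illustration of the idiom; the names are illustrative, not the real implementation (the actual type lives in mongo/util/concurrency/with_lock.h, uses stdx::mutex, and also accepts stdx::unique_lock):

    #include <mutex>

    // Simplified stand-in for mongo's WithLock: constructible only from a held guard.
    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}

        // Escape hatch for call sites that provably cannot race (e.g. constructors).
        static WithLock withoutLock() {
            return WithLock{};
        }

    private:
        WithLock() = default;
    };

    class Counter {
    public:
        void increment() {
            std::lock_guard<std::mutex> lk(_mutex);
            _increment(lk);  // will not compile without a guard in hand
        }

    private:
        // The WithLock parameter replaces an "_increment_inlock()" naming convention.
        void _increment(WithLock) {
            ++_value;  // guarded state: _mutex is provably held here
        }

        std::mutex _mutex;
        int _value = 0;
    };
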
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 4821fb0f504..0dc955fe119 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -77,7 +77,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
     // We lock so that no callbacks signal the notification until after we are done scheduling
     // requests, to prevent signaling the notification twice, which is illegal.
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    _scheduleRequests_inlock();
+    _scheduleRequests(lk);
 }
 
 AsyncRequestsSender::~AsyncRequestsSender() {
     _cancelPendingRequests();
@@ -142,7 +142,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
         _notification.emplace();
 
         if (!_stopRetrying) {
-            _scheduleRequests_inlock();
+            _scheduleRequests(lk);
         }
 
     // Check if any remote is ready.
@@ -171,7 +171,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
     return boost::none;
 }
 
-void AsyncRequestsSender::_scheduleRequests_inlock() {
+void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
     invariant(!_stopRetrying);
     // Schedule remote work on hosts for which we have not sent a request or need to retry.
     for (size_t i = 0; i < _remotes.size(); ++i) {
@@ -212,7 +212,7 @@ void AsyncRequestsSender::_scheduleRequests_inlock() {
         // If the remote does not have a response or pending request, schedule remote work for it.
         if (!remote.swResponse && !remote.cbHandle.isValid()) {
-            auto scheduleStatus = _scheduleRequest_inlock(i);
+            auto scheduleStatus = _scheduleRequest(lk, i);
             if (!scheduleStatus.isOK()) {
                 remote.swResponse = std::move(scheduleStatus);
                 // Signal the notification indicating the remote had an error (we need to do this
@@ -226,7 +226,7 @@ void AsyncRequestsSender::_scheduleRequests_inlock() {
     }
 }
 
-Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
     auto& remote = _remotes[remoteIndex];
 
     invariant(!remote.cbHandle.isValid());

diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 8dfc013c380..70d574a8ef7 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -41,6 +41,7 @@
 #include "mongo/s/shard_id.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/net/hostandport.h"
 #include "mongo/util/time_support.h"
@@ -227,7 +228,7 @@ private:
      *
      * On failure to schedule a request, signals the notification.
      */
-    void _scheduleRequests_inlock();
+    void _scheduleRequests(WithLock);
 
     /**
      * Helper to schedule a command to a remote.
@@ -237,7 +238,7 @@
      *
      * Returns success if the command was scheduled successfully.
     */
-    Status _scheduleRequest_inlock(size_t remoteIndex);
+    Status _scheduleRequest(WithLock, size_t remoteIndex);
 
     /**
      * The callback for a remote command.
@@ -276,7 +277,6 @@ private:
     Status _interruptStatus = Status::OK();
 
     // Must be acquired before accessing the below data members.
-    // Must also be held when calling any of the '_inlock()' helper functions.
     stdx::mutex _mutex;
 
     // Data tracking the state of our communication with each of the remote nodes.
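
Worth noting in AsyncRequestsSender: the guard acquired in the constructor is handed down through two levels of helpers (_scheduleRequests(lk) calling _scheduleRequest(lk, i)), so the lock witness travels with the call chain. A hedged sketch of that shape, with simplified types and illustrative names:

    #include <cstddef>
    #include <mutex>
    #include <vector>

    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}
    };

    class Sender {
    public:
        explicit Sender(std::size_t nRemotes) : _pending(nRemotes, true) {
            std::lock_guard<std::mutex> lk(_mutex);
            _scheduleRequests(lk);  // mirrors AsyncRequestsSender's constructor
        }

    private:
        void _scheduleRequests(WithLock lk) {
            for (std::size_t i = 0; i < _pending.size(); ++i) {
                if (_pending[i]) {
                    _scheduleRequest(lk, i);  // same witness, one level deeper
                }
            }
        }

        void _scheduleRequest(WithLock, std::size_t i) {
            _pending[i] = false;  // guarded state
        }

        std::mutex _mutex;
        std::vector<bool> _pending;
    };
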
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 59602925efc..5681d0afa05 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -42,6 +42,7 @@
 #include "mongo/s/catalog/type_database.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
+#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/log.h"
 #include "mongo/util/timer.h"
@@ -160,8 +161,7 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
         if (!refreshNotification) {
             refreshNotification = (collEntry.refreshCompletionNotification =
                                        std::make_shared<Notification<Status>>());
-            _scheduleCollectionRefresh_inlock(
-                dbEntry, std::move(collEntry.routingInfo), nss, 1);
+            _scheduleCollectionRefresh(ul, dbEntry, std::move(collEntry.routingInfo), nss, 1);
         }
 
         // Wait on the notification outside of the mutex
@@ -313,18 +313,18 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(Oper
         dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)});
 }
 
-void CatalogCache::_scheduleCollectionRefresh_inlock(
-    std::shared_ptr<DatabaseInfoEntry> dbEntry,
-    std::shared_ptr<ChunkManager> existingRoutingInfo,
-    const NamespaceString& nss,
-    int refreshAttempt) {
+void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
+                                              std::shared_ptr<DatabaseInfoEntry> dbEntry,
+                                              std::shared_ptr<ChunkManager> existingRoutingInfo,
+                                              NamespaceString const& nss,
+                                              int refreshAttempt) {
     Timer t;
 
     const ChunkVersion startingCollectionVersion =
         (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
 
-    const auto refreshFailed_inlock =
-        [ this, t, dbEntry, nss, refreshAttempt ](const Status& status) noexcept {
+    const auto refreshFailed =
+        [ this, t, dbEntry, nss, refreshAttempt ](WithLock lk, const Status& status) noexcept {
         log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and failed"
               << causedBy(redact(status));
@@ -337,7 +337,7 @@ void CatalogCache::_scheduleCollectionRefresh_inlock(
         // refresh again
         if (status == ErrorCodes::ConflictingOperationInProgress &&
             refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
-            _scheduleCollectionRefresh_inlock(dbEntry, nullptr, nss, refreshAttempt + 1);
+            _scheduleCollectionRefresh(lk, dbEntry, nullptr, nss, refreshAttempt + 1);
         } else {
             // Leave needsRefresh to true so that any subsequent get attempts will kick off
             // another round of refresh
@@ -346,17 +346,16 @@ void CatalogCache::_scheduleCollectionRefresh_inlock(
         }
     };
 
-    const auto refreshCallback =
-        [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed_inlock ](
-            OperationContext * opCtx,
-            StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+    const auto refreshCallback = [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed ](
+        OperationContext * opCtx,
+        StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
         std::shared_ptr<ChunkManager> newRoutingInfo;
         try {
             newRoutingInfo = refreshCollectionRoutingInfo(
                 opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
         } catch (const DBException& ex) {
             stdx::lock_guard<stdx::mutex> lg(_mutex);
-            refreshFailed_inlock(ex.toStatus());
+            refreshFailed(lg, ex.toStatus());
             return;
         }
@@ -397,7 +396,7 @@ void CatalogCache::_scheduleCollectionRefresh_inlock(
         invariant(status != ErrorCodes::ConflictingOperationInProgress);
 
         stdx::lock_guard<stdx::mutex> lg(_mutex);
-        refreshFailed_inlock(status);
+        refreshFailed(lg, status);
     }
 }
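
The same discipline extends to lambdas: refreshFailed loses its _inlock suffix and instead takes WithLock as its first parameter, so every caller must produce a guard (refreshFailed(lg, ...)). A minimal sketch of a callback written that way, with illustrative names:

    #include <iostream>
    #include <mutex>
    #include <string>

    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}
    };

    int main() {
        std::mutex mutex;
        int failures = 0;

        // A callback that mutates guarded state demands the witness up front,
        // in the spirit of CatalogCache's refreshFailed lambda.
        const auto refreshFailed = [&](WithLock, const std::string& reason) {
            ++failures;
            std::cout << "refresh failed: " << reason << '\n';
        };

        std::lock_guard<std::mutex> lg(mutex);
        refreshFailed(lg, "conflicting operation in progress");
    }
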
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 00f3465d4c3..a2506e1c217 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -37,6 +37,7 @@
 #include "mongo/stdx/memory.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/string_map.h"
 
 namespace mongo {
@@ -161,10 +162,11 @@ private:
     * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The
     * namespace must be in the 'needRefresh' state.
     */
-    void _scheduleCollectionRefresh_inlock(std::shared_ptr<DatabaseInfoEntry> dbEntry,
-                                           std::shared_ptr<ChunkManager> existingRoutingInfo,
-                                           const NamespaceString& nss,
-                                           int refreshAttempt);
+    void _scheduleCollectionRefresh(WithLock,
+                                    std::shared_ptr<DatabaseInfoEntry> dbEntry,
+                                    std::shared_ptr<ChunkManager> existingRoutingInfo,
+                                    NamespaceString const& nss,
+                                    int refreshAttempt);
 
     // Interface from which chunks will be retrieved
     CatalogCacheLoader& _cacheLoader;

diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 6fc44deefb2..4c581f18475 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -57,6 +57,7 @@
 #include "mongo/s/grid.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/log.h"
 #include "mongo/util/map_util.h"
 #include "mongo/util/mongoutils/str.h"
@@ -367,10 +368,6 @@ void ShardRegistry::replicaSetChangeConfigServerUpdateHook(const std::string& se
 ////////////// ShardRegistryData //////////////////
 
 ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) {
-    _init(opCtx, shardFactory);
-}
-
-void ShardRegistryData::_init(OperationContext* opCtx, ShardFactory* shardFactory) {
     auto shardsStatus =
         grid.catalogClient()->getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
@@ -412,7 +409,7 @@ void ShardRegistryData::_init(OperationContext* opCtx, ShardFactory* shardFactor
         auto shard = shardFactory->createShard(std::move(std::get<0>(shardInfo)),
                                                std::move(std::get<1>(shardInfo)));
 
-        _addShard_inlock(std::move(shard), false);
+        _addShard(WithLock::withoutLock(), std::move(shard), false);
     }
 }
@@ -432,7 +429,7 @@ shared_ptr<Shard> ShardRegistryData::getConfigShard() const {
 void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     _configShard = shard;
-    _addShard_inlock(shard, true);
+    _addShard(lk, shard, true);
 }
 
 shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const {
@@ -448,10 +445,10 @@ shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAn
 
 shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    return _findByShardId_inlock(shardId);
+    return _findByShardId(lk, shardId);
 }
 
-shared_ptr<Shard> ShardRegistryData::_findByShardId_inlock(const ShardId& shardId) const {
+shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const {
     auto i = _lookup.find(shardId);
     return (i != _lookup.end()) ? i->second : nullptr;
 }
@@ -505,28 +502,31 @@ void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnStri
         return;
     }
 
-    _rebuildShard_inlock(newConnString, factory);
+    _rebuildShard(updateConnStringLock, newConnString, factory);
 }
 
-void ShardRegistryData::_rebuildShard_inlock(const ConnectionString& newConnString,
-                                             ShardFactory* factory) {
+void ShardRegistryData::_rebuildShard(WithLock lk,
+                                      ConnectionString const& newConnString,
+                                      ShardFactory* factory) {
     auto it = _rsLookup.find(newConnString.getSetName());
     invariant(it->second);
     auto shard = factory->createShard(it->second->getId(), newConnString);
-    _addShard_inlock(shard, true);
+    _addShard(lk, shard, true);
     if (shard->isConfig()) {
         _configShard = shard;
     }
 }
 
-void ShardRegistryData::_addShard_inlock(const std::shared_ptr<Shard>& shard, bool useOriginalCS) {
+void ShardRegistryData::_addShard(WithLock lk,
+                                  std::shared_ptr<Shard> const& shard,
+                                  bool useOriginalCS) {
     const ShardId shardId = shard->getId();
 
     const ConnectionString connString =
         useOriginalCS ? shard->originalConnString() : shard->getConnString();
 
-    auto currentShard = _findByShardId_inlock(shardId);
+    auto currentShard = _findByShardId(lk, shardId);
     if (currentShard) {
         auto oldConnString = currentShard->originalConnString();

diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 37ad27ba5b5..a1d6b333cf7 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -40,6 +40,7 @@
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/with_lock.h"
 
 namespace mongo {
@@ -54,6 +55,9 @@ class ShardType;
 
 class ShardRegistryData {
 public:
+    /**
+     * Reads shards docs from the catalog client and fills in maps.
+     */
     ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory);
     ShardRegistryData() = default;
     ~ShardRegistryData() = default;
@@ -100,22 +104,18 @@ public:
 
 private:
     /**
-     * Reads shards docs from the catalog client and fills in maps.
-     */
-    void _init(OperationContext* opCtx, ShardFactory* factory);
-
-    /**
     * Creates a shard based on the specified information and puts it into the lookup maps.
     * if useOriginalCS = true it will use the ConnectionSring used for shard creation to update
     * lookup maps. Otherwise the current connection string from the Shard's RemoteCommandTargeter
     * will be used.
     */
-    void _addShard_inlock(const std::shared_ptr<Shard>&, bool useOriginalCS);
-    std::shared_ptr<Shard> _findByShardId_inlock(const ShardId&) const;
-    void _rebuildShard_inlock(const ConnectionString& newConnString, ShardFactory* factory);
+    void _addShard(WithLock, std::shared_ptr<Shard> const&, bool useOriginalCS);
+    auto _findByShardId(WithLock, ShardId const&) const -> std::shared_ptr<Shard>;
+    void _rebuildShard(WithLock, ConnectionString const& newConnString, ShardFactory* factory);
 
     // Protects the lookup maps below.
     mutable stdx::mutex _mutex;
+
     using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>;
 
     // Map of both shardName -> Shard and hostName -> Shard
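
WithLock::withoutLock() deserves a comment: ShardRegistryData's constructor calls _addShard(WithLock::withoutLock(), ...) because no other thread can observe an object that is still being constructed, so taking the mutex there would be pure overhead. A sketch of that escape hatch, simplified and with illustrative names:

    #include <map>
    #include <mutex>
    #include <string>

    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}
        static WithLock withoutLock() { return WithLock{}; }

    private:
        WithLock() = default;
    };

    class Registry {
    public:
        Registry() {
            // Nobody else can hold a reference yet, so no lock is needed; the
            // explicit withoutLock() call documents that this is deliberate.
            _add(WithLock::withoutLock(), "shard0000", "rs0/host1:27017");
        }

        void add(const std::string& id, const std::string& conn) {
            std::lock_guard<std::mutex> lk(_mutex);
            _add(lk, id, conn);  // post-construction callers must lock
        }

    private:
        void _add(WithLock, const std::string& id, const std::string& conn) {
            _lookup[id] = conn;
        }

        std::mutex _mutex;
        std::map<std::string, std::string> _lookup;
    };
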
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index b867c7b43df..9d05cacfa57 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -42,7 +42,6 @@
 #include "mongo/s/grid.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 
 namespace {
@@ -65,9 +64,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
                             remote.cursorResponse.getNSS(),
                             remote.cursorResponse.getCursorId());
 
-        // We don't check the return value of addBatchToBuffer here; if there was an error,
+        // We don't check the return value of _addBatchToBuffer here; if there was an error,
         // it will be stored in the remote and the first call to ready() will return true.
-        addBatchToBuffer(remoteIndex, remote.cursorResponse.getBatch());
+        _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse.getBatch());
         ++remoteIndex;
     }
@@ -81,15 +80,15 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
 
 AsyncResultsMerger::~AsyncResultsMerger() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    invariant(remotesExhausted_inlock() || _lifecycleState == kKillComplete);
+    invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete);
 }
 
 bool AsyncResultsMerger::remotesExhausted() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    return remotesExhausted_inlock();
+    return _remotesExhausted(lk);
 }
 
-bool AsyncResultsMerger::remotesExhausted_inlock() {
+bool AsyncResultsMerger::_remotesExhausted(WithLock) {
     for (const auto& remote : _remotes) {
         if (!remote.exhausted()) {
             return false;
@@ -113,7 +112,7 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
 
 bool AsyncResultsMerger::ready() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    return ready_inlock();
+    return _ready(lk);
 }
 
 void AsyncResultsMerger::detachFromOperationContext() {
@@ -130,7 +129,7 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
     _opCtx = opCtx;
 }
 
-bool AsyncResultsMerger::ready_inlock() {
+bool AsyncResultsMerger::_ready(WithLock lk) {
     if (_lifecycleState != kAlive) {
         return true;
     }
@@ -150,10 +149,10 @@ bool AsyncResultsMerger::ready_inlock() {
     }
 
     const bool hasSort = !_params->sort.isEmpty();
-    return hasSort ? readySorted_inlock() : readyUnsorted_inlock();
+    return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
 }
 
-bool AsyncResultsMerger::readySorted_inlock() {
+bool AsyncResultsMerger::_readySorted(WithLock) {
     // Tailable cursors cannot have a sort.
     invariant(_params->tailableMode == TailableMode::kNormal);
@@ -166,7 +165,7 @@ bool AsyncResultsMerger::readySorted_inlock() {
     return true;
 }
 
-bool AsyncResultsMerger::readyUnsorted_inlock() {
+bool AsyncResultsMerger::_readyUnsorted(WithLock) {
     bool allExhausted = true;
     for (const auto& remote : _remotes) {
         if (!remote.exhausted()) {
@@ -183,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() {
 
 StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    dassert(ready_inlock());
+    dassert(_ready(lk));
     if (_lifecycleState != kAlive) {
         return Status(ErrorCodes::IllegalOperation, "AsyncResultsMerger killed");
     }
@@ -198,10 +197,10 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
     }
 
     const bool hasSort = !_params->sort.isEmpty();
-    return hasSort ? nextReadySorted() : nextReadyUnsorted();
+    return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
 }
 
-ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
+ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
     // Tailable cursors cannot have a sort.
     invariant(_params->tailableMode == TailableMode::kNormal);
@@ -227,7 +226,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
     return front;
 }
 
-ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
+ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
     size_t remotesAttempted = 0;
     while (remotesAttempted < _remotes.size()) {
         // It is illegal to call this method if there is an error received from any shard.
@@ -258,7 +257,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
     return {};
 }
 
-Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
+Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
     auto& remote = _remotes[remoteIndex];
 
     invariant(!remote.cbHandle.isValid());
@@ -283,10 +282,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
     executor::RemoteCommandRequest request(
         remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, _opCtx);
 
-    auto callbackStatus = _executor->scheduleRemoteCommand(
-        request,
-        stdx::bind(
-            &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex));
+    auto callbackStatus =
+        _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
+            stdx::lock_guard<stdx::mutex> lk(this->_mutex);
+            this->_handleBatchResponse(lk, cbData, remoteIndex);
+        });
+
     if (!callbackStatus.isOK()) {
         return callbackStatus.getStatus();
     }
@@ -330,7 +331,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
         if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
             // If this remote is not exhausted and there is no outstanding request for it, schedule
             // work to retrieve the next batch.
-            auto nextBatchStatus = askForNextBatch_inlock(i);
+            auto nextBatchStatus = _askForNextBatch(lk, i);
             if (!nextBatchStatus.isOK()) {
                 return nextBatchStatus;
             }
@@ -348,12 +349,13 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
     // _currentEvent with a new event, new results became available. In this case we have to signal
     // the new event right away to propagate the fact that the previous event had been signaled to
     // the new event.
-    signalCurrentEventIfReady_inlock();
+    _signalCurrentEventIfReady(lk);
 
     return eventToReturn;
 }
 
-StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj& responseObj,
-                                                                   const RemoteCursorData& remote) {
+StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
+    const BSONObj& responseObj, const RemoteCursorData& remote) {
+
     auto getMoreParseStatus = CursorResponse::parseFromBSON(responseObj);
     if (!getMoreParseStatus.isOK()) {
         return getMoreParseStatus.getStatus();
@@ -372,76 +374,81 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj
     return std::move(cursorResponse);
 }
 
-void AsyncResultsMerger::handleBatchResponse(
-    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
-
-    auto& remote = _remotes[remoteIndex];
+void AsyncResultsMerger::_handleBatchResponse(WithLock lk,
+                                              CbData const& cbData,
+                                              size_t remoteIndex) {
+    // Got a response from remote, so indicate we are no longer waiting for one.
+    _remotes[remoteIndex].cbHandle = executor::TaskExecutor::CallbackHandle();
 
-    // Clear the callback handle. This indicates that we are no longer waiting on a response from
-    // 'remote'.
-    remote.cbHandle = executor::TaskExecutor::CallbackHandle();
-
-    // If we're in the process of shutting down then there's no need to process the batch.
+    // On shutdown, there is no need to process the response.
     if (_lifecycleState != kAlive) {
-        invariant(_lifecycleState == kKillStarted);
-
-        // Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down.
-        signalCurrentEventIfReady_inlock();
-
-        // If we're killed and we're not waiting on any more batches to come back, then we are ready
-        // to kill the cursors on the remote hosts and clean up this cursor. Schedule the
-        // killCursors command and signal that this cursor is safe now safe to destroy. We have to
-        // promise not to touch any members of this class because 'this' could become invalid as
-        // soon as we signal the event.
-        if (!haveOutstandingBatchRequests_inlock()) {
-            // If the event handle is invalid, then the executor is in the middle of shutting down,
-            // and we can't schedule any more work for it to complete.
-            if (_killCursorsScheduledEvent.isValid()) {
-                scheduleKillCursors_inlock(_opCtx);
-                _executor->signalEvent(_killCursorsScheduledEvent);
-            }
+        _signalCurrentEventIfReady(lk);  // First, wake up anyone waiting on '_currentEvent'.
+        _cleanUpKilledBatch(lk);
+        return;
+    }
+    try {
+        _processBatchResults(lk, cbData.response, remoteIndex);
+    } catch (DBException const& e) {
+        _remotes[remoteIndex].status = e.toStatus();
+    }
+    _signalCurrentEventIfReady(lk);  // Wake up anyone waiting on '_currentEvent'.
+}
 
-            _lifecycleState = kKillComplete;
+void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) {
+    invariant(_lifecycleState == kKillStarted);
+
+    // If we're killed and we're not waiting on any more batches to come back, then we are ready
+    // to kill the cursors on the remote hosts and clean up this cursor. Schedule the killCursors
+    // command and signal that this cursor is now safe to destroy. We must not touch this object
+    // again after dropping the lock, because 'this' could become invalid immediately.
+    if (!_haveOutstandingBatchRequests(lk)) {
+        // If the event handle is invalid, then the executor is in the middle of shutting down,
+        // and we can't schedule any more work for it to complete.
+        if (_killCursorsScheduledEvent.isValid()) {
+            _scheduleKillCursors(lk, _opCtx);
+            _executor->signalEvent(_killCursorsScheduledEvent);
         }
-        return;
+        _lifecycleState = kKillComplete;
+    }
+}
+
+void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remoteIndex) {
+    auto& remote = _remotes[remoteIndex];
+    remote.status = std::move(status);
+    // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
+    // remove the unreachable host entirely from consideration by marking it as exhausted.
+    if (_params->isAllowPartialResults) {
+        remote.status = Status::OK();
+
+        // Clear the results buffer and cursor id.
+        std::queue<ClusterQueryResult> emptyBuffer;
+        std::swap(remote.docBuffer, emptyBuffer);
+        remote.cursorId = 0;
     }
+}
 
-    // Early return from this point on signal anyone waiting on an event, if ready() is true.
-    ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
-    StatusWith<CursorResponse> cursorResponseStatus(
-        cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
-                               : cbData.response.status);
+void AsyncResultsMerger::_processBatchResults(WithLock lk,
+                                              CbResponse const& response,
+                                              size_t remoteIndex) {
+    auto& remote = _remotes[remoteIndex];
+    if (!response.isOK()) {
+        _cleanUpFailedBatch(lk, response.status, remoteIndex);
+        return;
+    }
+    auto cursorResponseStatus = _parseCursorResponse(response.data, remote);
     if (!cursorResponseStatus.isOK()) {
-        if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit &&
-            _params->tailableMode != TailableMode::kNormal) {
-            // We timed out before hearing back from the shard,
-        }
-        remote.status = cursorResponseStatus.getStatus();
-        // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
-        // remove the unreachable host entirely from consideration by marking it as exhausted.
-        if (_params->isAllowPartialResults) {
-            remote.status = Status::OK();
-
-            // Clear the results buffer and cursor id.
-            std::queue<ClusterQueryResult> emptyBuffer;
-            std::swap(remote.docBuffer, emptyBuffer);
-            remote.cursorId = 0;
-        }
-
+        _cleanUpFailedBatch(lk, cursorResponseStatus.getStatus(), remoteIndex);
         return;
     }
 
-    // Response successfully received.
-
-    auto cursorResponse = std::move(cursorResponseStatus.getValue());
+    CursorResponse cursorResponse = std::move(cursorResponseStatus.getValue());
 
     // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
     remote.cursorId = cursorResponse.getCursorId();
 
     // Save the batch in the remote's buffer.
-    if (!addBatchToBuffer(remoteIndex, cursorResponse.getBatch())) {
+    if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse.getBatch())) {
         return;
     }
@@ -458,21 +465,16 @@ void AsyncResultsMerger::handleBatchResponse(
     //
     // We do not ask for the next batch if the cursor is tailable, as batches received from remote
     // tailable cursors should be passed through to the client without asking for more batches.
-    if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() &&
-        !remote.exhausted()) {
-        remote.status = askForNextBatch_inlock(remoteIndex);
-        if (!remote.status.isOK()) {
-            return;
-        }
+    if (_params->tailableMode != TailableMode::kNormal || remote.hasNext() || remote.exhausted()) {
+        return;
     }
-
-    // ScopeGuard requires dismiss on success, but we want waiter to be signalled on success as
-    // well as failure.
-    signaller.Dismiss();
-    signalCurrentEventIfReady_inlock();
+    remote.status = _askForNextBatch(lk, remoteIndex);
 }
 
-bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch) {
+bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
+                                           size_t remoteIndex,
+                                           std::vector<BSONObj> const& batch) {
     auto& remote = _remotes[remoteIndex];
     for (const auto& obj : batch) {
         // If there's a sort, we're expecting the remote node to have given us back a sort key.
@@ -499,8 +501,8 @@ bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<
     return true;
 }
 
-void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
-    if (ready_inlock() && _currentEvent.isValid()) {
+void AsyncResultsMerger::_signalCurrentEventIfReady(WithLock lk) {
+    if (_ready(lk) && _currentEvent.isValid()) {
         // To prevent ourselves from signalling the event twice, we set '_currentEvent' as
         // invalid after signalling it.
         _executor->signalEvent(_currentEvent);
@@ -508,7 +510,7 @@ void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
     }
 }
 
-bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
+bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) {
     for (const auto& remote : _remotes) {
         if (remote.cbHandle.isValid()) {
             return true;
@@ -518,7 +520,7 @@ bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
     return false;
 }
 
-void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
+void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) {
     invariant(_lifecycleState == kKillStarted);
     invariant(_killCursorsScheduledEvent.isValid());
@@ -531,20 +533,13 @@ void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
             executor::RemoteCommandRequest request(
                 remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
 
-            _executor
-                ->scheduleRemoteCommand(request,
-                                        stdx::bind(&AsyncResultsMerger::handleKillCursorsResponse,
-                                                   stdx::placeholders::_1))
-                .status_with_transitional_ignore();
+            // Send kill request; discard callback handle, if any, or failure report, if not.
+            Status s = _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus();
+            std::move(s).ignore();
         }
     }
 }
 
-void AsyncResultsMerger::handleKillCursorsResponse(
-    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
-    // We just ignore any killCursors command responses.
-}
-
 executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* opCtx) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     if (_killCursorsScheduledEvent.isValid()) {
@@ -559,7 +554,7 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
     auto statusWithEvent = _executor->makeEvent();
     if (ErrorCodes::isShutdownError(statusWithEvent.getStatus().code())) {
         // The underlying task executor is shutting down.
-        if (!haveOutstandingBatchRequests_inlock()) {
+        if (!_haveOutstandingBatchRequests(lk)) {
             _lifecycleState = kKillComplete;
         }
         return executor::TaskExecutor::EventHandle();
@@ -570,8 +565,8 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
     // If we're not waiting for responses from remotes, we can schedule killCursors commands on the
     // remotes now. Otherwise, we have to wait until all responses are back, and then we can kill
     // the remote cursors.
-    if (!haveOutstandingBatchRequests_inlock()) {
-        scheduleKillCursors_inlock(opCtx);
+    if (!_haveOutstandingBatchRequests(lk)) {
+        _scheduleKillCursors(lk, opCtx);
         _lifecycleState = kKillComplete;
         _executor->signalEvent(_killCursorsScheduledEvent);
     } else {
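
Two callback changes in async_results_merger.cpp are worth calling out: the stdx::bind to handleBatchResponse becomes a lambda that takes the mutex itself and then forwards to the WithLock helper, and the do-nothing handleKillCursorsResponse is replaced by an inline no-op lambda. A self-contained sketch of the first shape — the executor here is a stand-in invoked inline, not the real TaskExecutor API:

    #include <cstddef>
    #include <functional>
    #include <mutex>
    #include <vector>

    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}
    };

    // Stand-in executor: a real TaskExecutor would run the callback asynchronously.
    void scheduleRemoteCommand(const std::function<void(int)>& cb) {
        cb(42);
    }

    class Merger {
    public:
        void askForNextBatch(std::size_t remoteIndex) {
            scheduleRemoteCommand([this, remoteIndex](int response) {
                std::lock_guard<std::mutex> lk(_mutex);           // lock taken by the callback...
                _handleBatchResponse(lk, response, remoteIndex);  // ...then the guarded helper runs
            });
        }

    private:
        void _handleBatchResponse(WithLock, int response, std::size_t remoteIndex) {
            if (_responses.size() <= remoteIndex) {
                _responses.resize(remoteIndex + 1);
            }
            _responses[remoteIndex] = response;  // guarded state, mutex provably held
        }

        std::mutex _mutex;
        std::vector<int> _responses;
    };
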
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index c6ec2a26052..6a1ebb4ee8c 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -40,6 +40,7 @@
 #include "mongo/s/query/cluster_client_cursor_params.h"
 #include "mongo/s/query/cluster_query_result.h"
 #include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/net/hostandport.h"
 #include "mongo/util/time_support.h"
@@ -270,18 +271,12 @@ private:
     enum LifecycleState { kAlive, kKillStarted, kKillComplete };
 
     /**
-     * Callback run to handle a response from a killCursors command.
-     */
-    static void handleKillCursorsResponse(
-        const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData);
-
-    /**
     * Parses the find or getMore command response object to a CursorResponse.
     *
     * Returns a non-OK response if the response fails to parse or if there is a cursor id mismatch.
     */
-    static StatusWith<CursorResponse> parseCursorResponse(const BSONObj& responseObj,
-                                                          const RemoteCursorData& remote);
+    static StatusWith<CursorResponse> _parseCursorResponse(const BSONObj& responseObj,
+                                                           const RemoteCursorData& remote);
 
     /**
     * Helper to schedule a command asking the remote node for another batch of results.
@@ -291,43 +286,60 @@ private:
     *
     * Returns success if the command to retrieve the next batch was scheduled successfully.
     */
-    Status askForNextBatch_inlock(size_t remoteIndex);
+    Status _askForNextBatch(WithLock, size_t remoteIndex);
 
     /**
     * Checks whether or not the remote cursors are all exhausted.
     */
-    bool remotesExhausted_inlock();
+    bool _remotesExhausted(WithLock);
 
     //
     // Helpers for ready().
     //
 
-    bool ready_inlock();
-    bool readySorted_inlock();
-    bool readyUnsorted_inlock();
+    bool _ready(WithLock);
+    bool _readySorted(WithLock);
+    bool _readyUnsorted(WithLock);
 
     //
     // Helpers for nextReady().
     //
 
-    ClusterQueryResult nextReadySorted();
-    ClusterQueryResult nextReadyUnsorted();
+    ClusterQueryResult _nextReadySorted(WithLock);
+    ClusterQueryResult _nextReadyUnsorted(WithLock);
+
+    using CbData = executor::TaskExecutor::RemoteCommandCallbackArgs;
+    using CbResponse = executor::TaskExecutor::ResponseStatus;
 
     /**
-     * When nextEvent() schedules remote work, it passes this method as a callback. The TaskExecutor
-     * will call this function, passing the response from the remote.
+     * When nextEvent() schedules remote work, the callback uses this function to process results.
     *
     * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
     * indicates which node the response came from and where the new result documents should be
     * buffered.
     */
-    void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
-                             size_t remoteIndex);
+    void _handleBatchResponse(WithLock, CbData const&, size_t remoteIndex);
+
+    /**
+     * Cleans up if the remote cursor was killed while waiting for a response.
+     */
+    void _cleanUpKilledBatch(WithLock);
+
+    /**
+     * Cleans up after remote query failure.
+     */
+    void _cleanUpFailedBatch(WithLock lk, Status status, size_t remoteIndex);
+
+    /**
+     * Processes results from a remote query.
+     */
+    void _processBatchResults(WithLock, CbResponse const&, size_t remoteIndex);
 
     /**
     * Adds the batch of results to the RemoteCursorData. Returns false if there was an error
     * parsing the batch.
     */
-    bool addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch);
+    bool _addBatchToBuffer(WithLock, size_t remoteIndex, std::vector<BSONObj> const& batch);
 
     /**
     * If there is a valid unsignaled event that has been requested via nextReady() and there are
@@ -336,17 +348,17 @@ private:
     * Invalidates the current event, as we must signal the event exactly once and we only keep a
     * handle to a valid event if it is unsignaled.
     */
-    void signalCurrentEventIfReady_inlock();
+    void _signalCurrentEventIfReady(WithLock);
 
     /**
     * Returns true if this async cursor is waiting to receive another batch from a remote.
     */
-    bool haveOutstandingBatchRequests_inlock();
+    bool _haveOutstandingBatchRequests(WithLock);
 
     /**
     * Schedules a killCursors command to be run on all remote hosts that have open cursors.
     */
-    void scheduleKillCursors_inlock(OperationContext* opCtx);
+    void _scheduleKillCursors(WithLock, OperationContext* opCtx);
 
     OperationContext* _opCtx;
     executor::TaskExecutor* _executor;
@@ -357,7 +369,6 @@ private:
     BSONObj _metadataObj;
 
     // Must be acquired before accessing any data members (other than _params, which is read-only).
-    // Must also be held when calling any of the '_inlock()' helper functions.
     stdx::mutex _mutex;
 
     // Data tracking the state of our communication with each of the remote nodes.
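
Beyond the renames, the header shows that the monolithic handleBatchResponse was split into three focused WithLock helpers (_cleanUpKilledBatch, _cleanUpFailedBatch, _processBatchResults), which is what lets the .cpp drop the ScopeGuard that previously guaranteed the event was signalled on every exit path. A rough, self-contained sketch of the resulting control-flow shape, with illustrative types and names:

    #include <exception>
    #include <iostream>
    #include <mutex>

    class WithLock {
    public:
        WithLock(const std::lock_guard<std::mutex>&) {}
    };

    class Merger {
    public:
        void handleResponse(bool ok) {
            std::lock_guard<std::mutex> lk(_mutex);
            _handleResponse(lk, ok);
        }

    private:
        // Dispatcher shape after the split: every path ends in exactly one
        // explicit _signalIfReady call, so no scope guard is needed to make
        // sure waiters are woken.
        void _handleResponse(WithLock lk, bool ok) {
            try {
                ok ? _processBatch(lk) : _cleanUpFailedBatch(lk);
            } catch (const std::exception&) {
                _failed = true;
            }
            _signalIfReady(lk);
        }

        void _processBatch(WithLock) { std::cout << "batch buffered\n"; }
        void _cleanUpFailedBatch(WithLock) { _failed = true; }
        void _signalIfReady(WithLock) { std::cout << "waiters signalled\n"; }

        std::mutex _mutex;
        bool _failed = false;
    };
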
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 5e9b8872289..5683071689b 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -196,10 +196,10 @@ ClusterCursorManager::~ClusterCursorManager() {
 }
 
 void ClusterCursorManager::shutdown(OperationContext* opCtx) {
-    stdx::unique_lock<stdx::mutex> lk(_mutex);
-    _inShutdown = true;
-    lk.unlock();
-
+    {
+        stdx::lock_guard<stdx::mutex> lk(_mutex);
+        _inShutdown = true;
+    }
     killAllCursors();
     reapZombieCursors(opCtx);
 }
@@ -272,7 +272,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
                       "Cannot check out cursor as we are in the process of shutting down");
     }
 
-    CursorEntry* entry = getEntry_inlock(nss, cursorId);
+    CursorEntry* entry = _getEntry(lk, nss, cursorId);
     if (!entry) {
         return cursorNotFoundStatus(nss, cursorId);
     }
@@ -317,7 +317,7 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
 
     const bool remotesExhausted = cursor->remotesExhausted();
 
-    CursorEntry* entry = getEntry_inlock(nss, cursorId);
+    CursorEntry* entry = _getEntry(lk, nss, cursorId);
     invariant(entry);
 
     entry->setLastActive(now);
@@ -336,7 +336,7 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
 
     // The cursor is exhausted, is not already scheduled for deletion, and does not have any
     // remote cursor state left to clean up. We can delete the cursor right away.
-    auto detachedCursor = detachCursor_inlock(nss, cursorId);
+    auto detachedCursor = _detachCursor(lk, nss, cursorId);
     invariantOK(detachedCursor.getStatus());
 
     // Deletion of the cursor can happen out of the lock.
@@ -347,7 +347,7 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
 
 Status ClusterCursorManager::killCursor(const NamespaceString& nss, CursorId cursorId) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
 
-    CursorEntry* entry = getEntry_inlock(nss, cursorId);
+    CursorEntry* entry = _getEntry(lk, nss, cursorId);
     if (!entry) {
         return cursorNotFoundStatus(nss, cursorId);
     }
@@ -416,7 +416,7 @@ std::size_t ClusterCursorManager::reapZombieCursors(OperationContext* opCtx) {
 
     for (auto& cursorDescriptor : zombieCursorDescriptors) {
         StatusWith<std::unique_ptr<ClusterClientCursor>> zombieCursor =
-            detachCursor_inlock(cursorDescriptor.ns, cursorDescriptor.cursorId);
+            _detachCursor(lk, cursorDescriptor.ns, cursorDescriptor.cursorId);
         if (!zombieCursor.isOK()) {
             // Cursor in use, or has already been deleted.
             continue;
@@ -536,8 +536,9 @@ boost::optional<NamespaceString> ClusterCursorManager::getNamespaceForCursorId(
     return it->second;
 }
 
-ClusterCursorManager::CursorEntry* ClusterCursorManager::getEntry_inlock(const NamespaceString& nss,
-                                                                         CursorId cursorId) {
+auto ClusterCursorManager::_getEntry(WithLock, NamespaceString const& nss, CursorId cursorId)
+    -> CursorEntry* {
+
     auto nsToContainerIt = _namespaceToContainerMap.find(nss);
     if (nsToContainerIt == _namespaceToContainerMap.end()) {
         return nullptr;
@@ -551,9 +552,10 @@ ClusterCursorManager::CursorEntry* ClusterCursorManager::getEntry_inlock(const N
     return &entryMapIt->second;
 }
 
-StatusWith<std::unique_ptr<ClusterClientCursor>> ClusterCursorManager::detachCursor_inlock(
-    const NamespaceString& nss, CursorId cursorId) {
-    CursorEntry* entry = getEntry_inlock(nss, cursorId);
+StatusWith<std::unique_ptr<ClusterClientCursor>> ClusterCursorManager::_detachCursor(
+    WithLock lk, NamespaceString const& nss, CursorId cursorId) {
+
+    CursorEntry* entry = _getEntry(lk, nss, cursorId);
     if (!entry) {
         return cursorNotFoundStatus(nss, cursorId);
     }

diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index d6eb8f47abf..6754624583d 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -39,6 +39,7 @@
 #include "mongo/s/query/cluster_client_cursor.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/time_support.h"
 
 namespace mongo {
@@ -413,7 +414,7 @@ private:
     *
     * Not thread-safe.
     */
-    CursorEntry* getEntry_inlock(const NamespaceString& nss, CursorId cursorId);
+    CursorEntry* _getEntry(WithLock, NamespaceString const& nss, CursorId cursorId);
 
     /**
     * De-registers the given cursor, and returns an owned pointer to the underlying
@@ -424,8 +425,9 @@ private:
     *
     * Not thread-safe.
    */
-    StatusWith<std::unique_ptr<ClusterClientCursor>> detachCursor_inlock(const NamespaceString& nss,
-                                                                         CursorId cursorId);
+    StatusWith<std::unique_ptr<ClusterClientCursor>> _detachCursor(WithLock,
+                                                                   NamespaceString const& nss,
+                                                                   CursorId cursorId);
 
     /**
     * CursorEntry is a moveable, non-copyable container for a single cursor.
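
Finally, note the small change to ClusterCursorManager::shutdown(): a scoped block with a lock_guard replaces the unique_lock plus manual unlock(), so the lock's extent is visible at a glance and cannot be extended by mistake. A simplified sketch of the resulting shape:

    #include <mutex>

    class Manager {
    public:
        void shutdown() {
            {
                std::lock_guard<std::mutex> lk(_mutex);  // guards only the flag write
                _inShutdown = true;
            }  // lock released here, before any further work
            killAllCursors();  // may itself take _mutex without deadlocking
        }

    private:
        void killAllCursors() {
            std::lock_guard<std::mutex> lk(_mutex);
            // walk the cursor maps and kill each cursor
        }

        std::mutex _mutex;
        bool _inShutdown = false;
    };
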