diff options
author | Nathan Myers <nathan.myers@10gen.com> | 2017-09-25 15:53:59 -0400 |
---|---|---|
committer | Nathan Myers <nathan.myers@10gen.com> | 2017-09-25 15:53:59 -0400 |
commit | d7aca6435e8ccc89005a97dc585dfbe429a17dec (patch) | |
tree | 8f42ebe1db5e80b71d5e97730747971acea7fd02 /src/mongo/s/query/async_results_merger.cpp | |
parent | 829c1d6afe8177433192e6af84bf4536c330caee (diff) | |
download | mongo-d7aca6435e8ccc89005a97dc585dfbe429a17dec.tar.gz |
SERVER-30838 Remove _inlock names in sharding subsystem
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 207 |
1 files changed, 101 insertions, 106 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index b867c7b43df..9d05cacfa57 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -42,7 +42,6 @@ #include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" namespace mongo { namespace { @@ -65,9 +64,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, remote.cursorResponse.getNSS(), remote.cursorResponse.getCursorId()); - // We don't check the return value of addBatchToBuffer here; if there was an error, + // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. - addBatchToBuffer(remoteIndex, remote.cursorResponse.getBatch()); + _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse.getBatch()); ++remoteIndex; } @@ -81,15 +80,15 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, AsyncResultsMerger::~AsyncResultsMerger() { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(remotesExhausted_inlock() || _lifecycleState == kKillComplete); + invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete); } bool AsyncResultsMerger::remotesExhausted() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return remotesExhausted_inlock(); + return _remotesExhausted(lk); } -bool AsyncResultsMerger::remotesExhausted_inlock() { +bool AsyncResultsMerger::_remotesExhausted(WithLock) { for (const auto& remote : _remotes) { if (!remote.exhausted()) { return false; @@ -113,7 +112,7 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { bool AsyncResultsMerger::ready() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return ready_inlock(); + return _ready(lk); } void AsyncResultsMerger::detachFromOperationContext() { @@ -130,7 +129,7 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { _opCtx = opCtx; } -bool AsyncResultsMerger::ready_inlock() { +bool AsyncResultsMerger::_ready(WithLock lk) { if (_lifecycleState != kAlive) { return true; } @@ -150,10 +149,10 @@ bool AsyncResultsMerger::ready_inlock() { } const bool hasSort = !_params->sort.isEmpty(); - return hasSort ? readySorted_inlock() : readyUnsorted_inlock(); + return hasSort ? _readySorted(lk) : _readyUnsorted(lk); } -bool AsyncResultsMerger::readySorted_inlock() { +bool AsyncResultsMerger::_readySorted(WithLock) { // Tailable cursors cannot have a sort. invariant(_params->tailableMode == TailableMode::kNormal); @@ -166,7 +165,7 @@ bool AsyncResultsMerger::readySorted_inlock() { return true; } -bool AsyncResultsMerger::readyUnsorted_inlock() { +bool AsyncResultsMerger::_readyUnsorted(WithLock) { bool allExhausted = true; for (const auto& remote : _remotes) { if (!remote.exhausted()) { @@ -183,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() { StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { stdx::lock_guard<stdx::mutex> lk(_mutex); - dassert(ready_inlock()); + dassert(_ready(lk)); if (_lifecycleState != kAlive) { return Status(ErrorCodes::IllegalOperation, "AsyncResultsMerger killed"); } @@ -198,10 +197,10 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { } const bool hasSort = !_params->sort.isEmpty(); - return hasSort ? nextReadySorted() : nextReadyUnsorted(); + return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); } -ClusterQueryResult AsyncResultsMerger::nextReadySorted() { +ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { // Tailable cursors cannot have a sort. invariant(_params->tailableMode == TailableMode::kNormal); @@ -227,7 +226,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadySorted() { return front; } -ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { +ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { size_t remotesAttempted = 0; while (remotesAttempted < _remotes.size()) { // It is illegal to call this method if there is an error received from any shard. @@ -258,7 +257,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { return {}; } -Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { +Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -283,10 +282,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { executor::RemoteCommandRequest request( remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, _opCtx); - auto callbackStatus = _executor->scheduleRemoteCommand( - request, - stdx::bind( - &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex)); + auto callbackStatus = + _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { + stdx::lock_guard<stdx::mutex> lk(this->_mutex); + this->_handleBatchResponse(lk, cbData, remoteIndex); + }); + if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } @@ -330,7 +331,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { // If this remote is not exhausted and there is no outstanding request for it, schedule // work to retrieve the next batch. - auto nextBatchStatus = askForNextBatch_inlock(i); + auto nextBatchStatus = _askForNextBatch(lk, i); if (!nextBatchStatus.isOK()) { return nextBatchStatus; } @@ -348,12 +349,13 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() // _currentEvent with a new event, new results became available. In this case we have to signal // the new event right away to propagate the fact that the previous event had been signaled to // the new event. - signalCurrentEventIfReady_inlock(); + _signalCurrentEventIfReady(lk); return eventToReturn; } -StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj& responseObj, - const RemoteCursorData& remote) { +StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse( + const BSONObj& responseObj, const RemoteCursorData& remote) { + auto getMoreParseStatus = CursorResponse::parseFromBSON(responseObj); if (!getMoreParseStatus.isOK()) { return getMoreParseStatus.getStatus(); @@ -372,76 +374,81 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj return std::move(cursorResponse); } -void AsyncResultsMerger::handleBatchResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - auto& remote = _remotes[remoteIndex]; +void AsyncResultsMerger::_handleBatchResponse(WithLock lk, + CbData const& cbData, + size_t remoteIndex) { + // Got a response from remote, so indicate we are no longer waiting for one. + _remotes[remoteIndex].cbHandle = executor::TaskExecutor::CallbackHandle(); - // Clear the callback handle. This indicates that we are no longer waiting on a response from - // 'remote'. - remote.cbHandle = executor::TaskExecutor::CallbackHandle(); - - // If we're in the process of shutting down then there's no need to process the batch. + // On shutdown, there is no need to process the response. if (_lifecycleState != kAlive) { - invariant(_lifecycleState == kKillStarted); - - // Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down. - signalCurrentEventIfReady_inlock(); - - // If we're killed and we're not waiting on any more batches to come back, then we are ready - // to kill the cursors on the remote hosts and clean up this cursor. Schedule the - // killCursors command and signal that this cursor is safe now safe to destroy. We have to - // promise not to touch any members of this class because 'this' could become invalid as - // soon as we signal the event. - if (!haveOutstandingBatchRequests_inlock()) { - // If the event handle is invalid, then the executor is in the middle of shutting down, - // and we can't schedule any more work for it to complete. - if (_killCursorsScheduledEvent.isValid()) { - scheduleKillCursors_inlock(_opCtx); - _executor->signalEvent(_killCursorsScheduledEvent); - } + _signalCurrentEventIfReady(lk); // First, wake up anyone waiting on '_currentEvent'. + _cleanUpKilledBatch(lk); + return; + } + try { + _processBatchResults(lk, cbData.response, remoteIndex); + } catch (DBException const& e) { + _remotes[remoteIndex].status = e.toStatus(); + } + _signalCurrentEventIfReady(lk); // Wake up anyone waiting on '_currentEvent'. +} - _lifecycleState = kKillComplete; +void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) { + invariant(_lifecycleState == kKillStarted); + + // If we're killed and we're not waiting on any more batches to come back, then we are ready + // to kill the cursors on the remote hosts and clean up this cursor. Schedule the killCursors + // command and signal that this cursor is now safe to destroy. We must not touch this object + // again after dropping the lock, because 'this' could become invalid immediately. + if (!_haveOutstandingBatchRequests(lk)) { + // If the event handle is invalid, then the executor is in the middle of shutting down, + // and we can't schedule any more work for it to complete. + if (_killCursorsScheduledEvent.isValid()) { + _scheduleKillCursors(lk, _opCtx); + _executor->signalEvent(_killCursorsScheduledEvent); } - return; + _lifecycleState = kKillComplete; + } +} + +void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remoteIndex) { + auto& remote = _remotes[remoteIndex]; + remote.status = std::move(status); + // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We + // remove the unreachable host entirely from consideration by marking it as exhausted. + if (_params->isAllowPartialResults) { + remote.status = Status::OK(); + + // Clear the results buffer and cursor id. + std::queue<ClusterQueryResult> emptyBuffer; + std::swap(remote.docBuffer, emptyBuffer); + remote.cursorId = 0; } +} - // Early return from this point on signal anyone waiting on an event, if ready() is true. - ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this); - StatusWith<CursorResponse> cursorResponseStatus( - cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote) - : cbData.response.status); +void AsyncResultsMerger::_processBatchResults(WithLock lk, + CbResponse const& response, + size_t remoteIndex) { + auto& remote = _remotes[remoteIndex]; + if (!response.isOK()) { + _cleanUpFailedBatch(lk, response.status, remoteIndex); + return; + } + auto cursorResponseStatus = _parseCursorResponse(response.data, remote); if (!cursorResponseStatus.isOK()) { - if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit && - _params->tailableMode != TailableMode::kNormal) { - // We timed out before hearing back from the shard, - } - remote.status = cursorResponseStatus.getStatus(); - // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We - // remove the unreachable host entirely from consideration by marking it as exhausted. - if (_params->isAllowPartialResults) { - remote.status = Status::OK(); - - // Clear the results buffer and cursor id. - std::queue<ClusterQueryResult> emptyBuffer; - std::swap(remote.docBuffer, emptyBuffer); - remote.cursorId = 0; - } - + _cleanUpFailedBatch(lk, cursorResponseStatus.getStatus(), remoteIndex); return; } - // Response successfully received. - - auto cursorResponse = std::move(cursorResponseStatus.getValue()); + CursorResponse cursorResponse = std::move(cursorResponseStatus.getValue()); // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard. remote.cursorId = cursorResponse.getCursorId(); // Save the batch in the remote's buffer. - if (!addBatchToBuffer(remoteIndex, cursorResponse.getBatch())) { + if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse.getBatch())) { return; } @@ -458,21 +465,16 @@ void AsyncResultsMerger::handleBatchResponse( // // We do not ask for the next batch if the cursor is tailable, as batches received from remote // tailable cursors should be passed through to the client without asking for more batches. - if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() && - !remote.exhausted()) { - remote.status = askForNextBatch_inlock(remoteIndex); - if (!remote.status.isOK()) { - return; - } + if (_params->tailableMode != TailableMode::kNormal || remote.hasNext() || remote.exhausted()) { + return; } - // ScopeGuard requires dismiss on success, but we want waiter to be signalled on success as - // well as failure. - signaller.Dismiss(); - signalCurrentEventIfReady_inlock(); + remote.status = _askForNextBatch(lk, remoteIndex); } -bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch) { +bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, + size_t remoteIndex, + std::vector<BSONObj> const& batch) { auto& remote = _remotes[remoteIndex]; for (const auto& obj : batch) { // If there's a sort, we're expecting the remote node to have given us back a sort key. @@ -499,8 +501,8 @@ bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector< return true; } -void AsyncResultsMerger::signalCurrentEventIfReady_inlock() { - if (ready_inlock() && _currentEvent.isValid()) { +void AsyncResultsMerger::_signalCurrentEventIfReady(WithLock lk) { + if (_ready(lk) && _currentEvent.isValid()) { // To prevent ourselves from signalling the event twice, we set '_currentEvent' as // invalid after signalling it. _executor->signalEvent(_currentEvent); @@ -508,7 +510,7 @@ void AsyncResultsMerger::signalCurrentEventIfReady_inlock() { } } -bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() { +bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) { for (const auto& remote : _remotes) { if (remote.cbHandle.isValid()) { return true; @@ -518,7 +520,7 @@ bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() { return false; } -void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) { +void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) { invariant(_lifecycleState == kKillStarted); invariant(_killCursorsScheduledEvent.isValid()); @@ -531,20 +533,13 @@ void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) { executor::RemoteCommandRequest request( remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx); - _executor - ->scheduleRemoteCommand(request, - stdx::bind(&AsyncResultsMerger::handleKillCursorsResponse, - stdx::placeholders::_1)) - .status_with_transitional_ignore(); + // Send kill request; discard callback handle, if any, or failure report, if not. + Status s = _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus(); + std::move(s).ignore(); } } } -void AsyncResultsMerger::handleKillCursorsResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - // We just ignore any killCursors command responses. -} - executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_killCursorsScheduledEvent.isValid()) { @@ -559,7 +554,7 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o auto statusWithEvent = _executor->makeEvent(); if (ErrorCodes::isShutdownError(statusWithEvent.getStatus().code())) { // The underlying task executor is shutting down. - if (!haveOutstandingBatchRequests_inlock()) { + if (!_haveOutstandingBatchRequests(lk)) { _lifecycleState = kKillComplete; } return executor::TaskExecutor::EventHandle(); @@ -570,8 +565,8 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o // If we're not waiting for responses from remotes, we can schedule killCursors commands on the // remotes now. Otherwise, we have to wait until all responses are back, and then we can kill // the remote cursors. - if (!haveOutstandingBatchRequests_inlock()) { - scheduleKillCursors_inlock(opCtx); + if (!_haveOutstandingBatchRequests(lk)) { + _scheduleKillCursors(lk, opCtx); _lifecycleState = kKillComplete; _executor->signalEvent(_killCursorsScheduledEvent); } else { |