author     Nathan Myers <nathan.myers@10gen.com>  2017-09-25 15:53:59 -0400
committer  Nathan Myers <nathan.myers@10gen.com>  2017-09-25 15:53:59 -0400
commit     d7aca6435e8ccc89005a97dc585dfbe429a17dec (patch)
tree       8f42ebe1db5e80b71d5e97730747971acea7fd02 /src/mongo/s/query/async_results_merger.cpp
parent     829c1d6afe8177433192e6af84bf4536c330caee (diff)
SERVER-30838 Remove _inlock names in sharding subsystem
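
The '_inlock' suffix convention only documented, by naming, that a method's caller must already hold '_mutex'; this commit replaces the suffix with a WithLock parameter so the contract is checked at compile time. Below is a minimal compilable sketch of the idiom, using a simplified stand-in for mongo's WithLock (the real one lives in mongo/util/concurrency/with_lock.h) and a hypothetical Counter class, not code from this diff:

    #include <mutex>

    // Simplified stand-in for mongo's WithLock: a token that can only be
    // obtained from a held lock, so passing one proves the mutex is locked.
    class WithLock {
    public:
        template <typename Mutex>
        WithLock(std::lock_guard<Mutex> const&) {}  // implicit, from a held guard
    };

    // Hypothetical example class; '_get(WithLock)' replaces 'get_inlock()'.
    class Counter {
    public:
        int get() {
            std::lock_guard<std::mutex> lk(_mutex);
            return _get(lk);  // 'lk' converts to WithLock: proof the mutex is held
        }

    private:
        int _get(WithLock) {  // cannot be called without a lock token
            return _value;
        }

        std::mutex _mutex;
        int _value = 0;
    };

The real WithLock also provides WithLock::withoutLock() as an explicit escape hatch for constructors and similar contexts where no other thread can yet reach the object; that is what the constructor hunk below uses.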
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp | 207
1 file changed, 101 insertions(+), 106 deletions(-)
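
The commit also replaces stdx::bind callbacks with lambdas when scheduling remote commands (see the scheduleRemoteCommand hunks below): the lambda takes '_mutex' once at the entry point and hands the lock down to the private '_handleBatchResponse'. A self-contained sketch of that bind-to-lambda migration, with hypothetical Scheduler and Merger types that are not from this diff:

    #include <cstddef>
    #include <functional>

    // Hypothetical executor: invokes the callback with a result code.
    struct Scheduler {
        void schedule(std::function<void(int)> cb) { cb(42); }  // runs inline for demo
    };

    struct Merger {
        void run(Scheduler& s, std::size_t remoteIndex) {
            // Before: s.schedule(stdx::bind(&Merger::_onDone, this,
            //                               stdx::placeholders::_1, remoteIndex));
            // After: the lambda captures exactly what it needs and reads in place.
            s.schedule([this, remoteIndex](int result) { _onDone(result, remoteIndex); });
        }

        void _onDone(int result, std::size_t remoteIndex) { /* handle result */ }
    };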
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index b867c7b43df..9d05cacfa57 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -42,7 +42,6 @@
#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -65,9 +64,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
remote.cursorResponse.getNSS(),
remote.cursorResponse.getCursorId());
- // We don't check the return value of addBatchToBuffer here; if there was an error,
+ // We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- addBatchToBuffer(remoteIndex, remote.cursorResponse.getBatch());
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse.getBatch());
++remoteIndex;
}
@@ -81,15 +80,15 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
AsyncResultsMerger::~AsyncResultsMerger() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(remotesExhausted_inlock() || _lifecycleState == kKillComplete);
+ invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete);
}
bool AsyncResultsMerger::remotesExhausted() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return remotesExhausted_inlock();
+ return _remotesExhausted(lk);
}
-bool AsyncResultsMerger::remotesExhausted_inlock() {
+bool AsyncResultsMerger::_remotesExhausted(WithLock) {
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
return false;
@@ -113,7 +112,7 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return ready_inlock();
+ return _ready(lk);
}
void AsyncResultsMerger::detachFromOperationContext() {
@@ -130,7 +129,7 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-bool AsyncResultsMerger::ready_inlock() {
+bool AsyncResultsMerger::_ready(WithLock lk) {
if (_lifecycleState != kAlive) {
return true;
}
@@ -150,10 +149,10 @@ bool AsyncResultsMerger::ready_inlock() {
}
const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? readySorted_inlock() : readyUnsorted_inlock();
+ return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
}
-bool AsyncResultsMerger::readySorted_inlock() {
+bool AsyncResultsMerger::_readySorted(WithLock) {
// Tailable cursors cannot have a sort.
invariant(_params->tailableMode == TailableMode::kNormal);
@@ -166,7 +165,7 @@ bool AsyncResultsMerger::readySorted_inlock() {
return true;
}
-bool AsyncResultsMerger::readyUnsorted_inlock() {
+bool AsyncResultsMerger::_readyUnsorted(WithLock) {
bool allExhausted = true;
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
@@ -183,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() {
StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- dassert(ready_inlock());
+ dassert(_ready(lk));
if (_lifecycleState != kAlive) {
return Status(ErrorCodes::IllegalOperation, "AsyncResultsMerger killed");
}
@@ -198,10 +197,10 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
}
const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? nextReadySorted() : nextReadyUnsorted();
+ return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
-ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
+ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable cursors cannot have a sort.
invariant(_params->tailableMode == TailableMode::kNormal);
@@ -227,7 +226,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
return front;
}
-ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
+ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
size_t remotesAttempted = 0;
while (remotesAttempted < _remotes.size()) {
// It is illegal to call this method if there is an error received from any shard.
@@ -258,7 +257,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
return {};
}
-Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
+Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -283,10 +282,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
executor::RemoteCommandRequest request(
remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, _opCtx);
- auto callbackStatus = _executor->scheduleRemoteCommand(
- request,
- stdx::bind(
- &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex));
+ auto callbackStatus =
+ _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
+ stdx::lock_guard<stdx::mutex> lk(this->_mutex);
+ this->_handleBatchResponse(lk, cbData, remoteIndex);
+ });
+
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -330,7 +331,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
// If this remote is not exhausted and there is no outstanding request for it, schedule
// work to retrieve the next batch.
- auto nextBatchStatus = askForNextBatch_inlock(i);
+ auto nextBatchStatus = _askForNextBatch(lk, i);
if (!nextBatchStatus.isOK()) {
return nextBatchStatus;
}
@@ -348,12 +349,13 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
// _currentEvent with a new event, new results became available. In this case we have to signal
// the new event right away to propagate the fact that the previous event had been signaled to
// the new event.
- signalCurrentEventIfReady_inlock();
+ _signalCurrentEventIfReady(lk);
return eventToReturn;
}
-StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj& responseObj,
- const RemoteCursorData& remote) {
+StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
+ const BSONObj& responseObj, const RemoteCursorData& remote) {
+
auto getMoreParseStatus = CursorResponse::parseFromBSON(responseObj);
if (!getMoreParseStatus.isOK()) {
return getMoreParseStatus.getStatus();
@@ -372,76 +374,81 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj
return std::move(cursorResponse);
}
-void AsyncResultsMerger::handleBatchResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- auto& remote = _remotes[remoteIndex];
+void AsyncResultsMerger::_handleBatchResponse(WithLock lk,
+ CbData const& cbData,
+ size_t remoteIndex) {
+ // Got a response from remote, so indicate we are no longer waiting for one.
+ _remotes[remoteIndex].cbHandle = executor::TaskExecutor::CallbackHandle();
- // Clear the callback handle. This indicates that we are no longer waiting on a response from
- // 'remote'.
- remote.cbHandle = executor::TaskExecutor::CallbackHandle();
-
- // If we're in the process of shutting down then there's no need to process the batch.
+ // On shutdown, there is no need to process the response.
if (_lifecycleState != kAlive) {
- invariant(_lifecycleState == kKillStarted);
-
- // Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down.
- signalCurrentEventIfReady_inlock();
-
- // If we're killed and we're not waiting on any more batches to come back, then we are ready
- // to kill the cursors on the remote hosts and clean up this cursor. Schedule the
- // killCursors command and signal that this cursor is safe now safe to destroy. We have to
- // promise not to touch any members of this class because 'this' could become invalid as
- // soon as we signal the event.
- if (!haveOutstandingBatchRequests_inlock()) {
- // If the event handle is invalid, then the executor is in the middle of shutting down,
- // and we can't schedule any more work for it to complete.
- if (_killCursorsScheduledEvent.isValid()) {
- scheduleKillCursors_inlock(_opCtx);
- _executor->signalEvent(_killCursorsScheduledEvent);
- }
+ _signalCurrentEventIfReady(lk); // First, wake up anyone waiting on '_currentEvent'.
+ _cleanUpKilledBatch(lk);
+ return;
+ }
+ try {
+ _processBatchResults(lk, cbData.response, remoteIndex);
+ } catch (DBException const& e) {
+ _remotes[remoteIndex].status = e.toStatus();
+ }
+ _signalCurrentEventIfReady(lk); // Wake up anyone waiting on '_currentEvent'.
+}
- _lifecycleState = kKillComplete;
+void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) {
+ invariant(_lifecycleState == kKillStarted);
+
+ // If we're killed and we're not waiting on any more batches to come back, then we are ready
+ // to kill the cursors on the remote hosts and clean up this cursor. Schedule the killCursors
+ // command and signal that this cursor is now safe to destroy. We must not touch this object
+ // again after dropping the lock, because 'this' could become invalid immediately.
+ if (!_haveOutstandingBatchRequests(lk)) {
+ // If the event handle is invalid, then the executor is in the middle of shutting down,
+ // and we can't schedule any more work for it to complete.
+ if (_killCursorsScheduledEvent.isValid()) {
+ _scheduleKillCursors(lk, _opCtx);
+ _executor->signalEvent(_killCursorsScheduledEvent);
}
- return;
+ _lifecycleState = kKillComplete;
+ }
+}
+
+void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remoteIndex) {
+ auto& remote = _remotes[remoteIndex];
+ remote.status = std::move(status);
+ // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
+ // remove the unreachable host entirely from consideration by marking it as exhausted.
+ if (_params->isAllowPartialResults) {
+ remote.status = Status::OK();
+
+ // Clear the results buffer and cursor id.
+ std::queue<ClusterQueryResult> emptyBuffer;
+ std::swap(remote.docBuffer, emptyBuffer);
+ remote.cursorId = 0;
}
+}
-    // Early return from this point on will signal anyone waiting on an event, if ready() is true.
- ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
- StatusWith<CursorResponse> cursorResponseStatus(
- cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
- : cbData.response.status);
+void AsyncResultsMerger::_processBatchResults(WithLock lk,
+ CbResponse const& response,
+ size_t remoteIndex) {
+ auto& remote = _remotes[remoteIndex];
+ if (!response.isOK()) {
+ _cleanUpFailedBatch(lk, response.status, remoteIndex);
+ return;
+ }
+ auto cursorResponseStatus = _parseCursorResponse(response.data, remote);
if (!cursorResponseStatus.isOK()) {
- if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit &&
- _params->tailableMode != TailableMode::kNormal) {
-        // We timed out before hearing back from the shard.
- }
- remote.status = cursorResponseStatus.getStatus();
- // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
- // remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params->isAllowPartialResults) {
- remote.status = Status::OK();
-
- // Clear the results buffer and cursor id.
- std::queue<ClusterQueryResult> emptyBuffer;
- std::swap(remote.docBuffer, emptyBuffer);
- remote.cursorId = 0;
- }
-
+ _cleanUpFailedBatch(lk, cursorResponseStatus.getStatus(), remoteIndex);
return;
}
- // Response successfully received.
-
- auto cursorResponse = std::move(cursorResponseStatus.getValue());
+ CursorResponse cursorResponse = std::move(cursorResponseStatus.getValue());
// Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
remote.cursorId = cursorResponse.getCursorId();
// Save the batch in the remote's buffer.
- if (!addBatchToBuffer(remoteIndex, cursorResponse.getBatch())) {
+ if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse.getBatch())) {
return;
}
@@ -458,21 +465,16 @@ void AsyncResultsMerger::handleBatchResponse(
//
// We do not ask for the next batch if the cursor is tailable, as batches received from remote
// tailable cursors should be passed through to the client without asking for more batches.
- if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() &&
- !remote.exhausted()) {
- remote.status = askForNextBatch_inlock(remoteIndex);
- if (!remote.status.isOK()) {
- return;
- }
+ if (_params->tailableMode != TailableMode::kNormal || remote.hasNext() || remote.exhausted()) {
+ return;
}
-    // ScopeGuard requires dismiss on success, but we want the waiter to be signalled on success as
- // well as failure.
- signaller.Dismiss();
- signalCurrentEventIfReady_inlock();
+ remote.status = _askForNextBatch(lk, remoteIndex);
}
-bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch) {
+bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
+ size_t remoteIndex,
+ std::vector<BSONObj> const& batch) {
auto& remote = _remotes[remoteIndex];
for (const auto& obj : batch) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
@@ -499,8 +501,8 @@ bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<
return true;
}
-void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
- if (ready_inlock() && _currentEvent.isValid()) {
+void AsyncResultsMerger::_signalCurrentEventIfReady(WithLock lk) {
+ if (_ready(lk) && _currentEvent.isValid()) {
// To prevent ourselves from signalling the event twice, we set '_currentEvent' as
// invalid after signalling it.
_executor->signalEvent(_currentEvent);
@@ -508,7 +510,7 @@ void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
}
}
-bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
+bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) {
for (const auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
return true;
@@ -518,7 +520,7 @@ bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
return false;
}
-void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
+void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) {
invariant(_lifecycleState == kKillStarted);
invariant(_killCursorsScheduledEvent.isValid());
@@ -531,20 +533,13 @@ void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
executor::RemoteCommandRequest request(
remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
- _executor
- ->scheduleRemoteCommand(request,
- stdx::bind(&AsyncResultsMerger::handleKillCursorsResponse,
- stdx::placeholders::_1))
- .status_with_transitional_ignore();
+ // Send kill request; discard callback handle, if any, or failure report, if not.
+ Status s = _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus();
+ std::move(s).ignore();
}
}
}
-void AsyncResultsMerger::handleKillCursorsResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- // We just ignore any killCursors command responses.
-}
-
executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_killCursorsScheduledEvent.isValid()) {
@@ -559,7 +554,7 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
auto statusWithEvent = _executor->makeEvent();
if (ErrorCodes::isShutdownError(statusWithEvent.getStatus().code())) {
// The underlying task executor is shutting down.
- if (!haveOutstandingBatchRequests_inlock()) {
+ if (!_haveOutstandingBatchRequests(lk)) {
_lifecycleState = kKillComplete;
}
return executor::TaskExecutor::EventHandle();
@@ -570,8 +565,8 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
// If we're not waiting for responses from remotes, we can schedule killCursors commands on the
// remotes now. Otherwise, we have to wait until all responses are back, and then we can kill
// the remote cursors.
- if (!haveOutstandingBatchRequests_inlock()) {
- scheduleKillCursors_inlock(opCtx);
+ if (!_haveOutstandingBatchRequests(lk)) {
+ _scheduleKillCursors(lk, opCtx);
_lifecycleState = kKillComplete;
_executor->signalEvent(_killCursorsScheduledEvent);
} else {