diff options
author | David Storch <david.storch@10gen.com> | 2015-07-31 11:54:13 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-08-06 09:18:40 -0400 |
commit | 1d2a75a01acd26de9969b1e980e301401120ed10 (patch) | |
tree | fe89f19dd1b17fac1104db36109976e2d6cc41d0 /src/mongo/s/query/async_results_merger.cpp | |
parent | 3ac6afd27ae0beda8da6f999bf6867e474af3ce2 (diff) | |
download | mongo-1d2a75a01acd26de9969b1e980e301401120ed10.tar.gz |
SERVER-18767 make find command shard version aware
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 91 |
1 files changed, 61 insertions, 30 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index d063f598406..f13c4c65126 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -43,10 +43,9 @@ namespace mongo { AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor, - const ClusterClientCursorParams& params, - const std::vector<HostAndPort>& remotes) + const ClusterClientCursorParams& params) : _executor(executor), _params(params), _mergeQueue(MergingComparator(_remotes, _params.sort)) { - for (const auto& remote : remotes) { + for (const auto& remote : params.remotes) { _remotes.emplace_back(remote); } } @@ -74,12 +73,19 @@ bool AsyncResultsMerger::ready_inlock() { return true; } - // First check whether any of the remotes reported an error. for (const auto& remote : _remotes) { + // First check whether any of the remotes reported an error. if (!remote.status.isOK()) { _status = remote.status; return true; } + + // We don't return any results until we have received at least one response from each remote + // node. This is necessary for versioned commands: we have to ensure that we've properly + // established the shard version on each node before we can start returning results. + if (!remote.gotFirstResponse) { + return false; + } } const bool hasSort = !_params.sort.isEmpty(); @@ -171,6 +177,31 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() { return boost::none; } +Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { + auto& remote = _remotes[remoteIndex]; + + invariant(!remote.cbHandle.isValid()); + + BSONObj cmdObj = remote.cursorId + ? GetMoreRequest(_params.nsString, *remote.cursorId, _params.batchSize, boost::none) + .toBSON() + : remote.cmdObj; + + executor::RemoteCommandRequest request( + remote.hostAndPort, _params.nsString.db().toString(), cmdObj); + + auto callbackStatus = _executor->scheduleRemoteCommand( + request, + stdx::bind( + &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex)); + if (!callbackStatus.isOK()) { + return callbackStatus.getStatus(); + } + + remote.cbHandle = callbackStatus.getValue(); + return Status::OK(); +} + StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -196,25 +227,12 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { // If we already have established a cursor with this remote, and there is no outstanding - // request for which we have a valid callback handle, send a getMore with the - // appropriate cursorId. Otherwise, send the cursor-establishing command. - BSONObj cmdObj = remote.cursorId - ? GetMoreRequest(_params.nsString, *remote.cursorId, _params.batchSize, boost::none) - .toBSON() - : _params.cmdObj; - - executor::RemoteCommandRequest request( - remote.hostAndPort, _params.nsString.db().toString(), cmdObj); - - auto callbackStatus = _executor->scheduleRemoteCommand( - request, - stdx::bind( - &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, i)); - if (!callbackStatus.isOK()) { - return callbackStatus.getStatus(); + // request for which we have a valid callback handle, then schedule work to retrieve the + // next batch. + auto nextBatchStatus = askForNextBatch_inlock(i); + if (!nextBatchStatus.isOK()) { + return nextBatchStatus; } - - remote.cbHandle = callbackStatus.getValue(); } } @@ -265,13 +283,13 @@ void AsyncResultsMerger::handleBatchResponse( ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEvent_inlock, this); if (!cbData.response.isOK()) { - _remotes[remoteIndex].status = cbData.response.getStatus(); + remote.status = cbData.response.getStatus(); return; } auto getMoreParseStatus = GetMoreResponse::parseFromBSON(cbData.response.getValue().data); if (!getMoreParseStatus.isOK()) { - _remotes[remoteIndex].status = getMoreParseStatus.getStatus(); + remote.status = getMoreParseStatus.getStatus(); return; } @@ -281,13 +299,15 @@ void AsyncResultsMerger::handleBatchResponse( // established cursorid, we will fail the operation. if (remote.cursorId && getMoreResponse.cursorId != 0 && *remote.cursorId != getMoreResponse.cursorId) { - _remotes[remoteIndex].status = - Status(ErrorCodes::BadValue, - str::stream() << "Expected cursorid " << *remote.cursorId << " but received " - << getMoreResponse.cursorId); + remote.status = Status(ErrorCodes::BadValue, + str::stream() << "Expected cursorid " << *remote.cursorId + << " but received " << getMoreResponse.cursorId); return; } + // Mark that we've gotten a valid response back from 'remote' at least once. + remote.gotFirstResponse = true; + remote.cursorId = getMoreResponse.cursorId; for (const auto& obj : getMoreResponse.batch) { @@ -300,6 +320,16 @@ void AsyncResultsMerger::handleBatchResponse( _mergeQueue.push(remoteIndex); } + // If even after receiving this batch we still don't have anything buffered (i.e. the batchSize + // was zero), then can schedule work to retrieve the next batch right away. + if (!remote.hasNext() && !remote.exhausted()) { + auto nextBatchStatus = askForNextBatch_inlock(remoteIndex); + if (!nextBatchStatus.isOK()) { + remote.status = nextBatchStatus; + return; + } + } + // ScopeGuard requires dismiss on success, but we want waiter to be signalled on success as // well as failure. signaller.Dismiss(); @@ -395,8 +425,9 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() { // AsyncResultsMerger::RemoteCursorData // -AsyncResultsMerger::RemoteCursorData::RemoteCursorData(const HostAndPort& host) - : hostAndPort(host) {} +AsyncResultsMerger::RemoteCursorData::RemoteCursorData( + const ClusterClientCursorParams::Remote& params) + : hostAndPort(params.hostAndPort), cmdObj(params.cmdObj) {} bool AsyncResultsMerger::RemoteCursorData::hasNext() const { return !docBuffer.empty(); |