author David Storch <david.storch@10gen.com> 2015-07-31 11:54:13 -0400
committer David Storch <david.storch@10gen.com> 2015-08-06 09:18:40 -0400
commit 1d2a75a01acd26de9969b1e980e301401120ed10 (patch)
tree fe89f19dd1b17fac1104db36109976e2d6cc41d0 /src/mongo/s/query/async_results_merger.cpp
parent 3ac6afd27ae0beda8da6f999bf6867e474af3ce2 (diff)
download mongo-1d2a75a01acd26de9969b1e980e301401120ed10.tar.gz
SERVER-18767 make find command shard version aware
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r-- src/mongo/s/query/async_results_merger.cpp | 91
1 file changed, 61 insertions(+), 30 deletions(-)
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index d063f598406..f13c4c65126 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -43,10 +43,9 @@
namespace mongo {
AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
- const ClusterClientCursorParams& params,
- const std::vector<HostAndPort>& remotes)
+ const ClusterClientCursorParams& params)
: _executor(executor), _params(params), _mergeQueue(MergingComparator(_remotes, _params.sort)) {
- for (const auto& remote : remotes) {
+ for (const auto& remote : params.remotes) {
_remotes.emplace_back(remote);
}
}
@@ -74,12 +73,19 @@ bool AsyncResultsMerger::ready_inlock() {
return true;
}
- // First check whether any of the remotes reported an error.
for (const auto& remote : _remotes) {
+ // First check whether any of the remotes reported an error.
if (!remote.status.isOK()) {
_status = remote.status;
return true;
}
+
+ // We don't return any results until we have received at least one response from each remote
+ // node. This is necessary for versioned commands: we have to ensure that we've properly
+ // established the shard version on each node before we can start returning results.
+ if (!remote.gotFirstResponse) {
+ return false;
+ }
}
const bool hasSort = !_params.sort.isEmpty();
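
Note on the hunk above: the merger now surfaces a remote error immediately, but otherwise refuses to report readiness until every remote has replied at least once, because that first reply is what confirms the shard version was established on the node. A minimal standalone sketch of the same gating rule, using hypothetical simplified types (RemoteState stands in for RemoteCursorData, and a bool replaces Status):

#include <vector>

// Hypothetical, simplified stand-in for RemoteCursorData.
struct RemoteState {
    bool ok = true;                // false once the remote reports an error
    bool gotFirstResponse = false;
};

// Mirrors the loop in ready_inlock(): an error from any remote makes the
// merger "ready" (so the caller can observe the failure), but otherwise
// every remote must have answered once before any results are returned.
bool readyToReturn(const std::vector<RemoteState>& remotes) {
    for (const auto& r : remotes) {
        if (!r.ok) {
            return true;   // surface the error immediately
        }
        if (!r.gotFirstResponse) {
            return false;  // still waiting on this shard's first batch
        }
    }
    return true;
}
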
@@ -171,6 +177,31 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() {
return boost::none;
}
+Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
+ auto& remote = _remotes[remoteIndex];
+
+ invariant(!remote.cbHandle.isValid());
+
+ BSONObj cmdObj = remote.cursorId
+ ? GetMoreRequest(_params.nsString, *remote.cursorId, _params.batchSize, boost::none)
+ .toBSON()
+ : remote.cmdObj;
+
+ executor::RemoteCommandRequest request(
+ remote.hostAndPort, _params.nsString.db().toString(), cmdObj);
+
+ auto callbackStatus = _executor->scheduleRemoteCommand(
+ request,
+ stdx::bind(
+ &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex));
+ if (!callbackStatus.isOK()) {
+ return callbackStatus.getStatus();
+ }
+
+ remote.cbHandle = callbackStatus.getValue();
+ return Status::OK();
+}
+
StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
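
The new askForNextBatch_inlock() helper folds the choice between the cursor-establishing command and a follow-up getMore into one place. A sketch of just that selection rule, under assumed simplified types (CursorId and Command are placeholders for the real CursorId and BSONObj, and buildGetMore stands in for GetMoreRequest::toBSON()):

#include <cstdint>
#include <optional>
#include <string>

using CursorId = std::int64_t;
using Command = std::string;  // placeholder for BSONObj

// Placeholder for GetMoreRequest(nss, cursorId, batchSize, ...).toBSON().
Command buildGetMore(CursorId id) {
    return "getMore:" + std::to_string(id);
}

// No cursor yet: send this remote's own cursor-establishing command
// (stored per remote, so it can carry that shard's version). Otherwise
// ask for the next batch on the already-established cursor.
Command nextCommand(const std::optional<CursorId>& cursorId, const Command& initialCmd) {
    return cursorId ? buildGetMore(*cursorId) : initialCmd;
}
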
@@ -196,25 +227,12 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
// If we already have established a cursor with this remote, and there is no outstanding
- // request for which we have a valid callback handle, send a getMore with the
- // appropriate cursorId. Otherwise, send the cursor-establishing command.
- BSONObj cmdObj = remote.cursorId
- ? GetMoreRequest(_params.nsString, *remote.cursorId, _params.batchSize, boost::none)
- .toBSON()
- : _params.cmdObj;
-
- executor::RemoteCommandRequest request(
- remote.hostAndPort, _params.nsString.db().toString(), cmdObj);
-
- auto callbackStatus = _executor->scheduleRemoteCommand(
- request,
- stdx::bind(
- &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, i));
- if (!callbackStatus.isOK()) {
- return callbackStatus.getStatus();
+ // request for which we have a valid callback handle, then schedule work to retrieve the
+ // next batch.
+ auto nextBatchStatus = askForNextBatch_inlock(i);
+ if (!nextBatchStatus.isOK()) {
+ return nextBatchStatus;
}
-
- remote.cbHandle = callbackStatus.getValue();
}
}
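
With the helper factored out, the nextEvent() loop above reduces to scheduling policy: at most one outstanding request per remote, scheduled only when that remote has nothing buffered and is not exhausted. A sketch of that policy with assumed simplified types (Remote and askForNextBatch are hypothetical stand-ins, with a bool in place of Status):

#include <cstddef>
#include <vector>

// Hypothetical stand-in for the per-remote state consulted by nextEvent().
struct Remote {
    bool hasNext = false;         // buffered results available
    bool exhausted = false;       // cursor closed on the remote
    bool requestPending = false;  // stands in for cbHandle.isValid()
};

// Assumed stand-in for askForNextBatch_inlock(): pretend scheduling
// always succeeds and record the outstanding request.
bool askForNextBatch(Remote& r) {
    r.requestPending = true;
    return true;
}

// Schedule work for every remote that needs it; stop on the first
// scheduling failure, as nextEvent() does.
bool scheduleAll(std::vector<Remote>& remotes) {
    for (std::size_t i = 0; i < remotes.size(); ++i) {
        Remote& r = remotes[i];
        if (!r.hasNext && !r.exhausted && !r.requestPending) {
            if (!askForNextBatch(r)) {
                return false;
            }
        }
    }
    return true;
}
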
@@ -265,13 +283,13 @@ void AsyncResultsMerger::handleBatchResponse(
ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEvent_inlock, this);
if (!cbData.response.isOK()) {
- _remotes[remoteIndex].status = cbData.response.getStatus();
+ remote.status = cbData.response.getStatus();
return;
}
auto getMoreParseStatus = GetMoreResponse::parseFromBSON(cbData.response.getValue().data);
if (!getMoreParseStatus.isOK()) {
- _remotes[remoteIndex].status = getMoreParseStatus.getStatus();
+ remote.status = getMoreParseStatus.getStatus();
return;
}
@@ -281,13 +299,15 @@ void AsyncResultsMerger::handleBatchResponse(
// established cursorid, we will fail the operation.
if (remote.cursorId && getMoreResponse.cursorId != 0 &&
*remote.cursorId != getMoreResponse.cursorId) {
- _remotes[remoteIndex].status =
- Status(ErrorCodes::BadValue,
- str::stream() << "Expected cursorid " << *remote.cursorId << " but received "
- << getMoreResponse.cursorId);
+ remote.status = Status(ErrorCodes::BadValue,
+ str::stream() << "Expected cursorid " << *remote.cursorId
+ << " but received " << getMoreResponse.cursorId);
return;
}
+ // Mark that we've gotten a valid response back from 'remote' at least once.
+ remote.gotFirstResponse = true;
+
remote.cursorId = getMoreResponse.cursorId;
for (const auto& obj : getMoreResponse.batch) {
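
The cursor-id check above guards against a remote handing back a different cursor than the one originally established. A standalone sketch of the validation, again with hypothetical simplified types (a bool replaces Status, and no BSON parsing is involved):

#include <cstdint>
#include <optional>

using CursorId = std::int64_t;

// Returns false on a mismatch: once a cursor id is known, every
// subsequent non-zero response id must agree with it. A response id of
// 0 means the remote cursor is exhausted and is always accepted.
bool validateCursorId(std::optional<CursorId>& established, CursorId responseId) {
    if (established && responseId != 0 && *established != responseId) {
        return false;  // remote changed cursor ids mid-stream
    }
    established = responseId;  // 0 records exhaustion, as in the real code
    return true;
}
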
@@ -300,6 +320,16 @@ void AsyncResultsMerger::handleBatchResponse(
_mergeQueue.push(remoteIndex);
}
+ // If even after receiving this batch we still don't have anything buffered (i.e. the batchSize
+ // was zero), then we can schedule work to retrieve the next batch right away.
+ if (!remote.hasNext() && !remote.exhausted()) {
+ auto nextBatchStatus = askForNextBatch_inlock(remoteIndex);
+ if (!nextBatchStatus.isOK()) {
+ remote.status = nextBatchStatus;
+ return;
+ }
+ }
+
// ScopeGuard requires dismiss on success, but we want the waiter to be signalled on success as
// well as failure.
signaller.Dismiss();
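
The empty-batch reschedule above is what keeps a batchSize of zero from stalling the merger: the first reply establishes the cursor but may carry no documents, so another getMore must be issued before ready_inlock() can ever report data. A sketch of that tail-end decision, with assumed stand-ins for the real members (bufferEmpty mirrors !remote.hasNext(), and scheduleNextBatch stands in for askForNextBatch_inlock):

// Assumed stand-ins for the state consulted at the end of the handler.
struct BatchState {
    bool bufferEmpty;  // !remote.hasNext()
    bool exhausted;    // remote.exhausted()
};

// Assumed: in the real code this is askForNextBatch_inlock(remoteIndex).
bool scheduleNextBatch() {
    return true;
}

// After buffering a response: if nothing was buffered (e.g. the
// batchSize was zero) and the cursor is still open, immediately ask for
// more rather than waiting for the next nextEvent() call.
bool maybeReschedule(const BatchState& s) {
    if (s.bufferEmpty && !s.exhausted) {
        return scheduleNextBatch();
    }
    return true;
}
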
@@ -395,8 +425,9 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() {
// AsyncResultsMerger::RemoteCursorData
//
-AsyncResultsMerger::RemoteCursorData::RemoteCursorData(const HostAndPort& host)
- : hostAndPort(host) {}
+AsyncResultsMerger::RemoteCursorData::RemoteCursorData(
+ const ClusterClientCursorParams::Remote& params)
+ : hostAndPort(params.hostAndPort), cmdObj(params.cmdObj) {}
bool AsyncResultsMerger::RemoteCursorData::hasNext() const {
return !docBuffer.empty();
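
Finally, the RemoteCursorData change is what enables per-shard versioning: each remote now carries its own cursor-establishing command instead of sharing _params.cmdObj. A rough mirror of the struct's shape after this commit, with hypothetical simplified types (strings in place of HostAndPort and BSONObj):

#include <cstdint>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, simplified mirror of RemoteCursorData after this commit.
struct RemoteCursor {
    std::string hostAndPort;
    std::string cmdObj;  // per-remote initial command, so it can carry
                         // that shard's version (placeholder for BSONObj)
    std::optional<std::int64_t> cursorId;  // unset until the first response
    bool gotFirstResponse = false;

    RemoteCursor(std::string host, std::string cmd)
        : hostAndPort(std::move(host)), cmdObj(std::move(cmd)) {}
};
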