-rw-r--r--   src/mongo/s/query/SConscript                         2
-rw-r--r--   src/mongo/s/query/async_results_merger.cpp         262
-rw-r--r--   src/mongo/s/query/async_results_merger.h           105
-rw-r--r--   src/mongo/s/query/async_results_merger_test.cpp   1048
-rw-r--r--   src/mongo/s/query/cluster_client_cursor.h            5
-rw-r--r--   src/mongo/s/query/cluster_client_cursor_impl.cpp     4
-rw-r--r--   src/mongo/s/query/cluster_client_cursor_impl.h       2
-rw-r--r--   src/mongo/s/query/cluster_client_cursor_mock.cpp     4
-rw-r--r--   src/mongo/s/query/cluster_client_cursor_mock.h       2
-rw-r--r--   src/mongo/s/query/cluster_client_cursor_params.h    74
-rw-r--r--   src/mongo/s/query/cluster_find.cpp                  51
-rw-r--r--   src/mongo/s/query/establish_cursors.cpp             162
-rw-r--r--   src/mongo/s/query/establish_cursors.h                74
-rw-r--r--   src/mongo/s/query/store_possible_cursor.cpp           3
14 files changed, 895 insertions, 903 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 51b60590239..1e721da28e8 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -62,10 +62,12 @@ env.Library(
source=[
"async_results_merger.cpp",
"cluster_client_cursor_params.cpp",
+ "establish_cursors.cpp",
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/command_request_response",
"$BUILD_DIR/mongo/executor/task_executor_interface",
+ "$BUILD_DIR/mongo/s/async_requests_sender",
"$BUILD_DIR/mongo/s/client/sharding_client",
"$BUILD_DIR/mongo/s/coreshard",
],
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 614002978f1..bd3c67bf44a 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -58,21 +58,19 @@ AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
: _executor(executor),
_params(params),
_mergeQueue(MergingComparator(_remotes, _params->sort)) {
+ size_t remoteIndex = 0;
for (const auto& remote : _params->remotes) {
- if (remote.shardId) {
- invariant(remote.cmdObj);
- invariant(!remote.cursorId);
- invariant(!remote.hostAndPort);
- _remotes.emplace_back(*remote.shardId, *remote.cmdObj);
- } else {
- invariant(!remote.cmdObj);
- invariant(remote.cursorId);
- invariant(remote.hostAndPort);
- _remotes.emplace_back(*remote.hostAndPort, *remote.cursorId);
- }
+ _remotes.emplace_back(remote.hostAndPort, remote.cursorResponse.getCursorId());
+
+ // We don't check the return value of addBatchToBuffer here; if there was an error,
+ // it will be stored in the remote and the first call to ready() will return true.
+ addBatchToBuffer(remoteIndex, remote.cursorResponse.getBatch());
+ ++remoteIndex;
}
- // Initialize command metadata to handle the read preference.
+ // Initialize command metadata to handle the read preference. We do this in case the readPref
+ // is primaryOnly, in which case if the remote host for one of the cursors changes roles, the
+ // remote will return an error.
if (_params->readPreference) {
BSONObjBuilder metadataBuilder;
rpc::ServerSelectionMetadata metadata(
@@ -136,13 +134,6 @@ bool AsyncResultsMerger::ready_inlock() {
_status = remote.status;
return true;
}
-
- // We don't return any results until we have received at least one response from each remote
- // node. This is necessary for versioned commands: we have to ensure that we've properly
- // established the shard version on each node before we can start returning results.
- if (!remote.cursorId) {
- return false;
- }
}
const bool hasSort = !_params->sort.isEmpty();
@@ -262,34 +253,19 @@ Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- BSONObj cmdObj;
-
- if (remote.cursorId) {
- auto adjustedBatchSize = _params->batchSize;
-
- if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
- adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
- }
-
- cmdObj = GetMoreRequest(_params->nsString,
- *remote.cursorId,
- adjustedBatchSize,
- _awaitDataTimeout,
- boost::none,
- boost::none)
- .toBSON();
- } else {
- // Do the first time shard host resolution.
- invariant(_params->readPreference);
- Status resolveStatus = remote.resolveShardIdToHostAndPort(*_params->readPreference);
- if (!resolveStatus.isOK()) {
- return resolveStatus;
- }
-
- remote.fetchedCount = 0;
- cmdObj = *remote.initialCmdObj;
+ auto adjustedBatchSize = _params->batchSize;
+ if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
+ adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
}
+ BSONObj cmdObj = GetMoreRequest(_params->nsString,
+ remote.cursorId,
+ adjustedBatchSize,
+ _awaitDataTimeout,
+ boost::none,
+ boost::none)
+ .toBSON();
+
executor::RemoteCommandRequest request(
remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, opCtx);
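
A tiny standalone sketch of the batchSize adjustment in the hunk above, with hypothetical values and std::optional in place of boost::optional: if the caller asked for a batchSize of 10 and this remote has already contributed 7 documents toward the current batch, the next getMore requests only the remaining 3.

#include <cstdint>
#include <iostream>
#include <optional>

int main() {
    std::optional<std::int64_t> batchSize = 10;  // requested batch size, if any
    std::int64_t fetchedCount = 7;               // docs this remote already returned this batch

    auto adjustedBatchSize = batchSize;
    if (batchSize && *batchSize > fetchedCount) {
        adjustedBatchSize = *batchSize - fetchedCount;
    }
    std::cout << "next getMore batchSize: " << *adjustedBatchSize << '\n';  // prints 3
    return 0;
}
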
@@ -342,9 +318,8 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent(
}
if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
- // If we already have established a cursor with this remote, and there is no outstanding
- // request for which we have a valid callback handle, then schedule work to retrieve the
- // next batch.
+ // If this remote is not exhausted and there is no outstanding request for it, schedule
+ // work to retrieve the next batch.
auto nextBatchStatus = askForNextBatch_inlock(opCtx, i);
if (!nextBatchStatus.isOK()) {
return nextBatchStatus;
@@ -377,12 +352,11 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj
auto cursorResponse = std::move(getMoreParseStatus.getValue());
- // If we have a cursor established, and we get a non-zero cursor id that is not equal to the
- // established cursor id, we will fail the operation.
- if (remote.cursorId && cursorResponse.getCursorId() != 0 &&
- *remote.cursorId != cursorResponse.getCursorId()) {
+ // If we get a non-zero cursor id that is not equal to the established cursor id, we will fail
+ // the operation.
+ if (cursorResponse.getCursorId() != 0 && remote.cursorId != cursorResponse.getCursorId()) {
return Status(ErrorCodes::BadValue,
- str::stream() << "Expected cursorid " << *remote.cursorId << " but received "
+ str::stream() << "Expected cursorid " << remote.cursorId << " but received "
<< cursorResponse.getCursorId());
}
@@ -408,15 +382,6 @@ void AsyncResultsMerger::handleBatchResponse(
// Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down.
signalCurrentEventIfReady_inlock();
- // Make a best effort to parse the response and retrieve the cursor id. We need the cursor
- // id in order to issue a killCursors command against it.
- if (cbData.response.isOK()) {
- auto cursorResponse = parseCursorResponse(cbData.response.data, remote);
- if (cursorResponse.isOK()) {
- remote.cursorId = cursorResponse.getValue().getCursorId();
- }
- }
-
// If we're killed and we're not waiting on any more batches to come back, then we are ready
// to kill the cursors on the remote hosts and clean up this cursor. Schedule the
// killCursors command and signal that this cursor is now safe to destroy. We have to
@@ -444,77 +409,14 @@ void AsyncResultsMerger::handleBatchResponse(
: cbData.response.status);
if (!cursorResponseStatus.isOK()) {
- // In the case a read is performed against a view, the shard primary can return an error
- // indicating that the underlying collection may be sharded. When this occurs the return
- // message will include an expanded view definition and collection namespace which we need
- // to store. This allows for a second attempt at the read directly against the underlying
- // collection.
- if (cursorResponseStatus.getStatus() ==
- ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
- auto& responseObj = cbData.response.data;
- if (!responseObj.hasField("resolvedView")) {
- remote.status = Status(ErrorCodes::InternalError,
- str::stream() << "Missing field 'resolvedView' in document: "
- << responseObj);
- return;
- }
-
- auto resolvedViewObj = responseObj.getObjectField("resolvedView");
- if (resolvedViewObj.isEmpty()) {
- remote.status = Status(ErrorCodes::InternalError,
- str::stream() << "Field 'resolvedView' must be an object: "
- << responseObj);
- return;
- }
-
- _params->viewDefinition = resolvedViewObj.getOwned();
- remote.status = cursorResponseStatus.getStatus();
- return;
- }
-
auto shard = remote.getShard();
if (!shard) {
remote.status = Status(cursorResponseStatus.getStatus().code(),
- str::stream() << "Could not find shard " << *remote.shardId
- << " containing host "
+ str::stream() << "Could not find shard containing host "
<< remote.getTargetHost().toString());
} else {
shard->updateReplSetMonitor(remote.getTargetHost(), cursorResponseStatus.getStatus());
-
- // If we can still retry the initial cursor establishment, reset the state so it can be
- // retried the next time nextEvent is called. Never retry getMores to avoid
- // accidentally skipping results.
- if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts &&
- shard->isRetriableError(cursorResponseStatus.getStatus().code(),
- Shard::RetryPolicy::kIdempotent)) {
- invariant(remote.shardId);
- invariant(remote.docBuffer.empty());
-
- LOG(1) << "Initial cursor establishment failed with retriable error and will be "
- "retried"
- << causedBy(redact(cursorResponseStatus.getStatus()));
-
- ++remote.retryCount;
- remote.status = Status::OK(); // Reset status so it can be retried.
-
- // Signal the merger thread to make it retry this remote again.
- if (_currentEvent.isValid()) {
- // To prevent ourselves from signalling the event twice,
- // we set '_currentEvent' as invalid after signalling it.
- _executor->signalEvent(_currentEvent);
- _currentEvent = executor::TaskExecutor::EventHandle();
- }
-
- return;
- } else {
- remote.status = cursorResponseStatus.getStatus();
- if (remote.status == ErrorCodes::CallbackCanceled) {
- // This callback should only be canceled as part of the shutdown sequence, so we
- // promote a canceled callback error to an error that will make more sense to
- // the client.
- remote.status = Status(ErrorCodes::ShutdownInProgress, "shutdown in progress");
- }
- }
+ remote.status = cursorResponseStatus.getStatus();
}
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
@@ -531,36 +433,22 @@ void AsyncResultsMerger::handleBatchResponse(
return;
}
- // Cursor id successfully established.
- auto cursorResponse = std::move(cursorResponseStatus.getValue());
- remote.cursorId = cursorResponse.getCursorId();
- remote.initialCmdObj = boost::none;
+ // Response successfully received.
- for (const auto& obj : cursorResponse.getBatch()) {
- // If there's a sort, we're expecting the remote node to give us back a sort key.
- if (!_params->sort.isEmpty() &&
- obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) {
- remote.status = Status(ErrorCodes::InternalError,
- str::stream() << "Missing field '"
- << ClusterClientCursorParams::kSortKeyField
- << "' in document: "
- << obj);
- return;
- }
+ auto cursorResponse = std::move(cursorResponseStatus.getValue());
- ClusterQueryResult result(obj);
- remote.docBuffer.push(result);
- ++remote.fetchedCount;
- }
+ // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
+ remote.cursorId = cursorResponse.getCursorId();
- // If we're doing a sorted merge, then we have to make sure to put this remote onto the
- // merge queue.
- if (!_params->sort.isEmpty() && !cursorResponse.getBatch().empty()) {
- _mergeQueue.push(remoteIndex);
+ // Save the batch in the remote's buffer.
+ if (!addBatchToBuffer(remoteIndex, cursorResponse.getBatch())) {
+ return;
}
// If the cursor is tailable and we just received an empty batch, the next return value should
// be boost::none in order to indicate the end of the batch.
+ // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
+ // one shard means the end of the overall batch).
if (_params->isTailable && !remote.hasNext()) {
_eofNext = true;
}
@@ -583,6 +471,33 @@ void AsyncResultsMerger::handleBatchResponse(
signalCurrentEventIfReady_inlock();
}
+bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch) {
+ auto& remote = _remotes[remoteIndex];
+ for (const auto& obj : batch) {
+ // If there's a sort, we're expecting the remote node to have given us back a sort key.
+ if (!_params->sort.isEmpty() &&
+ obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) {
+ remote.status = Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field '"
+ << ClusterClientCursorParams::kSortKeyField
+ << "' in document: "
+ << obj);
+ return false;
+ }
+
+ ClusterQueryResult result(obj);
+ remote.docBuffer.push(result);
+ ++remote.fetchedCount;
+ }
+
+ // If we're doing a sorted merge, then we have to make sure to put this remote onto the
+ // merge queue.
+ if (!_params->sort.isEmpty() && !batch.empty()) {
+ _mergeQueue.push(remoteIndex);
+ }
+ return true;
+}
+
void AsyncResultsMerger::signalCurrentEventIfReady_inlock() {
if (ready_inlock() && _currentEvent.isValid()) {
// To prevent ourselves from signalling the event twice, we set '_currentEvent' as
@@ -610,7 +525,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) {
invariant(!remote.cbHandle.isValid());
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
- BSONObj cmdObj = KillCursorsRequest(_params->nsString, {*remote.cursorId}).toBSON();
+ BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
@@ -656,6 +571,12 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
scheduleKillCursors_inlock(opCtx);
_lifecycleState = kKillComplete;
_executor->signalEvent(_killCursorsScheduledEvent);
+ } else {
+ for (const auto& remote : _remotes) {
+ if (remote.cbHandle.isValid()) {
+ _executor->cancel(remote.cbHandle);
+ }
+ }
}
return _killCursorsScheduledEvent;
@@ -665,16 +586,12 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
// AsyncResultsMerger::RemoteCursorData
//
-AsyncResultsMerger::RemoteCursorData::RemoteCursorData(ShardId shardId, BSONObj cmdObj)
- : shardId(std::move(shardId)), initialCmdObj(std::move(cmdObj)) {}
-
AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort,
CursorId establishedCursorId)
- : cursorId(establishedCursorId), _shardHostAndPort(std::move(hostAndPort)) {}
+ : cursorId(establishedCursorId), shardHostAndPort(std::move(hostAndPort)) {}
const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const {
- invariant(_shardHostAndPort);
- return *_shardHostAndPort;
+ return shardHostAndPort;
}
bool AsyncResultsMerger::RemoteCursorData::hasNext() const {
@@ -682,38 +599,11 @@ bool AsyncResultsMerger::RemoteCursorData::hasNext() const {
}
bool AsyncResultsMerger::RemoteCursorData::exhausted() const {
- return cursorId && (*cursorId == 0);
-}
-
-Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort(
- const ReadPreferenceSetting& readPref) {
- invariant(shardId);
- invariant(!cursorId);
-
- const auto shard = getShard();
- if (!shard) {
- return Status(ErrorCodes::ShardNotFound,
- str::stream() << "Could not find shard " << *shardId);
- }
-
- // TODO: Pass down an OperationContext* to use here.
- auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20});
- if (!findHostStatus.isOK()) {
- return findHostStatus.getStatus();
- }
-
- _shardHostAndPort = std::move(findHostStatus.getValue());
-
- return Status::OK();
+ return cursorId == 0;
}
std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() {
- invariant(shardId || _shardHostAndPort);
- if (shardId) {
- return grid.shardRegistry()->getShardNoReload(*shardId);
- } else {
- return grid.shardRegistry()->getShardNoReload(_shardHostAndPort->toString());
- }
+ return grid.shardRegistry()->getShardNoReload(shardHostAndPort.toString());
}
//
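
As a reading aid for the changes above, here is a small self-contained sketch (not the MongoDB types) of the simplified per-remote state this patch leaves behind: every remote starts from an already-established cursor, its first batch is buffered up front, and "exhausted" simply means the shard reported a cursor id of 0.

#include <cstdint>
#include <iostream>
#include <queue>
#include <string>
#include <utility>

using CursorId = std::int64_t;

struct RemoteCursorData {
    RemoteCursorData(std::string host, CursorId establishedCursorId)
        : cursorId(establishedCursorId), shardHostAndPort(std::move(host)) {}

    // There are buffered results not yet returned to the caller.
    bool hasNext() const {
        return !docBuffer.empty();
    }

    // The shard sends a cursor id of 0 once it has nothing more to return.
    bool exhausted() const {
        return cursorId == 0;
    }

    CursorId cursorId;
    std::string shardHostAndPort;
    std::queue<std::string> docBuffer;  // stand-in for buffered BSON documents
};

int main() {
    // Mirror what the reworked constructor does with ClusterClientCursorParams::remotes:
    // store the established cursor id and host, then buffer the initial batch.
    RemoteCursorData remote("shard0:27017", /*establishedCursorId=*/5);
    remote.docBuffer.push("{_id: 1}");
    remote.docBuffer.push("{_id: 2}");

    std::cout << "hasNext: " << remote.hasNext()        // 1: first batch is buffered
              << " exhausted: " << remote.exhausted()   // 0: cursor id is non-zero
              << '\n';

    // A later getMore response carrying cursor id 0 marks the remote exhausted.
    remote.cursorId = 0;
    std::cout << "exhausted after final batch: " << remote.exhausted() << '\n';
    return 0;
}
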
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index e6766a4faa3..6b30730392f 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -48,11 +48,12 @@ namespace mongo {
class CursorResponse;
/**
- * AsyncResultsMerger is used to generate results from cursor-generating commands on one or more
- * remote hosts. A cursor-generating command (e.g. the find command) is one that establishes a
- * ClientCursor and a matching cursor id on the remote host. In order to retrieve all command
- * results, getMores must be issued against each of the remote cursors until they are exhausted. The
- * results from the remote nodes are merged to present either a single sorted or unsorted stream.
+ * Given a set of cursorIds across one or more shards, the AsyncResultsMerger calls getMore on the
+ * cursors to present a single sorted or unsorted stream of documents.
+ *
+ * (A cursor-generating command (e.g. the find command) is one that establishes a ClientCursor and a
+ * matching cursorId on the remote host. In order to retrieve all document results, getMores must be
+ * issued against each of the remote cursors until they are exhausted).
*
* The ARM offers a non-blocking interface: if no results are immediately available on this host for
* retrieval, calling nextEvent() schedules work on the remote hosts in order to generate further
@@ -74,16 +75,20 @@ class AsyncResultsMerger {
public:
/**
- * Constructs a new AsyncResultsMerger. The TaskExecutor* must remain valid for the lifetime of
- * the ARM.
+ * Takes ownership of the cursors from ClusterClientCursorParams by storing their cursorIds and
+ * the hosts on which they exist in _remotes.
+ *
+ * Additionally copies each remote's first batch of results, if one exists, into that remote's
+ * docBuffer. If a sort is specified in the ClusterClientCursorParams, places the remotes with
+ * buffered results onto _mergeQueue.
+ *
+ * The TaskExecutor* must remain valid for the lifetime of the ARM.
*/
AsyncResultsMerger(executor::TaskExecutor* executor, ClusterClientCursorParams* params);
/**
- * In order to be destroyed, either
- * --the cursor must have been kill()'ed and the event return from kill() must have been
- * signaled, or
- * --all cursors must have been exhausted.
+ * In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have
+ * been exhausted. This is so that any unexhausted cursors are cleaned up by the ARM.
*/
virtual ~AsyncResultsMerger();
@@ -148,25 +153,22 @@ public:
* function, that has not yet been signaled. If there is an outstanding unsignaled event,
* returns an error.
*
- * Conditions when event can be signaled:
- * - Finished collecting results from all remotes.
- * - One of the host failed with a retriable error. In this case, if ready() is false, then
- * the caller should call nextEvent() to retry the request on the hosts that errored. If
- * ready() is true, then either the error was not retriable or it has exhausted max retries.
+ * If there is a sort, the event is signaled when there are buffered results for all
+ * non-exhausted remotes.
+ * If there is no sort, the event is signaled when some remote has a buffered result.
*/
StatusWith<executor::TaskExecutor::EventHandle> nextEvent(OperationContext* opCtx);
/**
- * Starts shutting down this ARM. Returns a handle to an event which is signaled when this
- * cursor is safe to destroy.
+ * Starts shutting down this ARM by canceling all pending requests. Returns a handle to an event
+ * that is signaled when this ARM is safe to destroy.
+ * If there are no pending requests, schedules killCursors and signals the event immediately.
+ * Otherwise, the last callback that runs after kill() is called schedules killCursors and
+ * signals the event.
*
* Returns an invalid handle if the underlying task executor is shutting down. In this case,
* killing is considered complete and the ARM may be destroyed immediately.
*
- * When the underlying task executor is *not* shutting down, an ARM can only be destroyed if
- * either 1) all its results have been exhausted or 2) the kill event returned by this method
- * has been signaled.
- *
* May be called multiple times (idempotent).
*/
executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
@@ -178,17 +180,6 @@ private:
* reported from the remote.
*/
struct RemoteCursorData {
- /**
- * Creates a new uninitialized remote cursor state, which will have to send a command in
- * order to establish its cursor id. Must only be used if the remote cursor ids are not yet
- * known.
- */
- RemoteCursorData(ShardId shardId, BSONObj cmdObj);
-
- /**
- * Instantiates a new initialized remote cursor, which has an established cursor id. It may
- * only be used for getMore operations.
- */
RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId);
/**
@@ -208,50 +199,31 @@ private:
bool exhausted() const;
/**
- * Given the shard id with which the cursor was initialized and a read preference, selects
- * a host on which the cursor should be created.
- *
- * May not be called once a cursor has already been established.
- */
- Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
-
- /**
* Returns the Shard object associated with this remote cursor.
*/
std::shared_ptr<Shard> getShard();
- // ShardId on which a cursor will be created.
- // TODO: This should always be set.
- const boost::optional<ShardId> shardId;
-
- // The command object for sending to the remote to establish the cursor. If a remote cursor
- // has not been established yet, this member will be set to a valid command object. If a
- // remote cursor has already been established, this member will be unset.
- boost::optional<BSONObj> initialCmdObj;
+ // The cursor id for the remote cursor. If a remote cursor is not yet exhausted, this member
+ // will be set to a valid non-zero cursor id. If a remote cursor is now exhausted, this
+ // member will be set to zero.
+ CursorId cursorId;
- // The cursor id for the remote cursor. If a remote cursor has not been established yet,
- // this member will be unset. If a remote cursor has been established and is not yet
- // exhausted, this member will be set to a valid non-zero cursor id. If a remote cursor was
- // established but is now exhausted, this member will be set to zero.
- boost::optional<CursorId> cursorId;
+ // The exact host in the shard on which the cursor resides.
+ HostAndPort shardHostAndPort;
+ // The buffer of results that have been retrieved but not yet returned to the caller.
std::queue<ClusterQueryResult> docBuffer;
+
+ // Is valid if there is currently a pending request to this remote.
executor::TaskExecutor::CallbackHandle cbHandle;
- Status status = Status::OK();
- // Counts how many times we retried the initial cursor establishment command. It is used to
- // make a decision based on the error type and the retry count about whether we are allowed
- // to retry sending the request to another host from this shard.
- int retryCount = 0;
+ // Set to an error status if there is an error retrieving a response from this remote or if
+ // the command result contained an error.
+ Status status = Status::OK();
// Count of fetched docs during ARM processing of the current batch. Used to reduce the
// batchSize in getMore when mongod returned less docs than the requested batchSize.
long long fetchedCount = 0;
-
- private:
- // For a cursor, which has shard id associated contains the exact host on which the remote
- // cursor resides.
- boost::optional<HostAndPort> _shardHostAndPort;
};
class MergingComparator {
@@ -324,6 +296,11 @@ private:
void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
OperationContext* opCtx,
size_t remoteIndex);
+ /**
+ * Adds the batch of results to the RemoteCursorData. Returns false if there was an error
+ * parsing the batch.
+ */
+ bool addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch);
/**
* If there is a valid unsignaled event that has been requested via nextReady() and there are
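
The constructor and nextEvent() documentation above describe a k-way sorted merge over per-remote buffers, keyed by the front document's sort key. Below is a self-contained sketch of that idea (plain integers standing in for documents carrying a $sortKey, not the actual MergingComparator): remotes with buffered results sit on a priority queue ordered by the front of their buffers, and the smallest front value is surfaced next.

#include <cstddef>
#include <deque>
#include <iostream>
#include <queue>
#include <vector>

int main() {
    // One buffer of already-sorted results per remote, matching the MultiShardSorted test data.
    std::vector<std::deque<int>> buffers = {{5, 6}, {3, 9}};

    // Order remote indices by the value at the front of their buffers (min-heap).
    auto frontGreater = [&](std::size_t lhs, std::size_t rhs) {
        return buffers[lhs].front() > buffers[rhs].front();
    };
    std::priority_queue<std::size_t, std::vector<std::size_t>, decltype(frontGreater)>
        mergeQueue(frontGreater);

    // Only remotes that have buffered results go onto the merge queue.
    for (std::size_t i = 0; i < buffers.size(); ++i) {
        if (!buffers[i].empty()) {
            mergeQueue.push(i);
        }
    }

    // Repeatedly surface the smallest buffered result and re-queue its remote while it still
    // has results; the real ARM instead schedules a getMore once a remote's buffer runs dry.
    while (!mergeQueue.empty()) {
        std::size_t idx = mergeQueue.top();
        mergeQueue.pop();
        std::cout << buffers[idx].front() << ' ';
        buffers[idx].pop_front();
        if (!buffers[idx].empty()) {
            mergeQueue.push(idx);
        }
    }
    std::cout << '\n';  // prints: 3 5 6 9
    return 0;
}
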
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 0dc06aaa5bd..2fd5513d67c 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -105,47 +105,32 @@ public:
protected:
/**
- * Given a find command specification, 'findCmd', and a list of remote host:port pairs,
- * constructs the appropriate ARM.
+ * Constructs an ARM with the given vector of existing cursors.
*
- * If 'batchSize' is set (i.e. not equal to boost::none), this batchSize is used for each
- * getMore. If 'findCmd' has a batchSize, this is used just for the initial find operation.
+ * If 'findCmd' is not set, the default ClusterClientCursorParams are used.
+ * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams.
+ *
+ * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
+ * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
*/
- void makeCursorFromFindCmd(
- const BSONObj& findCmd,
- const std::vector<ShardId>& shardIds,
+ void makeCursorFromExistingCursors(
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors,
+ boost::optional<BSONObj> findCmd = boost::none,
boost::optional<long long> getMoreBatchSize = boost::none,
ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly)) {
- const bool isExplain = true;
- const auto qr =
- unittest::assertGet(QueryRequest::makeFromFindCommand(_nss, findCmd, isExplain));
-
_params = stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), readPref);
- _params->sort = qr->getSort();
- _params->limit = qr->getLimit();
- _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
- _params->skip = qr->getSkip();
- _params->isTailable = qr->isTailable();
- _params->isAwaitData = qr->isAwaitData();
- _params->isAllowPartialResults = qr->isAllowPartialResults();
-
- for (const auto& shardId : shardIds) {
- _params->remotes.emplace_back(shardId, findCmd);
- }
-
- arm = stdx::make_unique<AsyncResultsMerger>(executor(), _params.get());
- }
-
- /**
- * Given a vector of (HostAndPort, CursorIds) representing a set of existing cursors, constructs
- * the appropriate ARM. The default CCC parameters are used.
- */
- void makeCursorFromExistingCursors(
- const std::vector<std::pair<HostAndPort, CursorId>>& remotes) {
- _params = stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator());
-
- for (const auto& hostIdPair : remotes) {
- _params->remotes.emplace_back(hostIdPair.first, hostIdPair.second);
+ _params->remotes = std::move(remoteCursors);
+
+ if (findCmd) {
+ const auto qr = unittest::assertGet(
+ QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */));
+ _params->sort = qr->getSort();
+ _params->limit = qr->getLimit();
+ _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
+ _params->skip = qr->getSkip();
+ _params->isTailable = qr->isTailable();
+ _params->isAwaitData = qr->isAwaitData();
+ _params->isAllowPartialResults = qr->isAllowPartialResults();
}
arm = stdx::make_unique<AsyncResultsMerger>(executor(), _params.get());
@@ -219,6 +204,13 @@ protected:
net->exitNetwork();
}
+ void runReadyCallbacks() {
+ executor::NetworkInterfaceMock* net = network();
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ }
+
void blackHoleNextRequest() {
executor::NetworkInterfaceMock* net = network();
net->enterNetwork();
@@ -233,263 +225,338 @@ protected:
std::unique_ptr<AsyncResultsMerger> arm;
};
-TEST_F(AsyncResultsMergerTest, ClusterFind) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
- ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ // Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
- // First shard responds.
- std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {
- fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ // Schedule requests.
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- // Can't return any results until we have a response from all three shards.
+ // Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
- // Second two shards respond.
- responses.clear();
- std::vector<BSONObj> batch2 = {fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
- std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(0), batch3);
+ // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor()->waitForEvent(readyEvent);
+ // Now that the responses have been delivered, ARM is ready to return results.
+ ASSERT_TRUE(arm->ready());
+ // Because the response contained a cursorId of 0, ARM marked the remote as exhausted.
ASSERT_TRUE(arm->remotesExhausted());
- ASSERT_TRUE(arm->ready());
+
+ // ARM returns the correct results.
+ executor()->waitForEvent(readyEvent);
ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, ARM returns EOF immediately because the cursor was
+ // exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ // Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // Schedule requests.
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+
+ // Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+ // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(10), batch1);
- std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(11), batch2);
- std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(12), batch3);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor()->waitForEvent(readyEvent);
+ std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
+ responses.emplace_back(_nss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
- ASSERT_FALSE(arm->remotesExhausted());
+ // Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // Because the response contained a cursorId of 0, ARM marked the remote as exhausted.
+ ASSERT_TRUE(arm->remotesExhausted());
+
+ // ARM returns all results in order.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 5}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 6}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, ARM returns EOF immediately because the cursor was
+ // exhausted.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
+}
+TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+
+ // Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // Schedule requests.
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+
+ // Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
- responses.clear();
- std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
- responses.emplace_back(_nss, CursorId(10), batch4);
- std::vector<BSONObj> batch5 = {fromjson("{_id: 9}")};
- responses.emplace_back(_nss, CursorId(0), batch5);
- std::vector<BSONObj> batch6 = {fromjson("{_id: 10}")};
- responses.emplace_back(_nss, CursorId(0), batch6);
+ // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {
+ fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->remotesExhausted());
+ // ARM is ready to return first result.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // ARM is not exhausted, because second shard has yet to respond.
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // ARM returns results from first shard immediately.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
+ // There are no further buffered results, so ARM is not ready.
ASSERT_FALSE(arm->ready());
+
+ // Make next event to be signaled.
readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
+ // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
responses.clear();
- std::vector<BSONObj> batch7 = {fromjson("{_id: 11}")};
- responses.emplace_back(_nss, CursorId(0), batch7);
+ std::vector<BSONObj> batch2 = {
+ fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor()->waitForEvent(readyEvent);
+ // ARM is ready to return remaining results.
+ ASSERT_TRUE(arm->ready());
ASSERT_TRUE(arm->remotesExhausted());
+
+ // ARM returns results from second shard immediately.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
- BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ // Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // Schedule requests.
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+
+ // Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+ // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {fromjson("{_id: 5, $sortKey: {'': 5}}"),
- fromjson("{_id: 6, $sortKey: {'': 6}}")};
+ std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
+ fromjson("{$sortKey: {'': 6}}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- std::vector<BSONObj> batch2 = {fromjson("{_id: 3, $sortKey: {'': 3}}"),
- fromjson("{_id: 9, $sortKey: {'': 9}}")};
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+
+ // ARM is not ready to return results until receiving responses from all remotes.
+ ASSERT_FALSE(arm->ready());
+
+ // ARM is not exhausted, because second shard has yet to respond.
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
+ responses.clear();
+ std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
+ fromjson("{$sortKey: {'': 9}}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- std::vector<BSONObj> batch3 = {fromjson("{_id: 4, $sortKey: {'': 4}}"),
- fromjson("{_id: 8, $sortKey: {'': 8}}")};
- responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor()->waitForEvent(readyEvent);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ // Now that all remotes have responded, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"),
+ ASSERT_TRUE(arm->remotesExhausted());
+
+ // ARM returns all results in sorted order.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 3}}"),
*unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"),
+ ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 5}}"),
*unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"),
+ ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 6}}"),
*unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"),
+ ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"),
*unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
- BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+ // Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // Schedule requests.
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
+ // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
- fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
- std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
- fromjson("{$sortKey: {'': 4}}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
- std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 7}}"),
- fromjson("{$sortKey: {'': 8}}")};
- responses.emplace_back(_nss, CursorId(2), batch3);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor()->waitForEvent(readyEvent);
+ std::vector<BSONObj> batch1 = {
+ fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
+ responses.emplace_back(_nss, CursorId(5), batch1);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ // ARM is ready to return first result.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 3}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+
+ // ARM is not exhausted, because second shard has yet to respond and first shard's response did
+ // not contain cursorId=0.
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // ARM returns results from first shard immediately.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 5}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 6}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
+ // There are no further buffered results, so ARM is not ready.
ASSERT_FALSE(arm->ready());
+
+ // Make next event to be signaled.
readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
+ // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
responses.clear();
- std::vector<BSONObj> batch4 = {fromjson("{$sortKey: {'': 7}}"),
- fromjson("{$sortKey: {'': 10}}")};
- responses.emplace_back(_nss, CursorId(0), batch4);
+ std::vector<BSONObj> batch2 = {
+ fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor()->waitForEvent(readyEvent);
+ // ARM is ready to return second shard's results.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 7}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+
+ // ARM is not exhausted, because first shard's response did not contain cursorId=0.
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ // ARM returns results from second shard immediately.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 7}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 8}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
+ // ARM is not ready to return results until further results are obtained from first shard.
ASSERT_FALSE(arm->ready());
+
+ // Make next event to be signaled.
readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
+ // First shard returns remainder of results.
responses.clear();
- std::vector<BSONObj> batch5 = {fromjson("{$sortKey: {'': 9}}"),
- fromjson("{$sortKey: {'': 10}}")};
- responses.emplace_back(_nss, CursorId(0), batch5);
+ std::vector<BSONObj> batch3 = {
+ fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")};
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor()->waitForEvent(readyEvent);
+ // ARM is ready to return remaining results.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->remotesExhausted());
+
+ // ARM returns remaining results immediately.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 10}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 10}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
- BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 7, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
+ // Schedule requests.
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
+ // Deliver responses.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"),
fromjson("{$sortKey: {'': 4, '': 20}}")};
@@ -500,10 +567,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
fromjson("{$sortKey: {'': 5, '': 9}}")};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
+ // ARM returns all results in sorted order.
ASSERT_TRUE(arm->ready());
+ ASSERT_TRUE(arm->remotesExhausted());
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"),
*unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
@@ -521,13 +591,18 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"),
*unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
- BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -537,7 +612,8 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -550,133 +626,126 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
executor()->waitForEvent(killEvent);
}
+TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
+ std::vector<BSONObj> firstBatch = {
+ fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
+ makeCursorFromExistingCursors(std::move(cursors));
+
+ // Because there was firstBatch, ARM is immediately ready to return results.
+ ASSERT_TRUE(arm->ready());
+
+ // Because the cursorId was not zero, ARM is not exhausted.
+ ASSERT_FALSE(arm->remotesExhausted());
-TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
- // Initial batchSize sent with the find command is zero; batchSize sent with each getMore
- // command is one.
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 0}");
- const long long getMoreBatchSize = 1LL;
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}, getMoreBatchSize);
+ // ARM returns the correct results.
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
+ // Now that the firstBatch results have been returned, ARM must wait for further results.
ASSERT_FALSE(arm->ready());
+
+ // Schedule requests.
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+
+ // Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
- // Both shards give back empty responses. Second shard doesn't have any results so it
- // sends back a cursor id of zero.
+ // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
- responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>());
- responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>());
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- // In handling the responses from the first shard, the ARM should have already asked
- // for an additional batch from that shard. It won't have anything to return until it
- // gets a non-empty response.
- ASSERT_FALSE(arm->ready());
- responses.clear();
- std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(_nss, CursorId(1), batch1);
+ std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor()->waitForEvent(readyEvent);
+ // Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
- // The shard responds with another empty batch but leaves the cursor open. It probably shouldn't
- // do this, but there's no reason the ARM can't handle this by asking for more.
- responses.clear();
- responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>());
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ // Because the response contained a cursorId of 0, ARM marked the remote as exhausted.
+ ASSERT_TRUE(arm->remotesExhausted());
- // The shard responds with another batch and closes the cursor.
- ASSERT_FALSE(arm->ready());
- responses.clear();
- std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ // ARM returns the correct results.
executor()->waitForEvent(readyEvent);
-
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, ARM returns EOF immediately because the cursor was
+ // exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, ReceivedViewDefinitionFromShard) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
-
- ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
- std::string inputNs = "views_sharded.coll";
- std::string inputPipeline = "[ { $match: { a: { $gte: 5.0 } } } ]";
- scheduleNetworkViewResponse(inputNs, inputPipeline);
-
- executor()->waitForEvent(readyEvent);
+TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
+ std::vector<BSONObj> firstBatch = {
+ fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 0, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+ // Because there was a firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
- auto statusWithNext = arm->nextReady();
- ASSERT(!statusWithNext.isOK());
- ASSERT_EQ(statusWithNext.getStatus().code(),
- ErrorCodes::CommandOnShardedViewNotSupportedOnMongod);
-
- ASSERT(_params->viewDefinition);
-
- auto outputPipeline = (*_params->viewDefinition)["pipeline"];
- ASSERT(!outputPipeline.eoo());
- ASSERT_BSONOBJ_EQ(fromjson(inputPipeline), outputPipeline.Obj());
-
- auto outputNs = (*_params->viewDefinition)["ns"];
- ASSERT(!outputNs.eoo());
- ASSERT_EQ(outputNs.String(), inputNs);
-
- // Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
- executor()->waitForEvent(killEvent);
-}
+ // Because one of the remotes' cursorId was not zero, ARM is not exhausted.
+ ASSERT_FALSE(arm->remotesExhausted());
-TEST_F(AsyncResultsMergerTest, ExistingCursors) {
- makeCursorFromExistingCursors({{kTestShardHosts[0], 5}, {kTestShardHosts[1], 6}});
+ // ARM returns the correct results.
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
+ // Now that the firstBatch results have been returned, ARM must wait for further results.
ASSERT_FALSE(arm->ready());
+
+ // Schedule requests.
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+
+ // Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+ // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch1);
- std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- executor()->waitForEvent(readyEvent);
+ std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
+ responses.emplace_back(_nss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ // Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // Because the response contained a cursorId of 0, ARM marked the remote as exhausted.
+ ASSERT_TRUE(arm->remotesExhausted());
+
+ // ARM returns the correct results.
+ executor()->waitForEvent(readyEvent);
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
+
+ // After returning all the buffered results, ARM returns EOF immediately because the cursor was
+ // exhausted.
ASSERT_TRUE(arm->ready());
- ASSERT(unittest::assertGet(arm->nextReady()).isEOF());
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -688,7 +757,8 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -744,35 +814,17 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
}
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {
- fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor()->waitForEvent(readyEvent);
-
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
- responses.clear();
- std::vector<BSONObj> batch2 = {
- fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(_nss, CursorId(456), batch2);
+ std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
+ responses.emplace_back(_nss, CursorId(456), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -786,8 +838,11 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
}
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 789, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -795,14 +850,13 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1)
- .toBSON(CursorResponse::ResponseType::InitialResponse);
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
BSONObj response2 = fromjson("{foo: 'bar'}");
std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
- BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3)
- .toBSON(CursorResponse::ResponseType::InitialResponse);
+ BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3)
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
scheduleNetworkResponseObjs({response1, response2, response3});
executor()->waitForEvent(readyEvent);
-
ASSERT_TRUE(arm->ready());
auto statusWithNext = arm->nextReady();
ASSERT(!statusWithNext.isOK());
@@ -813,8 +867,11 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -825,7 +882,8 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
executor()->waitForEvent(readyEvent);
@@ -842,8 +900,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -854,7 +913,8 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -870,8 +930,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
}
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
+
executor()->shutdown();
ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent(nullptr).getStatus());
auto killEvent = arm->kill(nullptr);
@@ -879,8 +941,9 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
}
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
@@ -895,8 +958,9 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
}
TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto killedEvent = arm->kill(nullptr);
@@ -910,8 +974,11 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
}
TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -924,7 +991,8 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(123), batch3);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
// Kill should be able to return right away if there are no pending batches.
auto killedEvent = arm->kill(nullptr);
@@ -934,8 +1002,11 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
}
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -944,85 +1015,24 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- // Kill event will only be signalled once the pending batches are received.
- auto killedEvent = arm->kill(nullptr);
-
- // After the kill, the ARM waits for outstanding batches to come back. This ensures that we
- // receive cursor ids for any established remote cursors, and can clean them up by issuing
- // killCursors commands.
- responses.clear();
- std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(123), batch2);
- std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
- responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- // Only one of the responses has a non-zero cursor id. The ARM should have issued a killCursors
- // command against this id.
- BSONObj expectedCmdObj = BSON("killCursors"
- << "testcoll"
- << "cursors"
- << BSON_ARRAY(CursorId(123)));
- ASSERT_BSONOBJ_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);
-
- // Ensure that we properly signal both those waiting for the kill, and those waiting for more
- // results to be ready.
- executor()->waitForEvent(readyEvent);
- executor()->waitForEvent(killedEvent);
-}
-
-TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
-
- ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
- std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- // First batch received.
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
-
- // This will schedule a getMore on cursor id 123.
- ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
- auto killedEvent = arm->kill(nullptr);
-
- // The kill can't complete until the getMore response is received.
- responses.clear();
- std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(_nss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- // While processing the getMore response, a killCursors against id 123 should have been
- // scheduled.
- BSONObj expectedCmdObj = BSON("killCursors"
- << "testcoll"
- << "cursors"
- << BSON_ARRAY(CursorId(123)));
- ASSERT_BSONOBJ_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);
+ // Kill event will only be signalled once the callbacks for the pending batches have run.
+ auto killedEvent = arm->kill(nullptr);
- // Ensure that we properly signal both those waiting for the kill, and those waiting for more
- // results to be ready.
+ // The pending requests have been canceled, so run their callbacks.
+ runReadyCallbacks();
+
+ // Ensure that we properly signal those waiting for more results to be ready.
executor()->waitForEvent(readyEvent);
executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1031,7 +1041,8 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
auto killedEvent = arm->kill(nullptr);
@@ -1042,8 +1053,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
}
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
auto killedEvent1 = arm->kill(nullptr);
ASSERT(killedEvent1.isValid());
auto killedEvent2 = arm->kill(nullptr);
@@ -1054,7 +1066,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
TEST_F(AsyncResultsMergerTest, TailableBasic) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1063,7 +1077,8 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1100,7 +1115,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1110,7 +1127,8 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(123), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
// After receiving an empty batch, the ARM should return boost::none, but remotes should not be
@@ -1125,7 +1143,9 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1135,7 +1155,8 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
// Afterwards, the ARM should return boost::none and remote cursors should be marked as
@@ -1147,7 +1168,9 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1157,7 +1180,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1188,9 +1212,12 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
}
TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
- BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(
- findCmd, {kTestShardIds[0]}, boost::none, ReadPreferenceSetting(ReadPreference::Nearest));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors),
+ boost::none,
+ boost::none,
+ ReadPreferenceSetting(ReadPreference::Nearest));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1202,7 +1229,8 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1213,13 +1241,17 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- makeCursorFromFindCmd(findCmd, kTestShardIds);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 97, {}));
+ cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 98, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 99, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
- // An unretriable error occurs with the first host.
+ // An error occurs with the first host.
scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"});
ASSERT_FALSE(arm->ready());
@@ -1230,7 +1262,8 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.emplace_back(_nss, CursorId(98), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(99), batch2);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1275,7 +1308,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 98, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1284,7 +1319,8 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(98), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1303,81 +1339,12 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
-TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
-
- ASSERT_FALSE(arm->ready());
-
- // First and second attempts return an error.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
-
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- // Third attempt succeeds.
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
- responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor()->waitForEvent(readyEvent);
-
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
-
- ASSERT_TRUE(arm->remotesExhausted());
- ASSERT_TRUE(arm->ready());
-}
-
-TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
-
- ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
- // All attempts return an error (one attempt plus three retries)
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
- executor()->waitForEvent(readyEvent);
- ASSERT_TRUE(arm->ready());
-
- auto status = arm->nextReady();
- ASSERT_EQ(status.getStatus().code(), ErrorCodes::NotMasterNoSlaveOk);
-
- // Protocol is to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
- executor()->waitForEvent(killEvent);
-}
-
-TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
+TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1387,30 +1354,12 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- // From the second host all attempts return an error (one attempt plus three retries)
- scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
- scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
+ // From the second host we get a network (retriable) error.
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
- executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
- scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
- executor()->waitForEvent(readyEvent);
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1420,57 +1369,39 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
ASSERT_TRUE(arm->ready());
}
-TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyReturnResults) {
+TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
- // Both hosts return an error which indicates that the request should be retried.
+ // Both hosts return network (retriable) errors.
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
executor()->waitForEvent(readyEvent);
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- ASSERT_FALSE(arm->ready());
-
-
- // Return valid data on both hosts.
- {
- std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch = {fromjson("{_id: 1, $sortKey: {'': 1}}")};
- responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::InitialResponse);
- }
-
- {
- std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch = {fromjson("{_id: 2, $sortKey: {'': 2}}}")};
- responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::InitialResponse);
- }
-
- executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1, $sortKey: {'': 1}}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2, $sortKey: {'': 2}}}}"),
- *unittest::assertGet(arm->nextReady()).getResult());
+ auto statusWithNext = arm->nextReady();
+ ASSERT(!statusWithNext.isOK());
+ ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::HostUnreachable);
+ ASSERT_EQ(statusWithNext.getStatus().reason(), "host unreachable");
- ASSERT_TRUE(arm->remotesExhausted());
- ASSERT_TRUE(arm->ready());
+ // Required to kill the 'arm' on error before destruction.
+ auto killEvent = arm->kill(nullptr);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1479,7 +1410,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1513,9 +1445,13 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
+
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
+
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
@@ -1523,7 +1459,10 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
+
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
@@ -1531,7 +1470,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
@@ -1544,33 +1485,6 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
executor()->waitForEvent(killEvent);
}
-TEST_F(AsyncResultsMergerTest, RetryWhenShardHasRetriableErrorInBetweenReadyAndNextEvent) {
- BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
-
- ASSERT_FALSE(arm->ready());
-
- // First attempt returns a retriable error.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
-
- // We expect to be able to retrieve another event, and be waiting on the retry to succeed.
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
- std::vector<CursorResponse> responses;
- std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
-
- executor()->waitForEvent(readyEvent);
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_TRUE(arm->remotesExhausted());
- ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 45d5d76a147..1754b233eca 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -86,11 +86,6 @@ public:
virtual UserNameIterator getAuthenticatedUsers() const = 0;
/**
- * Returns the view definition associated with this cursor, if any.
- */
- virtual boost::optional<BSONObj> viewDefinition() const = 0;
-
- /**
* Returns the number of result documents returned so far by this cursor via the next() method.
*/
virtual long long getNumReturnedSoFar() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index dec40936d7c..1bd2bd644a8 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -104,10 +104,6 @@ UserNameIterator ClusterClientCursorImpl::getAuthenticatedUsers() const {
_params.authenticatedUsers.end());
}
-boost::optional<BSONObj> ClusterClientCursorImpl::viewDefinition() const {
- return _params.viewDefinition;
-}
-
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 27b31dba14c..bb08223f618 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -103,8 +103,6 @@ public:
UserNameIterator getAuthenticatedUsers() const final;
- boost::optional<BSONObj> viewDefinition() const final;
-
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 8b4d64f4d41..9b092358920 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -85,10 +85,6 @@ UserNameIterator ClusterClientCursorMock::getAuthenticatedUsers() const {
return makeUserNameIterator(emptyAuthenticatedUsers.begin(), emptyAuthenticatedUsers.end());
}
-boost::optional<BSONObj> ClusterClientCursorMock::viewDefinition() const {
- return boost::none;
-}
-
void ClusterClientCursorMock::queueResult(const ClusterQueryResult& result) {
_resultsQueue.push({result});
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 23d857d59d6..5ee4143f45f 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -51,8 +51,6 @@ public:
UserNameIterator getAuthenticatedUsers() const final;
- boost::optional<BSONObj> viewDefinition() const final;
-
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 03db1e37194..2fcaf317b3b 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -37,6 +37,7 @@
#include "mongo/db/auth/user_name.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
#include "mongo/s/client/shard.h"
#include "mongo/util/net/hostandport.h"
@@ -44,57 +45,30 @@ namespace mongo {
class OperationContext;
+/**
+ * The resulting ClusterClientCursor will take ownership of the existing remote cursors described
+ * by these params, generating results based on each cursor's current state.
+ *
+ * Note that any results already generated from these cursors will not be returned by the resulting
+ * ClusterClientCursor. The caller is responsible for ensuring that results previously generated by
+ * these cursors have been processed.
+ */
struct ClusterClientCursorParams {
// When mongos has to do a merge in order to return results to the client in the correct sort
// order, it requests a sortKey meta-projection using this field name.
static const char kSortKeyField[];
- /**
- * Contains any CCC parameters that are specified per-remote node.
- */
- struct Remote {
- /**
- * Use when a new cursor should be created on the remote.
- */
- Remote(ShardId sid, BSONObj cmdObj) : shardId(std::move(sid)), cmdObj(std::move(cmdObj)) {}
-
- /**
- * Use when an a cursor already exists on the remote. The resulting CCC will take ownership
- * of the existing remote cursor, generating results based on its current state.
- *
- * Note that any results already generated from this cursor will not be returned by the
- * resulting CCC. The caller is responsible for ensuring that results previously generated
- * by this cursor have been processed.
- */
- Remote(HostAndPort hostAndPort, CursorId cursorId)
- : hostAndPort(std::move(hostAndPort)), cursorId(cursorId) {}
-
- // If this is a regular query cursor, this value will be set and shard id retargeting may
- // occur on certain networking or replication errors.
- //
- // If this is an externally-prepared cursor (as is in the case of aggregation cursors),
- // this value will never be set and no retargeting will occur.
- boost::optional<ShardId> shardId;
-
- // If this is an externally-specified cursor (e.g. aggregation), this value will be set and
- // used directly and no re-targeting may happen on errors.
- boost::optional<HostAndPort> hostAndPort;
-
- // The raw command parameters to send to this remote (e.g. the find command specification).
- //
- // Exactly one of 'cmdObj' or 'cursorId' must be set.
- boost::optional<BSONObj> cmdObj;
-
- // The cursorId for the remote node, if one already exists.
- //
- // Exactly one of 'cmdObj' or 'cursorId' must be set.
- boost::optional<CursorId> cursorId;
+ struct RemoteCursor {
+ RemoteCursor(HostAndPort hostAndPort, CursorResponse cursorResponse)
+ : hostAndPort(std::move(hostAndPort)), cursorResponse(std::move(cursorResponse)) {}
+
+ // The exact host (within the shard) on which the cursor resides.
+ HostAndPort hostAndPort;
+
+ // Encompasses the state of the established cursor.
+ CursorResponse cursorResponse;
};
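+
+ // Example (illustrative): a RemoteCursor wraps an already-established cursor, e.g.
+ //     params.remotes.emplace_back(hostAndPort, CursorResponse(nss, cursorId, firstBatch));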
- /**
- * Read preference must be provided if initial shard host targeting is necessary (i.e., we don't
- * know yet the remote cursor id).
- */
ClusterClientCursorParams(NamespaceString nss,
UserNameIterator authenticatedUsersIter,
boost::optional<ReadPreferenceSetting> readPref = boost::none)
@@ -107,14 +81,14 @@ struct ClusterClientCursorParams {
}
}
- // Namespace against which to query.
+ // Namespace on which the cursors exist.
NamespaceString nsString;
// The set of authenticated users when this cursor was created.
std::vector<UserName> authenticatedUsers;
// Per-remote node data.
- std::vector<Remote> remotes;
+ std::vector<RemoteCursor> remotes;
// The sort specification. Leave empty if there is no sort.
BSONObj sort;
@@ -137,18 +111,12 @@ struct ClusterClientCursorParams {
// Whether this cursor has the awaitData option set.
bool isAwaitData = false;
- // Read preference for where to target the query. This value is only set if initial shard host
- // targeting is necessary and not used if using externally prepared cursor ids.
+ // Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
-
- // If the read is done against a view, an error is returned along with the view definition in
- // the first response from the primary shard for the base collection. Calling code can re-run
- // the read against the base collection by using this returned view definition.
- boost::optional<BSONObj> viewDefinition;
};
} // mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index a6dcf1de83e..7249eebc7eb 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -52,6 +52,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/stdx/memory.h"
@@ -159,6 +160,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
// Get the set of shards on which we will run the query.
+
std::vector<std::shared_ptr<Shard>> shards;
if (primary) {
shards.emplace_back(std::move(primary));
@@ -180,6 +182,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
}
}
+ // Construct the query and parameters.
+
ClusterClientCursorParams params(
query.nss(),
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
@@ -213,12 +217,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
return qrToForward.getStatus();
}
- // Use read pref to target a particular host from each shard. Also construct the find command
- // that we will forward to each shard.
+ // Construct the find command that we will use to establish cursors, attaching the shardVersion.
+
+ std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shard : shards) {
invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::INVALID);
- // Build the find command, and attach shard version if necessary.
BSONObjBuilder cmdBuilder;
qrToForward.getValue()->asFindCommand(&cmdBuilder);
@@ -230,28 +234,44 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
version.appendForCommands(&cmdBuilder);
}
- params.remotes.emplace_back(shard->getId(), cmdBuilder.obj());
+ requests.emplace_back(shard->getId(), cmdBuilder.obj());
}
+ // Establish the cursors with a consistent shardVersion across shards.
+
+ auto swCursors = establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ query.nss(),
+ readPref,
+ requests,
+ query.getQueryRequest().isAllowPartialResults(),
+ viewDefinition);
+ if (!swCursors.isOK()) {
+ // Make sure a viewDefinition was set if the find was on a view.
+ if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swCursors.getStatus().code()) {
+ if (!viewDefinition) {
+ return {ErrorCodes::InternalError,
+ str::stream() << "Missing resolved view definition, but remote returned "
+ << ErrorCodes::errorString(swCursors.getStatus().code())};
+ }
+ }
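+ // Return the error; in the view case the resolved view has been stored in
+ // 'viewDefinition', so the caller can retry the read against the underlying collection.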
+ return swCursors.getStatus();
+ }
+
+ // Transfer the established cursors to a ClusterClientCursor.
+
+ params.remotes = std::move(swCursors.getValue());
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+ // Retrieve enough data from the ClusterClientCursor for the first batch of results.
+
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
int bytesBuffered = 0;
while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
auto next = ccc->next(opCtx);
if (!next.isOK()) {
- if (viewDefinition &&
- ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == next.getStatus().code()) {
- if (!ccc->viewDefinition()) {
- return {ErrorCodes::InternalError,
- str::stream()
- << "Missing resolved view definition, but remote returned "
- << ErrorCodes::errorString(next.getStatus().code())};
- }
- *viewDefinition = BSON("resolvedView" << *ccc->viewDefinition());
- }
return next.getStatus();
}
@@ -291,7 +311,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
return CursorId(0);
}
- // Register the cursor with the cursor manager.
+ // Register the cursor with the cursor manager so it can be found for subsequent getMore commands.
+
auto cursorManager = Grid::get(opCtx)->getCursorManager();
const auto cursorType = chunkManager ? ClusterCursorManager::CursorType::NamespaceSharded
: ClusterCursorManager::CursorType::NamespaceNotSharded;
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
new file mode 100644
index 00000000000..240b47d4fb9
--- /dev/null
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -0,0 +1,162 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/establish_cursors.h"
+
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/killcursors_request.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/executor/remote_command_response.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ BSONObj* viewDefinition) {
+ // Construct the requests
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (const auto& remote : remotes) {
+ requests.emplace_back(remote.first, remote.second);
+ }
+
+ // Send the requests
+ AsyncRequestsSender ars(opCtx, executor, nss.db().toString(), std::move(requests), readPref);
+
+ // Get the responses
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
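+ // Tracks the first error encountered while processing responses; stays OK if every cursor is
+ // established (or its error is swallowed under 'allowPartialResults').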
+ Status status = Status::OK();
+ while (!ars.done()) {
+ auto response = ars.next();
+
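+ // If the request itself failed, propagate that error; otherwise parse the shard's reply as a
+ // cursor response.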
+ StatusWith<CursorResponse> swCursorResponse(
+ response.swResponse.isOK()
+ ? CursorResponse::parseFromBSON(response.swResponse.getValue().data)
+ : response.swResponse.getStatus());
+
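+ // On success, stash the established cursor; it is either returned to the caller or cleaned up
+ // below if a later response fails.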
+ if (swCursorResponse.isOK()) {
+ remoteCursors.emplace_back(std::move(*response.shardHostAndPort),
+ std::move(swCursorResponse.getValue()));
+ continue;
+ }
+
+        // When a read is performed against a view, the shard primary can return an error
+        // indicating that the underlying collection may be sharded. When this occurs, the
+        // returned message includes an expanded view definition and collection namespace, which
+        // we need to store. This allows for a second attempt at the read, issued directly
+        // against the underlying collection.
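+        // An illustrative (not authoritative) shape for such an error response:
+        //   { ok: 0, code: <CommandOnShardedViewNotSupportedOnMongod>,
+        //     resolvedView: { ns: "<db>.<underlying collection>", pipeline: [ <view pipeline> ] } }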
+ if (swCursorResponse.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
+ auto& responseObj = response.swResponse.getValue().data;
+ if (!responseObj.hasField("resolvedView")) {
+ status = Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field 'resolvedView' in document: "
+ << responseObj);
+ break;
+ }
+
+ auto resolvedViewObj = responseObj.getObjectField("resolvedView");
+ if (resolvedViewObj.isEmpty()) {
+ status = Status(ErrorCodes::InternalError,
+ str::stream() << "Field 'resolvedView' must be an object: "
+ << responseObj);
+ break;
+ }
+
+ status = std::move(swCursorResponse.getStatus());
+ if (viewDefinition) {
+ *viewDefinition = BSON("resolvedView" << resolvedViewObj.getOwned());
+ }
+ break;
+ }
+
+        // If the 'allowPartialResults' option is set, the error for this remote (such as an
+        // unreachable host) is swallowed, and only the successfully established cursors are
+        // returned.
+ if (allowPartialResults) {
+ continue;
+ }
+ status = std::move(swCursorResponse.getStatus());
+ break;
+ }
+
+ // If one of the remotes had an error, we make a best effort to finish retrieving responses for
+ // other requests that were already sent, so that we can send killCursors to any cursors that we
+ // know were established.
+ if (!status.isOK()) {
+ // Do not schedule any new requests.
+ ars.stopRetrying();
+
+ // Collect responses from all requests that were already sent.
+ while (!ars.done()) {
+ auto response = ars.next();
+
+ // Check if the response contains an established cursor, and if so, store it.
+ StatusWith<CursorResponse> swCursorResponse(
+ response.swResponse.isOK()
+ ? CursorResponse::parseFromBSON(response.swResponse.getValue().data)
+ : response.swResponse.getStatus());
+
+ if (swCursorResponse.isOK()) {
+ remoteCursors.emplace_back(*response.shardHostAndPort,
+ std::move(swCursorResponse.getValue()));
+ }
+ }
+
+ // Schedule killCursors against all cursors that were established.
+ for (const auto& remoteCursor : remoteCursors) {
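+            // The killCursors command built here has the form (illustrative):
+            //   { killCursors: "<collection>", cursors: [ <remote cursor id> ] }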
+ BSONObj cmdObj =
+ KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON();
+ executor::RemoteCommandRequest request(
+ remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx);
+
+ // We do not process the response to the killCursors request (we make a good-faith
+ // attempt at cleaning up the cursors, but ignore any returned errors).
+ executor->scheduleRemoteCommand(
+ request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {});
+ }
+
+ return status;
+ }
+
+ return std::move(remoteCursors);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
new file mode 100644
index 00000000000..4d921ad7a8a
--- /dev/null
+++ b/src/mongo/s/query/establish_cursors.h
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/cursor_id.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class CursorResponse;
+
+/**
+ * Establishes cursors on the remote shards by issuing requests in parallel, using the readPref to
+ * select a host within each shard.
+ *
+ * If any of the cursors fails to be established, performs cleanup by sending killCursors to any
+ * cursors that were established and returns a non-OK status.
+ *
+ * If an OK status is returned, the ownership of the cursors is transferred to the caller. This
+ * means the caller is now responsible for either exhausting the cursors or sending killCursors to
+ * them.
+ *
+ * @param allowPartialResults: If true, unreachable hosts are ignored, and only cursors established
+ * on reachable hosts are returned.
+ * @param viewDefinition: If the namespace represents a view, an error is returned and the view
+ * definition is stored in this parameter. Calling code can then attempt to
+ * establish cursors against the base collection using this viewDefinition.
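+ *
+ * A minimal usage sketch (variable names below are illustrative, not part of this interface):
+ *
+ *   BSONObj viewDefinition;
+ *   auto swCursors = establishCursors(opCtx,
+ *                                     executor,
+ *                                     nss,
+ *                                     readPref,
+ *                                     requests,  // vector of {ShardId, command BSONObj} pairs
+ *                                     false,     // allowPartialResults
+ *                                     &viewDefinition);
+ *   if (!swCursors.isOK()) {
+ *       // On CommandOnShardedViewNotSupportedOnMongod, 'viewDefinition' holds the resolved view
+ *       // and the read can be retried against the underlying collection.
+ *   }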
+ */
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ BSONObj* viewDefinition);
+
+} // namespace mongo
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 4f53b2441bc..d2926602249 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -62,7 +62,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
ClusterClientCursorParams params(
incomingCursorResponse.getValue().getNSS(),
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames());
- params.remotes.emplace_back(server, incomingCursorResponse.getValue().getCursorId());
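+    // Construct the remote cursor from a CursorResponse that carries the remote cursor id and an
+    // empty first batch.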
+ params.remotes.emplace_back(
+ server, CursorResponse(requestedNss, incomingCursorResponse.getValue().getCursorId(), {}));
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));