diff options
-rw-r--r-- | src/mongo/s/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 262 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 105 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 1048 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor.h | 5 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_mock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_params.h | 74 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 51 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 162 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.h | 74 | ||||
-rw-r--r-- | src/mongo/s/query/store_possible_cursor.cpp | 3 |
14 files changed, 895 insertions, 903 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 51b60590239..1e721da28e8 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -62,10 +62,12 @@ env.Library( source=[ "async_results_merger.cpp", "cluster_client_cursor_params.cpp", + "establish_cursors.cpp", ], LIBDEPS=[ "$BUILD_DIR/mongo/db/query/command_request_response", "$BUILD_DIR/mongo/executor/task_executor_interface", + "$BUILD_DIR/mongo/s/async_requests_sender", "$BUILD_DIR/mongo/s/client/sharding_client", "$BUILD_DIR/mongo/s/coreshard", ], diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 614002978f1..bd3c67bf44a 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -58,21 +58,19 @@ AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor, : _executor(executor), _params(params), _mergeQueue(MergingComparator(_remotes, _params->sort)) { + size_t remoteIndex = 0; for (const auto& remote : _params->remotes) { - if (remote.shardId) { - invariant(remote.cmdObj); - invariant(!remote.cursorId); - invariant(!remote.hostAndPort); - _remotes.emplace_back(*remote.shardId, *remote.cmdObj); - } else { - invariant(!remote.cmdObj); - invariant(remote.cursorId); - invariant(remote.hostAndPort); - _remotes.emplace_back(*remote.hostAndPort, *remote.cursorId); - } + _remotes.emplace_back(remote.hostAndPort, remote.cursorResponse.getCursorId()); + + // We don't check the return value of addBatchToBuffer here; if there was an error, + // it will be stored in the remote and the first call to ready() will return true. + addBatchToBuffer(remoteIndex, remote.cursorResponse.getBatch()); + ++remoteIndex; } - // Initialize command metadata to handle the read preference. + // Initialize command metadata to handle the read preference. We do this in case the readPref + // is primaryOnly, in which case if the remote host for one of the cursors changes roles, the + // remote will return an error. if (_params->readPreference) { BSONObjBuilder metadataBuilder; rpc::ServerSelectionMetadata metadata( @@ -136,13 +134,6 @@ bool AsyncResultsMerger::ready_inlock() { _status = remote.status; return true; } - - // We don't return any results until we have received at least one response from each remote - // node. This is necessary for versioned commands: we have to ensure that we've properly - // established the shard version on each node before we can start returning results. - if (!remote.cursorId) { - return false; - } } const bool hasSort = !_params->sort.isEmpty(); @@ -262,34 +253,19 @@ Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_ // request to fetch the remaining docs only. If the remote node has a plan with OR for top k and // a full sort as is the case for the OP_QUERY find then this optimization will prevent // switching to the full sort plan branch. - BSONObj cmdObj; - - if (remote.cursorId) { - auto adjustedBatchSize = _params->batchSize; - - if (_params->batchSize && *_params->batchSize > remote.fetchedCount) { - adjustedBatchSize = *_params->batchSize - remote.fetchedCount; - } - - cmdObj = GetMoreRequest(_params->nsString, - *remote.cursorId, - adjustedBatchSize, - _awaitDataTimeout, - boost::none, - boost::none) - .toBSON(); - } else { - // Do the first time shard host resolution. - invariant(_params->readPreference); - Status resolveStatus = remote.resolveShardIdToHostAndPort(*_params->readPreference); - if (!resolveStatus.isOK()) { - return resolveStatus; - } - - remote.fetchedCount = 0; - cmdObj = *remote.initialCmdObj; + auto adjustedBatchSize = _params->batchSize; + if (_params->batchSize && *_params->batchSize > remote.fetchedCount) { + adjustedBatchSize = *_params->batchSize - remote.fetchedCount; } + BSONObj cmdObj = GetMoreRequest(_params->nsString, + remote.cursorId, + adjustedBatchSize, + _awaitDataTimeout, + boost::none, + boost::none) + .toBSON(); + executor::RemoteCommandRequest request( remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, opCtx); @@ -342,9 +318,8 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent( } if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { - // If we already have established a cursor with this remote, and there is no outstanding - // request for which we have a valid callback handle, then schedule work to retrieve the - // next batch. + // If this remote is not exhausted and there is no outstanding request for it, schedule + // work to retrieve the next batch. auto nextBatchStatus = askForNextBatch_inlock(opCtx, i); if (!nextBatchStatus.isOK()) { return nextBatchStatus; @@ -377,12 +352,11 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj auto cursorResponse = std::move(getMoreParseStatus.getValue()); - // If we have a cursor established, and we get a non-zero cursor id that is not equal to the - // established cursor id, we will fail the operation. - if (remote.cursorId && cursorResponse.getCursorId() != 0 && - *remote.cursorId != cursorResponse.getCursorId()) { + // If we get a non-zero cursor id that is not equal to the established cursor id, we will fail + // the operation. + if (cursorResponse.getCursorId() != 0 && remote.cursorId != cursorResponse.getCursorId()) { return Status(ErrorCodes::BadValue, - str::stream() << "Expected cursorid " << *remote.cursorId << " but received " + str::stream() << "Expected cursorid " << remote.cursorId << " but received " << cursorResponse.getCursorId()); } @@ -408,15 +382,6 @@ void AsyncResultsMerger::handleBatchResponse( // Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down. signalCurrentEventIfReady_inlock(); - // Make a best effort to parse the response and retrieve the cursor id. We need the cursor - // id in order to issue a killCursors command against it. - if (cbData.response.isOK()) { - auto cursorResponse = parseCursorResponse(cbData.response.data, remote); - if (cursorResponse.isOK()) { - remote.cursorId = cursorResponse.getValue().getCursorId(); - } - } - // If we're killed and we're not waiting on any more batches to come back, then we are ready // to kill the cursors on the remote hosts and clean up this cursor. Schedule the // killCursors command and signal that this cursor is safe now safe to destroy. We have to @@ -444,77 +409,14 @@ void AsyncResultsMerger::handleBatchResponse( : cbData.response.status); if (!cursorResponseStatus.isOK()) { - // In the case a read is performed against a view, the shard primary can return an error - // indicating that the underlying collection may be sharded. When this occurs the return - // message will include an expanded view definition and collection namespace which we need - // to store. This allows for a second attempt at the read directly against the underlying - // collection. - if (cursorResponseStatus.getStatus() == - ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { - auto& responseObj = cbData.response.data; - if (!responseObj.hasField("resolvedView")) { - remote.status = Status(ErrorCodes::InternalError, - str::stream() << "Missing field 'resolvedView' in document: " - << responseObj); - return; - } - - auto resolvedViewObj = responseObj.getObjectField("resolvedView"); - if (resolvedViewObj.isEmpty()) { - remote.status = Status(ErrorCodes::InternalError, - str::stream() << "Field 'resolvedView' must be an object: " - << responseObj); - return; - } - - _params->viewDefinition = resolvedViewObj.getOwned(); - remote.status = cursorResponseStatus.getStatus(); - return; - } - auto shard = remote.getShard(); if (!shard) { remote.status = Status(cursorResponseStatus.getStatus().code(), - str::stream() << "Could not find shard " << *remote.shardId - << " containing host " + str::stream() << "Could not find shard containing host " << remote.getTargetHost().toString()); } else { shard->updateReplSetMonitor(remote.getTargetHost(), cursorResponseStatus.getStatus()); - - // If we can still retry the initial cursor establishment, reset the state so it can be - // retried the next time nextEvent is called. Never retry getMores to avoid - // accidentally skipping results. - if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts && - shard->isRetriableError(cursorResponseStatus.getStatus().code(), - Shard::RetryPolicy::kIdempotent)) { - invariant(remote.shardId); - invariant(remote.docBuffer.empty()); - - LOG(1) << "Initial cursor establishment failed with retriable error and will be " - "retried" - << causedBy(redact(cursorResponseStatus.getStatus())); - - ++remote.retryCount; - remote.status = Status::OK(); // Reset status so it can be retried. - - // Signal the merger thread to make it retry this remote again. - if (_currentEvent.isValid()) { - // To prevent ourselves from signalling the event twice, - // we set '_currentEvent' as invalid after signalling it. - _executor->signalEvent(_currentEvent); - _currentEvent = executor::TaskExecutor::EventHandle(); - } - - return; - } else { - remote.status = cursorResponseStatus.getStatus(); - if (remote.status == ErrorCodes::CallbackCanceled) { - // This callback should only be canceled as part of the shutdown sequence, so we - // promote a canceled callback error to an error that will make more sense to - // the client. - remote.status = Status(ErrorCodes::ShutdownInProgress, "shutdown in progress"); - } - } + remote.status = cursorResponseStatus.getStatus(); } // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We @@ -531,36 +433,22 @@ void AsyncResultsMerger::handleBatchResponse( return; } - // Cursor id successfully established. - auto cursorResponse = std::move(cursorResponseStatus.getValue()); - remote.cursorId = cursorResponse.getCursorId(); - remote.initialCmdObj = boost::none; + // Response successfully received. - for (const auto& obj : cursorResponse.getBatch()) { - // If there's a sort, we're expecting the remote node to give us back a sort key. - if (!_params->sort.isEmpty() && - obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) { - remote.status = Status(ErrorCodes::InternalError, - str::stream() << "Missing field '" - << ClusterClientCursorParams::kSortKeyField - << "' in document: " - << obj); - return; - } + auto cursorResponse = std::move(cursorResponseStatus.getValue()); - ClusterQueryResult result(obj); - remote.docBuffer.push(result); - ++remote.fetchedCount; - } + // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard. + remote.cursorId = cursorResponse.getCursorId(); - // If we're doing a sorted merge, then we have to make sure to put this remote onto the - // merge queue. - if (!_params->sort.isEmpty() && !cursorResponse.getBatch().empty()) { - _mergeQueue.push(remoteIndex); + // Save the batch in the remote's buffer. + if (!addBatchToBuffer(remoteIndex, cursorResponse.getBatch())) { + return; } // If the cursor is tailable and we just received an empty batch, the next return value should // be boost::none in order to indicate the end of the batch. + // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from + // one shard means the end of the overall batch). if (_params->isTailable && !remote.hasNext()) { _eofNext = true; } @@ -583,6 +471,33 @@ void AsyncResultsMerger::handleBatchResponse( signalCurrentEventIfReady_inlock(); } +bool AsyncResultsMerger::addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch) { + auto& remote = _remotes[remoteIndex]; + for (const auto& obj : batch) { + // If there's a sort, we're expecting the remote node to have given us back a sort key. + if (!_params->sort.isEmpty() && + obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) { + remote.status = Status(ErrorCodes::InternalError, + str::stream() << "Missing field '" + << ClusterClientCursorParams::kSortKeyField + << "' in document: " + << obj); + return false; + } + + ClusterQueryResult result(obj); + remote.docBuffer.push(result); + ++remote.fetchedCount; + } + + // If we're doing a sorted merge, then we have to make sure to put this remote onto the + // merge queue. + if (!_params->sort.isEmpty() && !batch.empty()) { + _mergeQueue.push(remoteIndex); + } + return true; +} + void AsyncResultsMerger::signalCurrentEventIfReady_inlock() { if (ready_inlock() && _currentEvent.isValid()) { // To prevent ourselves from signalling the event twice, we set '_currentEvent' as @@ -610,7 +525,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* opCtx) { invariant(!remote.cbHandle.isValid()); if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) { - BSONObj cmdObj = KillCursorsRequest(_params->nsString, {*remote.cursorId}).toBSON(); + BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx); @@ -656,6 +571,12 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o scheduleKillCursors_inlock(opCtx); _lifecycleState = kKillComplete; _executor->signalEvent(_killCursorsScheduledEvent); + } else { + for (const auto& remote : _remotes) { + if (remote.cbHandle.isValid()) { + _executor->cancel(remote.cbHandle); + } + } } return _killCursorsScheduledEvent; @@ -665,16 +586,12 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o // AsyncResultsMerger::RemoteCursorData // -AsyncResultsMerger::RemoteCursorData::RemoteCursorData(ShardId shardId, BSONObj cmdObj) - : shardId(std::move(shardId)), initialCmdObj(std::move(cmdObj)) {} - AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId) - : cursorId(establishedCursorId), _shardHostAndPort(std::move(hostAndPort)) {} + : cursorId(establishedCursorId), shardHostAndPort(std::move(hostAndPort)) {} const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const { - invariant(_shardHostAndPort); - return *_shardHostAndPort; + return shardHostAndPort; } bool AsyncResultsMerger::RemoteCursorData::hasNext() const { @@ -682,38 +599,11 @@ bool AsyncResultsMerger::RemoteCursorData::hasNext() const { } bool AsyncResultsMerger::RemoteCursorData::exhausted() const { - return cursorId && (*cursorId == 0); -} - -Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort( - const ReadPreferenceSetting& readPref) { - invariant(shardId); - invariant(!cursorId); - - const auto shard = getShard(); - if (!shard) { - return Status(ErrorCodes::ShardNotFound, - str::stream() << "Could not find shard " << *shardId); - } - - // TODO: Pass down an OperationContext* to use here. - auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20}); - if (!findHostStatus.isOK()) { - return findHostStatus.getStatus(); - } - - _shardHostAndPort = std::move(findHostStatus.getValue()); - - return Status::OK(); + return cursorId == 0; } std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() { - invariant(shardId || _shardHostAndPort); - if (shardId) { - return grid.shardRegistry()->getShardNoReload(*shardId); - } else { - return grid.shardRegistry()->getShardNoReload(_shardHostAndPort->toString()); - } + return grid.shardRegistry()->getShardNoReload(shardHostAndPort.toString()); } // diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index e6766a4faa3..6b30730392f 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -48,11 +48,12 @@ namespace mongo { class CursorResponse; /** - * AsyncResultsMerger is used to generate results from cursor-generating commands on one or more - * remote hosts. A cursor-generating command (e.g. the find command) is one that establishes a - * ClientCursor and a matching cursor id on the remote host. In order to retrieve all command - * results, getMores must be issued against each of the remote cursors until they are exhausted. The - * results from the remote nodes are merged to present either a single sorted or unsorted stream. + * Given a set of cursorIds across one or more shards, the AsyncResultsMerger calls getMore on the + * cursors to present a single sorted or unsorted stream of documents. + * + * (A cursor-generating command (e.g. the find command) is one that establishes a ClientCursor and a + * matching cursorId on the remote host. In order to retrieve all document results, getMores must be + * issued against each of the remote cursors until they are exhausted). * * The ARM offers a non-blocking interface: if no results are immediately available on this host for * retrieval, calling nextEvent() schedules work on the remote hosts in order to generate further @@ -74,16 +75,20 @@ class AsyncResultsMerger { public: /** - * Constructs a new AsyncResultsMerger. The TaskExecutor* must remain valid for the lifetime of - * the ARM. + * Takes ownership of the cursors from ClusterClientCursorParams by storing their cursorIds and + * the hosts on which they exist in _remotes. + * + * Additionally copies each remote's first batch of results, if one exists, into that remote's + * docBuffer. If a sort is specified in the ClusterClientCursorParams, places the remotes with + * buffered results onto _mergeQueue. + * + * The TaskExecutor* must remain valid for the lifetime of the ARM. */ AsyncResultsMerger(executor::TaskExecutor* executor, ClusterClientCursorParams* params); /** - * In order to be destroyed, either - * --the cursor must have been kill()'ed and the event return from kill() must have been - * signaled, or - * --all cursors must have been exhausted. + * In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have + * been exhausted. This is so that any unexhausted cursors are cleaned up by the ARM. */ virtual ~AsyncResultsMerger(); @@ -148,25 +153,22 @@ public: * function, that has not yet been signaled. If there is an outstanding unsignaled event, * returns an error. * - * Conditions when event can be signaled: - * - Finished collecting results from all remotes. - * - One of the host failed with a retriable error. In this case, if ready() is false, then - * the caller should call nextEvent() to retry the request on the hosts that errored. If - * ready() is true, then either the error was not retriable or it has exhausted max retries. + * If there is a sort, the event is signaled when there are buffered results for all + * non-exhausted remotes. + * If there is no sort, the event is signaled when some remote has a buffered result. */ StatusWith<executor::TaskExecutor::EventHandle> nextEvent(OperationContext* opCtx); /** - * Starts shutting down this ARM. Returns a handle to an event which is signaled when this - * cursor is safe to destroy. + * Starts shutting down this ARM by canceling all pending requests. Returns a handle to an event + * that is signaled when this ARM is safe to destroy. + * If there are no pending requests, schedules killCursors and signals the event immediately. + * Otherwise, the last callback that runs after kill() is called schedules killCursors and + * signals the event. * * Returns an invalid handle if the underlying task executor is shutting down. In this case, * killing is considered complete and the ARM may be destroyed immediately. * - * When the underlying task executor is *not* shutting down, an ARM can only be destroyed if - * either 1) all its results have been exhausted or 2) the kill event returned by this method - * has been signaled. - * * May be called multiple times (idempotent). */ executor::TaskExecutor::EventHandle kill(OperationContext* opCtx); @@ -178,17 +180,6 @@ private: * reported from the remote. */ struct RemoteCursorData { - /** - * Creates a new uninitialized remote cursor state, which will have to send a command in - * order to establish its cursor id. Must only be used if the remote cursor ids are not yet - * known. - */ - RemoteCursorData(ShardId shardId, BSONObj cmdObj); - - /** - * Instantiates a new initialized remote cursor, which has an established cursor id. It may - * only be used for getMore operations. - */ RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId); /** @@ -208,50 +199,31 @@ private: bool exhausted() const; /** - * Given the shard id with which the cursor was initialized and a read preference, selects - * a host on which the cursor should be created. - * - * May not be called once a cursor has already been established. - */ - Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); - - /** * Returns the Shard object associated with this remote cursor. */ std::shared_ptr<Shard> getShard(); - // ShardId on which a cursor will be created. - // TODO: This should always be set. - const boost::optional<ShardId> shardId; - - // The command object for sending to the remote to establish the cursor. If a remote cursor - // has not been established yet, this member will be set to a valid command object. If a - // remote cursor has already been established, this member will be unset. - boost::optional<BSONObj> initialCmdObj; + // The cursor id for the remote cursor. If a remote cursor is not yet exhausted, this member + // will be set to a valid non-zero cursor id. If a remote cursor is now exhausted, this + // member will be set to zero. + CursorId cursorId; - // The cursor id for the remote cursor. If a remote cursor has not been established yet, - // this member will be unset. If a remote cursor has been established and is not yet - // exhausted, this member will be set to a valid non-zero cursor id. If a remote cursor was - // established but is now exhausted, this member will be set to zero. - boost::optional<CursorId> cursorId; + // The exact host in the shard on which the cursor resides. + HostAndPort shardHostAndPort; + // The buffer of results that have been retrieved but not yet returned to the caller. std::queue<ClusterQueryResult> docBuffer; + + // Is valid if there is currently a pending request to this remote. executor::TaskExecutor::CallbackHandle cbHandle; - Status status = Status::OK(); - // Counts how many times we retried the initial cursor establishment command. It is used to - // make a decision based on the error type and the retry count about whether we are allowed - // to retry sending the request to another host from this shard. - int retryCount = 0; + // Set to an error status if there is an error retrieving a response from this remote or if + // the command result contained an error. + Status status = Status::OK(); // Count of fetched docs during ARM processing of the current batch. Used to reduce the // batchSize in getMore when mongod returned less docs than the requested batchSize. long long fetchedCount = 0; - - private: - // For a cursor, which has shard id associated contains the exact host on which the remote - // cursor resides. - boost::optional<HostAndPort> _shardHostAndPort; }; class MergingComparator { @@ -324,6 +296,11 @@ private: void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, OperationContext* opCtx, size_t remoteIndex); + /** + * Adds the batch of results to the RemoteCursorData. Returns false if there was an error + * parsing the batch. + */ + bool addBatchToBuffer(size_t remoteIndex, const std::vector<BSONObj>& batch); /** * If there is a valid unsignaled event that has been requested via nextReady() and there are diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 0dc06aaa5bd..2fd5513d67c 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -105,47 +105,32 @@ public: protected: /** - * Given a find command specification, 'findCmd', and a list of remote host:port pairs, - * constructs the appropriate ARM. + * Constructs an ARM with the given vector of existing cursors. * - * If 'batchSize' is set (i.e. not equal to boost::none), this batchSize is used for each - * getMore. If 'findCmd' has a batchSize, this is used just for the initial find operation. + * If 'findCmd' is not set, the default ClusterClientCursorParams are used. + * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams. + * + * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the + * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' */ - void makeCursorFromFindCmd( - const BSONObj& findCmd, - const std::vector<ShardId>& shardIds, + void makeCursorFromExistingCursors( + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors, + boost::optional<BSONObj> findCmd = boost::none, boost::optional<long long> getMoreBatchSize = boost::none, ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly)) { - const bool isExplain = true; - const auto qr = - unittest::assertGet(QueryRequest::makeFromFindCommand(_nss, findCmd, isExplain)); - _params = stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), readPref); - _params->sort = qr->getSort(); - _params->limit = qr->getLimit(); - _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize(); - _params->skip = qr->getSkip(); - _params->isTailable = qr->isTailable(); - _params->isAwaitData = qr->isAwaitData(); - _params->isAllowPartialResults = qr->isAllowPartialResults(); - - for (const auto& shardId : shardIds) { - _params->remotes.emplace_back(shardId, findCmd); - } - - arm = stdx::make_unique<AsyncResultsMerger>(executor(), _params.get()); - } - - /** - * Given a vector of (HostAndPort, CursorIds) representing a set of existing cursors, constructs - * the appropriate ARM. The default CCC parameters are used. - */ - void makeCursorFromExistingCursors( - const std::vector<std::pair<HostAndPort, CursorId>>& remotes) { - _params = stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator()); - - for (const auto& hostIdPair : remotes) { - _params->remotes.emplace_back(hostIdPair.first, hostIdPair.second); + _params->remotes = std::move(remoteCursors); + + if (findCmd) { + const auto qr = unittest::assertGet( + QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */)); + _params->sort = qr->getSort(); + _params->limit = qr->getLimit(); + _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize(); + _params->skip = qr->getSkip(); + _params->isTailable = qr->isTailable(); + _params->isAwaitData = qr->isAwaitData(); + _params->isAllowPartialResults = qr->isAllowPartialResults(); } arm = stdx::make_unique<AsyncResultsMerger>(executor(), _params.get()); @@ -219,6 +204,13 @@ protected: net->exitNetwork(); } + void runReadyCallbacks() { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + void blackHoleNextRequest() { executor::NetworkInterfaceMock* net = network(); net->enterNetwork(); @@ -233,263 +225,338 @@ protected: std::unique_ptr<AsyncResultsMerger> arm; }; -TEST_F(AsyncResultsMergerTest, ClusterFind) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); +TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {})); + makeCursorFromExistingCursors(std::move(cursors)); - ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); - // First shard responds. - std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = { - fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + // Schedule requests. + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - // Can't return any results until we have a response from all three shards. + // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); - // Second two shards respond. - responses.clear(); - std::vector<BSONObj> batch2 = {fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(0), batch2); - std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(0), batch3); + // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor()->waitForEvent(readyEvent); + // Now that the responses have been delivered, ARM is ready to return results. + ASSERT_TRUE(arm->ready()); + // Because the response contained a cursorId of 0, ARM marked the remote as exhausted. ASSERT_TRUE(arm->remotesExhausted()); - ASSERT_TRUE(arm->ready()); + + // ARM returns the correct results. + executor()->waitForEvent(readyEvent); ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, ARM returns EOF immediately because the cursor was + // exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); +TEST_F(AsyncResultsMergerTest, SingleShardSorted) { + BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); + // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + + // Schedule requests. auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + + // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(10), batch1); - std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(11), batch2); - std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(12), batch3); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor()->waitForEvent(readyEvent); + std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; + responses.emplace_back(_nss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); - ASSERT_FALSE(arm->remotesExhausted()); + // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // Because the response contained a cursorId of 0, ARM marked the remote as exhausted. + ASSERT_TRUE(arm->remotesExhausted()); + + // ARM returns all results in order. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 5}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 6}}"), + *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, ARM returns EOF immediately because the cursor was + // exhausted. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); +} +TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {})); + makeCursorFromExistingCursors(std::move(cursors)); + + // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + ASSERT_FALSE(arm->remotesExhausted()); + + // Schedule requests. + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + + // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); - responses.clear(); - std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; - responses.emplace_back(_nss, CursorId(10), batch4); - std::vector<BSONObj> batch5 = {fromjson("{_id: 9}")}; - responses.emplace_back(_nss, CursorId(0), batch5); - std::vector<BSONObj> batch6 = {fromjson("{_id: 10}")}; - responses.emplace_back(_nss, CursorId(0), batch6); + // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch1 = { + fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; + responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->remotesExhausted()); + // ARM is ready to return first result. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // ARM is not exhausted, because second shard has yet to respond. + ASSERT_FALSE(arm->remotesExhausted()); + + // ARM returns results from first shard immediately. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); + // There are no further buffered results, so ARM is not ready. ASSERT_FALSE(arm->ready()); + + // Make next event to be signaled. readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); + // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. responses.clear(); - std::vector<BSONObj> batch7 = {fromjson("{_id: 11}")}; - responses.emplace_back(_nss, CursorId(0), batch7); + std::vector<BSONObj> batch2 = { + fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; + responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor()->waitForEvent(readyEvent); + // ARM is ready to return remaining results. + ASSERT_TRUE(arm->ready()); ASSERT_TRUE(arm->remotesExhausted()); + + // ARM returns results from second shard immediately. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { - BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); +TEST_F(AsyncResultsMergerTest, MultiShardSorted) { + BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); + // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + + // Schedule requests. auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + + // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = {fromjson("{_id: 5, $sortKey: {'': 5}}"), - fromjson("{_id: 6, $sortKey: {'': 6}}")}; + std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), + fromjson("{$sortKey: {'': 6}}")}; responses.emplace_back(_nss, CursorId(0), batch1); - std::vector<BSONObj> batch2 = {fromjson("{_id: 3, $sortKey: {'': 3}}"), - fromjson("{_id: 9, $sortKey: {'': 9}}")}; + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + + // ARM is not ready to return results until receiving responses from all remotes. + ASSERT_FALSE(arm->ready()); + + // ARM is not exhausted, because second shard has yet to respond. + ASSERT_FALSE(arm->remotesExhausted()); + + // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. + responses.clear(); + std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), + fromjson("{$sortKey: {'': 9}}")}; responses.emplace_back(_nss, CursorId(0), batch2); - std::vector<BSONObj> batch3 = {fromjson("{_id: 4, $sortKey: {'': 4}}"), - fromjson("{_id: 8, $sortKey: {'': 8}}")}; - responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor()->waitForEvent(readyEvent); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + // Now that all remotes have responded, ARM is ready to return results. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), - *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"), - *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"), + ASSERT_TRUE(arm->remotesExhausted()); + + // ARM returns all results in sorted order. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"), + ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"), + ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"), + ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { - BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); +TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {})); + makeCursorFromExistingCursors(std::move(cursors)); + // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + + // Schedule requests. auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); + // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), - fromjson("{$sortKey: {'': 6}}")}; - responses.emplace_back(_nss, CursorId(1), batch1); - std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), - fromjson("{$sortKey: {'': 4}}")}; - responses.emplace_back(_nss, CursorId(0), batch2); - std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 7}}"), - fromjson("{$sortKey: {'': 8}}")}; - responses.emplace_back(_nss, CursorId(2), batch3); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor()->waitForEvent(readyEvent); + std::vector<BSONObj> batch1 = { + fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; + responses.emplace_back(_nss, CursorId(5), batch1); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + // ARM is ready to return first result. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 3}}"), - *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + + // ARM is not exhausted, because second shard has yet to respond and first shard's response did + // not contain cursorId=0. + ASSERT_FALSE(arm->remotesExhausted()); + + // ARM returns results from first shard immediately. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 5}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 6}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); + // There are no further buffered results, so ARM is not ready. ASSERT_FALSE(arm->ready()); + + // Make next event to be signaled. readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); + // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. responses.clear(); - std::vector<BSONObj> batch4 = {fromjson("{$sortKey: {'': 7}}"), - fromjson("{$sortKey: {'': 10}}")}; - responses.emplace_back(_nss, CursorId(0), batch4); + std::vector<BSONObj> batch2 = { + fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; + responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor()->waitForEvent(readyEvent); + // ARM is ready to return second shard's results. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 7}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + + // ARM is not exhausted, because first shard's response did not contain cursorId=0. + ASSERT_FALSE(arm->remotesExhausted()); + + // ARM returns results from second shard immediately. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 7}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 8}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); + // ARM is not ready to return results until further results are obtained from first shard. ASSERT_FALSE(arm->ready()); + + // Make next event to be signaled. readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); + // First shard returns remainder of results. responses.clear(); - std::vector<BSONObj> batch5 = {fromjson("{$sortKey: {'': 9}}"), - fromjson("{$sortKey: {'': 10}}")}; - responses.emplace_back(_nss, CursorId(0), batch5); + std::vector<BSONObj> batch3 = { + fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")}; + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor()->waitForEvent(readyEvent); + // ARM is ready to return remaining results. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->remotesExhausted()); + + // ARM returns remaining results immediately. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 10}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 10}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { - BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); +TEST_F(AsyncResultsMergerTest, CompoundSortKey) { + BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}"); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 6, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 7, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); + // Schedule requests. ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); + // Deliver responses. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"), fromjson("{$sortKey: {'': 4, '': 20}}")}; @@ -500,10 +567,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"), fromjson("{$sortKey: {'': 5, '': 9}}")}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); + // ARM returns all results in sorted order. ASSERT_TRUE(arm->ready()); + ASSERT_TRUE(arm->remotesExhausted()); ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); @@ -521,13 +591,18 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { - BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); +TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { + BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}"); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -537,7 +612,8 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -550,133 +626,126 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { executor()->waitForEvent(killEvent); } +TEST_F(AsyncResultsMergerTest, HasFirstBatch) { + std::vector<BSONObj> firstBatch = { + fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch))); + makeCursorFromExistingCursors(std::move(cursors)); + + // Because there was firstBatch, ARM is immediately ready to return results. + ASSERT_TRUE(arm->ready()); + + // Because the cursorId was not zero, ARM is not exhausted. + ASSERT_FALSE(arm->remotesExhausted()); -TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { - // Initial batchSize sent with the find command is zero; batchSize sent with each getMore - // command is one. - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 0}"); - const long long getMoreBatchSize = 1LL; - makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}, getMoreBatchSize); + // ARM returns the correct results. + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); + // Now that the firstBatch results have been returned, ARM must wait for further results. ASSERT_FALSE(arm->ready()); + + // Schedule requests. auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + + // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); - // Both shards give back empty responses. Second shard doesn't have any results so it - // sends back a cursor id of zero. + // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; - responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>()); - responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>()); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - // In handling the responses from the first shard, the ARM should have already asked - // for an additional batch from that shard. It won't have anything to return until it - // gets a non-empty response. - ASSERT_FALSE(arm->ready()); - responses.clear(); - std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; - responses.emplace_back(_nss, CursorId(1), batch1); + std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor()->waitForEvent(readyEvent); + // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - // The shard responds with another empty batch but leaves the cursor open. It probably shouldn't - // do this, but there's no reason the ARM can't handle this by asking for more. - responses.clear(); - responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>()); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + // Because the response contained a cursorId of 0, ARM marked the remote as exhausted. + ASSERT_TRUE(arm->remotesExhausted()); - // The shard responds with another batch and closes the cursor. - ASSERT_FALSE(arm->ready()); - responses.clear(); - std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + // ARM returns the correct results. executor()->waitForEvent(readyEvent); - + ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, ARM returns EOF immediately because the cursor was + // exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, ReceivedViewDefinitionFromShard) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); - - ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - std::string inputNs = "views_sharded.coll"; - std::string inputPipeline = "[ { $match: { a: { $gte: 5.0 } } } ]"; - scheduleNetworkViewResponse(inputNs, inputPipeline); - - executor()->waitForEvent(readyEvent); +TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { + std::vector<BSONObj> firstBatch = { + fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch))); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 0, {})); + makeCursorFromExistingCursors(std::move(cursors)); + // Because there was firstBatch, ARM is immediately ready to return results. ASSERT_TRUE(arm->ready()); - auto statusWithNext = arm->nextReady(); - ASSERT(!statusWithNext.isOK()); - ASSERT_EQ(statusWithNext.getStatus().code(), - ErrorCodes::CommandOnShardedViewNotSupportedOnMongod); - - ASSERT(_params->viewDefinition); - - auto outputPipeline = (*_params->viewDefinition)["pipeline"]; - ASSERT(!outputPipeline.eoo()); - ASSERT_BSONOBJ_EQ(fromjson(inputPipeline), outputPipeline.Obj()); - - auto outputNs = (*_params->viewDefinition)["ns"]; - ASSERT(!outputNs.eoo()); - ASSERT_EQ(outputNs.String(), inputNs); - - // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); - executor()->waitForEvent(killEvent); -} + // Because one of the remotes' cursorId was not zero, ARM is not exhausted. + ASSERT_FALSE(arm->remotesExhausted()); -TEST_F(AsyncResultsMergerTest, ExistingCursors) { - makeCursorFromExistingCursors({{kTestShardHosts[0], 5}, {kTestShardHosts[1], 6}}); + // ARM returns the correct results. + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); + // Now that the firstBatch results have been returned, ARM must wait for further results. ASSERT_FALSE(arm->ready()); + + // Schedule requests. auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + + // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch1); - std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - executor()->waitForEvent(readyEvent); + std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; + responses.emplace_back(_nss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // Because the response contained a cursorId of 0, ARM marked the remote as exhausted. + ASSERT_TRUE(arm->remotesExhausted()); + + // ARM returns the correct results. + executor()->waitForEvent(readyEvent); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); + + // After returning all the buffered results, ARM returns EOF immediately because the cursor was + // exhausted. ASSERT_TRUE(arm->ready()); - ASSERT(unittest::assertGet(arm->nextReady()).isEOF()); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } - TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -688,7 +757,8 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -744,35 +814,17 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { } TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = { - fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor()->waitForEvent(readyEvent); - - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - responses.clear(); - std::vector<BSONObj> batch2 = { - fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(456), batch2); + std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; + responses.emplace_back(_nss, CursorId(456), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -786,8 +838,11 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { } TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 456, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 789, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -795,14 +850,13 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1) - .toBSON(CursorResponse::ResponseType::InitialResponse); + .toBSON(CursorResponse::ResponseType::SubsequentResponse); BSONObj response2 = fromjson("{foo: 'bar'}"); std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")}; - BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3) - .toBSON(CursorResponse::ResponseType::InitialResponse); + BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); scheduleNetworkResponseObjs({response1, response2, response3}); executor()->waitForEvent(readyEvent); - ASSERT_TRUE(arm->ready()); auto statusWithNext = arm->nextReady(); ASSERT(!statusWithNext.isOK()); @@ -813,8 +867,11 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { } TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 3, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -825,7 +882,8 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); executor()->waitForEvent(readyEvent); @@ -842,8 +900,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { } TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -854,7 +913,8 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -870,8 +930,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { } TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); + executor()->shutdown(); ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent(nullptr).getStatus()); auto killEvent = arm->kill(nullptr); @@ -879,8 +941,9 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { } TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Make a request to the shard that will never get answered. ASSERT_FALSE(arm->ready()); @@ -895,8 +958,9 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch } TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto killedEvent = arm->kill(nullptr); @@ -910,8 +974,11 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { } TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -924,7 +991,8 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(123), batch3); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); // Kill should be able to return right away if there are no pending batches. auto killedEvent = arm->kill(nullptr); @@ -934,8 +1002,11 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { } TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 3, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -944,85 +1015,24 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - // Kill event will only be signalled once the pending batches are received. - auto killedEvent = arm->kill(nullptr); - - // After the kill, the ARM waits for outstanding batches to come back. This ensures that we - // receive cursor ids for any established remote cursors, and can clean them up by issuing - // killCursors commands. - responses.clear(); - std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(123), batch2); - std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")}; - responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - // Only one of the responses has a non-zero cursor id. The ARM should have issued a killCursors - // command against this id. - BSONObj expectedCmdObj = BSON("killCursors" - << "testcoll" - << "cursors" - << BSON_ARRAY(CursorId(123))); - ASSERT_BSONOBJ_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj); - - // Ensure that we properly signal both those waiting for the kill, and those waiting for more - // results to be ready. - executor()->waitForEvent(readyEvent); - executor()->waitForEvent(killedEvent); -} - -TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); - - ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - std::vector<CursorResponse> responses; - std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - // First batch received. - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); - - // This will schedule a getMore on cursor id 123. - ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - auto killedEvent = arm->kill(nullptr); - - // The kill can't complete until the getMore response is received. - responses.clear(); - std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(123), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - // While processing the getMore response, a killCursors against id 123 should have been - // scheduled. - BSONObj expectedCmdObj = BSON("killCursors" - << "testcoll" - << "cursors" - << BSON_ARRAY(CursorId(123))); - ASSERT_BSONOBJ_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj); + // Kill event will only be signalled once the callbacks for the pending batches have run. + auto killedEvent = arm->kill(nullptr); - // Ensure that we properly signal both those waiting for the kill, and those waiting for more - // results to be ready. + // The pending requests have been canceled, so run their callbacks. + runReadyCallbacks(); + + // Ensure that we properly signal those waiting for more results to be ready. executor()->waitForEvent(readyEvent); executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1031,7 +1041,8 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); auto killedEvent = arm->kill(nullptr); @@ -1042,8 +1053,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { } TEST_F(AsyncResultsMergerTest, KillCalledTwice) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); auto killedEvent1 = arm->kill(nullptr); ASSERT(killedEvent1.isValid()); auto killedEvent2 = arm->kill(nullptr); @@ -1054,7 +1066,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) { TEST_F(AsyncResultsMergerTest, TailableBasic) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1063,7 +1077,8 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1100,7 +1115,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1110,7 +1127,8 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(123), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); // After receiving an empty batch, the ARM should return boost::none, but remotes should not be @@ -1125,7 +1143,9 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1135,7 +1155,8 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); // Afterwards, the ARM should return boost::none and remote cursors should be marked as @@ -1147,7 +1168,9 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1157,7 +1180,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1188,9 +1212,12 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { } TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { - BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); - makeCursorFromFindCmd( - findCmd, {kTestShardIds[0]}, boost::none, ReadPreferenceSetting(ReadPreference::Nearest)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors), + boost::none, + boost::none, + ReadPreferenceSetting(ReadPreference::Nearest)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1202,7 +1229,8 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1213,13 +1241,17 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { TEST_F(AsyncResultsMergerTest, AllowPartialResults) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - makeCursorFromFindCmd(findCmd, kTestShardIds); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 97, {})); + cursors.emplace_back(kTestShardHosts[1], CursorResponse(_nss, 98, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 99, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); - // An unretriable error occurs with the first host. + // An error occurs with the first host. scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"}); ASSERT_FALSE(arm->ready()); @@ -1230,7 +1262,8 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.emplace_back(_nss, CursorId(98), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(99), batch2); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1275,7 +1308,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 98, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1284,7 +1319,8 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(98), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1303,81 +1339,12 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } -TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); - - ASSERT_FALSE(arm->ready()); - - // First and second attempts return an error. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - // Third attempt succeeds. - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - std::vector<CursorResponse> responses; - std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; - responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor()->waitForEvent(readyEvent); - - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - - ASSERT_TRUE(arm->remotesExhausted()); - ASSERT_TRUE(arm->ready()); -} - -TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); - - ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - // All attempts return an error (one attempt plus three retries) - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - executor()->waitForEvent(readyEvent); - ASSERT_TRUE(arm->ready()); - - auto status = arm->nextReady(); - ASSERT_EQ(status.getStatus().code(), ErrorCodes::NotMasterNoSlaveOk); - - // Protocol is to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); - executor()->waitForEvent(killEvent); -} - -TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { +TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 2, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1387,30 +1354,12 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - // From the second host all attempts return an error (one attempt plus three retries) - scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); + // From the second host we get a network (retriable) error. scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); - executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); - executor()->waitForEvent(readyEvent); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1420,57 +1369,39 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { ASSERT_TRUE(arm->ready()); } -TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyReturnResults) { +TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardHosts[2], CursorResponse(_nss, 2, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); - // Both hosts return an error which indicates that the request should be retried. + // Both hosts return network (retriable) errors. scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); executor()->waitForEvent(readyEvent); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - ASSERT_FALSE(arm->ready()); - - - // Return valid data on both hosts. - { - std::vector<CursorResponse> responses; - std::vector<BSONObj> batch = {fromjson("{_id: 1, $sortKey: {'': 1}}")}; - responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::InitialResponse); - } - - { - std::vector<CursorResponse> responses; - std::vector<BSONObj> batch = {fromjson("{_id: 2, $sortKey: {'': 2}}}")}; - responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::InitialResponse); - } - - executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1, $sortKey: {'': 1}}}"), - *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2, $sortKey: {'': 2}}}}"), - *unittest::assertGet(arm->nextReady()).getResult()); + auto statusWithNext = arm->nextReady(); + ASSERT(!statusWithNext.isOK()); + ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::HostUnreachable); + ASSERT_EQ(statusWithNext.getStatus().reason(), "host unreachable"); - ASSERT_TRUE(arm->remotesExhausted()); - ASSERT_TRUE(arm->ready()); + // Required to kill the 'arm' on error before destruction. + auto killEvent = arm->kill(nullptr); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1479,7 +1410,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1513,9 +1445,13 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } + TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); + ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); @@ -1523,7 +1459,10 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); + ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); @@ -1531,7 +1470,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); @@ -1544,33 +1485,6 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { executor()->waitForEvent(killEvent); } -TEST_F(AsyncResultsMergerTest, RetryWhenShardHasRetriableErrorInBetweenReadyAndNextEvent) { - BSONObj findCmd = fromjson("{find: 'testcoll'}"); - makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); - - ASSERT_FALSE(arm->ready()); - - // First attempt returns a retriable error. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); - - // We expect to be able to retrieve another event, and be waiting on the retry to succeed. - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); - std::vector<CursorResponse> responses; - std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - - executor()->waitForEvent(readyEvent); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_TRUE(arm->remotesExhausted()); - ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); -} - } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 45d5d76a147..1754b233eca 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -86,11 +86,6 @@ public: virtual UserNameIterator getAuthenticatedUsers() const = 0; /** - * Returns the view definition associated with this cursor, if any. - */ - virtual boost::optional<BSONObj> viewDefinition() const = 0; - - /** * Returns the number of result documents returned so far by this cursor via the next() method. */ virtual long long getNumReturnedSoFar() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index dec40936d7c..1bd2bd644a8 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -104,10 +104,6 @@ UserNameIterator ClusterClientCursorImpl::getAuthenticatedUsers() const { _params.authenticatedUsers.end()); } -boost::optional<BSONObj> ClusterClientCursorImpl::viewDefinition() const { - return _params.viewDefinition; -} - long long ClusterClientCursorImpl::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 27b31dba14c..bb08223f618 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -103,8 +103,6 @@ public: UserNameIterator getAuthenticatedUsers() const final; - boost::optional<BSONObj> viewDefinition() const final; - long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 8b4d64f4d41..9b092358920 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -85,10 +85,6 @@ UserNameIterator ClusterClientCursorMock::getAuthenticatedUsers() const { return makeUserNameIterator(emptyAuthenticatedUsers.begin(), emptyAuthenticatedUsers.end()); } -boost::optional<BSONObj> ClusterClientCursorMock::viewDefinition() const { - return boost::none; -} - void ClusterClientCursorMock::queueResult(const ClusterQueryResult& result) { _resultsQueue.push({result}); } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 23d857d59d6..5ee4143f45f 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -51,8 +51,6 @@ public: UserNameIterator getAuthenticatedUsers() const final; - boost::optional<BSONObj> viewDefinition() const final; - long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 03db1e37194..2fcaf317b3b 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -37,6 +37,7 @@ #include "mongo/db/auth/user_name.h" #include "mongo/db/cursor_id.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/cursor_response.h" #include "mongo/s/client/shard.h" #include "mongo/util/net/hostandport.h" @@ -44,57 +45,30 @@ namespace mongo { class OperationContext; +/** + * The resulting ClusterClientCursor will take ownership of the existing remote cursor, generating + * results based on the cursor's current state. + * + * Note that any results already generated from this cursor will not be returned by the resulting + * ClusterClientCursor. The caller is responsible for ensuring that results previously generated by + * this cursor have been processed. + */ struct ClusterClientCursorParams { // When mongos has to do a merge in order to return results to the client in the correct sort // order, it requests a sortKey meta-projection using this field name. static const char kSortKeyField[]; - /** - * Contains any CCC parameters that are specified per-remote node. - */ - struct Remote { - /** - * Use when a new cursor should be created on the remote. - */ - Remote(ShardId sid, BSONObj cmdObj) : shardId(std::move(sid)), cmdObj(std::move(cmdObj)) {} - - /** - * Use when an a cursor already exists on the remote. The resulting CCC will take ownership - * of the existing remote cursor, generating results based on its current state. - * - * Note that any results already generated from this cursor will not be returned by the - * resulting CCC. The caller is responsible for ensuring that results previously generated - * by this cursor have been processed. - */ - Remote(HostAndPort hostAndPort, CursorId cursorId) - : hostAndPort(std::move(hostAndPort)), cursorId(cursorId) {} - - // If this is a regular query cursor, this value will be set and shard id retargeting may - // occur on certain networking or replication errors. - // - // If this is an externally-prepared cursor (as is in the case of aggregation cursors), - // this value will never be set and no retargeting will occur. - boost::optional<ShardId> shardId; - - // If this is an externally-specified cursor (e.g. aggregation), this value will be set and - // used directly and no re-targeting may happen on errors. - boost::optional<HostAndPort> hostAndPort; - - // The raw command parameters to send to this remote (e.g. the find command specification). - // - // Exactly one of 'cmdObj' or 'cursorId' must be set. - boost::optional<BSONObj> cmdObj; - - // The cursorId for the remote node, if one already exists. - // - // Exactly one of 'cmdObj' or 'cursorId' must be set. - boost::optional<CursorId> cursorId; + struct RemoteCursor { + RemoteCursor(HostAndPort hostAndPort, CursorResponse cursorResponse) + : hostAndPort(std::move(hostAndPort)), cursorResponse(std::move(cursorResponse)) {} + + // The exact host (within the shard) on which the cursor resides. + HostAndPort hostAndPort; + + // Encompasses the state of the established cursor. + CursorResponse cursorResponse; }; - /** - * Read preference must be provided if initial shard host targeting is necessary (i.e., we don't - * know yet the remote cursor id). - */ ClusterClientCursorParams(NamespaceString nss, UserNameIterator authenticatedUsersIter, boost::optional<ReadPreferenceSetting> readPref = boost::none) @@ -107,14 +81,14 @@ struct ClusterClientCursorParams { } } - // Namespace against which to query. + // Namespace against which the cursors exist. NamespaceString nsString; // The set of authenticated users when this cursor was created. std::vector<UserName> authenticatedUsers; // Per-remote node data. - std::vector<Remote> remotes; + std::vector<RemoteCursor> remotes; // The sort specification. Leave empty if there is no sort. BSONObj sort; @@ -137,18 +111,12 @@ struct ClusterClientCursorParams { // Whether this cursor has the awaitData option set. bool isAwaitData = false; - // Read preference for where to target the query. This value is only set if initial shard host - // targeting is necessary and not used if using externally prepared cursor ids. + // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; - - // If the read is done against a view, an error is returned along with the view definition in - // the first response from the primary shard for the base collection. Calling code can re-run - // the read against the base collection by using this returned view definition. - boost::optional<BSONObj> viewDefinition; }; } // mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index a6dcf1de83e..7249eebc7eb 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -52,6 +52,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/stdx/memory.h" @@ -159,6 +160,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, auto shardRegistry = Grid::get(opCtx)->shardRegistry(); // Get the set of shards on which we will run the query. + std::vector<std::shared_ptr<Shard>> shards; if (primary) { shards.emplace_back(std::move(primary)); @@ -180,6 +182,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, } } + // Construct the query and parameters. + ClusterClientCursorParams params( query.nss(), AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), @@ -213,12 +217,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, return qrToForward.getStatus(); } - // Use read pref to target a particular host from each shard. Also construct the find command - // that we will forward to each shard. + // Construct the find command that we will use to establish cursors, attaching the shardVersion. + + std::vector<std::pair<ShardId, BSONObj>> requests; for (const auto& shard : shards) { invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::INVALID); - // Build the find command, and attach shard version if necessary. BSONObjBuilder cmdBuilder; qrToForward.getValue()->asFindCommand(&cmdBuilder); @@ -230,28 +234,44 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, version.appendForCommands(&cmdBuilder); } - params.remotes.emplace_back(shard->getId(), cmdBuilder.obj()); + requests.emplace_back(shard->getId(), cmdBuilder.obj()); } + // Establish the cursors with a consistent shardVersion across shards. + + auto swCursors = establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + query.nss(), + readPref, + requests, + query.getQueryRequest().isAllowPartialResults(), + viewDefinition); + if (!swCursors.isOK()) { + // Make sure a viewDefinition was set if the find was on a view. + if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swCursors.getStatus().code()) { + if (!viewDefinition) { + return {ErrorCodes::InternalError, + str::stream() << "Missing resolved view definition, but remote returned " + << ErrorCodes::errorString(swCursors.getStatus().code())}; + } + } + return swCursors.getStatus(); + } + + // Transfer the established cursors to a ClusterClientCursor. + + params.remotes = std::move(swCursors.getValue()); auto ccc = ClusterClientCursorImpl::make( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + // Retrieve enough data from the ClusterClientCursor for the first batch of results. + auto cursorState = ClusterCursorManager::CursorState::NotExhausted; int bytesBuffered = 0; while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) { auto next = ccc->next(opCtx); if (!next.isOK()) { - if (viewDefinition && - ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == next.getStatus().code()) { - if (!ccc->viewDefinition()) { - return {ErrorCodes::InternalError, - str::stream() - << "Missing resolved view definition, but remote returned " - << ErrorCodes::errorString(next.getStatus().code())}; - } - *viewDefinition = BSON("resolvedView" << *ccc->viewDefinition()); - } return next.getStatus(); } @@ -291,7 +311,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, return CursorId(0); } - // Register the cursor with the cursor manager. + // Register the cursor with the cursor manager for subsequent getMore's. + auto cursorManager = Grid::get(opCtx)->getCursorManager(); const auto cursorType = chunkManager ? ClusterCursorManager::CursorType::NamespaceSharded : ClusterCursorManager::CursorType::NamespaceNotSharded; diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp new file mode 100644 index 00000000000..240b47d4fb9 --- /dev/null +++ b/src/mongo/s/query/establish_cursors.cpp @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/establish_cursors.h" + +#include "mongo/client/remote_command_targeter.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/killcursors_request.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/remote_command_response.h" +#include "mongo/s/async_requests_sender.h" +#include "mongo/s/grid.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishCursors( + OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults, + BSONObj* viewDefinition) { + // Construct the requests + std::vector<AsyncRequestsSender::Request> requests; + for (const auto& remote : remotes) { + requests.emplace_back(remote.first, remote.second); + } + + // Send the requests + AsyncRequestsSender ars(opCtx, executor, nss.db().toString(), std::move(requests), readPref); + + // Get the responses + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + Status status = Status::OK(); + while (!ars.done()) { + auto response = ars.next(); + + StatusWith<CursorResponse> swCursorResponse( + response.swResponse.isOK() + ? CursorResponse::parseFromBSON(response.swResponse.getValue().data) + : response.swResponse.getStatus()); + + if (swCursorResponse.isOK()) { + remoteCursors.emplace_back(std::move(*response.shardHostAndPort), + std::move(swCursorResponse.getValue())); + continue; + } + + // In the case a read is performed against a view, the shard primary can return an error + // indicating that the underlying collection may be sharded. When this occurs the return + // message will include an expanded view definition and collection namespace which we + // need to store. This allows for a second attempt at the read directly against the + // underlying collection. + if (swCursorResponse.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { + auto& responseObj = response.swResponse.getValue().data; + if (!responseObj.hasField("resolvedView")) { + status = Status(ErrorCodes::InternalError, + str::stream() << "Missing field 'resolvedView' in document: " + << responseObj); + break; + } + + auto resolvedViewObj = responseObj.getObjectField("resolvedView"); + if (resolvedViewObj.isEmpty()) { + status = Status(ErrorCodes::InternalError, + str::stream() << "Field 'resolvedView' must be an object: " + << responseObj); + break; + } + + status = std::move(swCursorResponse.getStatus()); + if (viewDefinition) { + *viewDefinition = BSON("resolvedView" << resolvedViewObj.getOwned()); + } + break; + } + + // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. + if (allowPartialResults) { + continue; + } + status = std::move(swCursorResponse.getStatus()); + break; + } + + // If one of the remotes had an error, we make a best effort to finish retrieving responses for + // other requests that were already sent, so that we can send killCursors to any cursors that we + // know were established. + if (!status.isOK()) { + // Do not schedule any new requests. + ars.stopRetrying(); + + // Collect responses from all requests that were already sent. + while (!ars.done()) { + auto response = ars.next(); + + // Check if the response contains an established cursor, and if so, store it. + StatusWith<CursorResponse> swCursorResponse( + response.swResponse.isOK() + ? CursorResponse::parseFromBSON(response.swResponse.getValue().data) + : response.swResponse.getStatus()); + + if (swCursorResponse.isOK()) { + remoteCursors.emplace_back(*response.shardHostAndPort, + std::move(swCursorResponse.getValue())); + } + } + + // Schedule killCursors against all cursors that were established. + for (const auto& remoteCursor : remoteCursors) { + BSONObj cmdObj = + KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON(); + executor::RemoteCommandRequest request( + remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx); + + // We do not process the response to the killCursors request (we make a good-faith + // attempt at cleaning up the cursors, but ignore any returned errors). + executor->scheduleRemoteCommand( + request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {}); + } + + return status; + } + + return std::move(remoteCursors); +} + +} // namespace mongo diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h new file mode 100644 index 00000000000..4d921ad7a8a --- /dev/null +++ b/src/mongo/s/query/establish_cursors.h @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/cursor_id.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class CursorResponse; + +/** + * Establishes cursors on the remote shards by issuing requests in parallel, using the readPref to + * select a host within each shard. + * + * If any of the cursors fails to be established, performs cleanup by sending killCursors to any + * cursors that were established and returns a non-OK status. + * + * If an OK status is returned, the ownership of the cursors is transferred to the caller. This + * means the caller is now responsible for either exhausting the cursors or sending killCursors to + * them. + * + * @param allowPartialResults: If true, unreachable hosts are ignored, and only cursors established + * on reachable hosts are returned. + * @param viewDefinition: If the namespace represents a view, an error is returned and the view + * definition is stored in this parameter. Calling code can then attempt to + * establish cursors against the base collection using this viewDefinition. + */ +StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishCursors( + OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults, + BSONObj* viewDefinition); + +} // namespace mongo diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 4f53b2441bc..d2926602249 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -62,7 +62,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, ClusterClientCursorParams params( incomingCursorResponse.getValue().getNSS(), AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames()); - params.remotes.emplace_back(server, incomingCursorResponse.getValue().getCursorId()); + params.remotes.emplace_back( + server, CursorResponse(requestedNss, incomingCursorResponse.getValue().getCursorId(), {})); auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); |