summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2018-04-05 13:56:47 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2018-04-05 13:59:27 -0400
commite88c6d85036607ddf86105234917b4adfffbd612 (patch)
tree43ff091111fec8836654b0ccc3dcaff6e3d2a518 /src/mongo/s
parentac6544a9194197b5ab10f563cf1a19dcdc42349c (diff)
downloadmongo-e88c6d85036607ddf86105234917b4adfffbd612.tar.gz
Revert "SERVER-33323 Use the IDL to serialize the ARM"
This reverts commit 7d09f278a2acf9791b36927d6af1d30347d60391.
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp77
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp6
-rw-r--r--src/mongo/s/commands/strategy.cpp2
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/async_results_merger.cpp90
-rw-r--r--src/mongo/s/query/async_results_merger.h15
-rw-r--r--src/mongo/s/query/async_results_merger_params.idl89
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp658
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp41
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp3
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h39
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h2
-rw-r--r--src/mongo/s/query/cluster_find.cpp4
-rw-r--r--src/mongo/s/query/document_source_router_adapter.cpp4
-rw-r--r--src/mongo/s/query/document_source_router_adapter.h1
-rw-r--r--src/mongo/s/query/establish_cursors.cpp45
-rw-r--r--src/mongo/s/query/establish_cursors.h15
-rw-r--r--src/mongo/s/query/router_exec_stage.h8
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp21
-rw-r--r--src/mongo/s/query/router_stage_merge.h4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h2
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.cpp21
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.h5
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp15
-rw-r--r--src/mongo/s/query/store_possible_cursor.h2
30 files changed, 495 insertions, 690 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index c0643205001..7c673f73b30 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/curop.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
@@ -264,14 +263,26 @@ BSONObj createCommandForMergingShard(
return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true);
}
-std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+/**
+ * Verifies that the shardIds are the same as they were atClusterTime using versioned table.
+ * TODO: SERVER-33767
+ */
+bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx,
+ const std::set<ShardId>& shardIds,
+ LogicalTime atClusterTime) {
+ return true;
+}
+
+std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
+>>>>>>> parent of 7d09f27... SERVER-33323 Use the IDL to serialize the ARM
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe);
@@ -341,7 +352,7 @@ struct DispatchShardPipelineResults {
// Populated if this *is not* an explain, this vector represents the cursors on the remote
// shards.
- std::vector<RemoteCursor> remoteCursors;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
// Populated if this *is* an explain, this vector represents the results from each shard.
std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
@@ -379,7 +390,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// pipeline is already split and we now only need to target a single shard, reassemble the
// original pipeline.
// - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto cursors = std::vector<RemoteCursor>();
+ auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
@@ -537,7 +548,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<RemoteCursor> cursors) {
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
@@ -545,6 +556,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
+
// A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
// size we pass here is used for getMores, so do not specify a batch size if the initial request
// had a batch size of 0.
@@ -554,19 +566,12 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
if (liteParsedPipeline.hasChangeStream()) {
// For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added. Be careful to extract the targeted shard IDs before the remote
- // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger.
- std::vector<ShardId> shardIds;
- for (const auto& remote : params.remotes) {
- shardIds.emplace_back(remote.getShardId().toString());
- }
-
- params.createCustomCursorSource = [cmdToRunOnNewShards,
- shardIds](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
+ // when they are added.
+ params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards);
+ opCtx, executor, params, cmdToRunOnNewShards);
};
}
auto ccc = ClusterClientCursorImpl::make(
@@ -615,7 +620,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
ccc->detachFromOperationContext();
- int nShards = ccc->getNumRemotes();
+ int nShards = ccc->getRemotes().size();
CursorId clusterCursorId = 0;
if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
@@ -681,8 +686,7 @@ ShardId pickMergingShard(OperationContext* opCtx,
return dispatchResults.needsPrimaryShardMerge
? primaryShard
: dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
- .getShardId()
- .toString();
+ .shardId;
}
} // namespace
@@ -835,16 +839,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
- remoteCursor.getShardId().toString(),
- remoteCursor.getHostAndPort(),
- remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
+ remoteCursor.shardId,
+ remoteCursor.hostAndPort,
+ remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
mergeCtx->tailableMode));
- return appendCursorResponseToCommandResult(
- remoteCursor.getShardId().toString(), reply, result);
+ return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result);
}
// If we reach here, we have a merge pipeline to dispatch.
@@ -880,10 +883,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
ShardId mergingShardId =
pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId());
- cluster_aggregation_planner::addMergeCursorsSource(
- mergingPipeline.get(),
+ mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create(
std::move(dispatchResults.remoteCursors),
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ mergeCtx));
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse =
@@ -978,8 +981,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
- liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData
- : TailableModeEnum::kNormal));
+ liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData
+ : TailableMode::kNormal));
}
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 4d91ddfff08..060c5533877 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
}
- auto shardResult = std::vector<RemoteCursor>();
+ auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
@@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
invariant(shardResult.size() == 1u);
- auto& cursor = shardResult.front().getCursorResponse();
+ auto& cursor = shardResult.front().cursorResponse;
auto& batch = cursor.getBatch();
// We should have at most 1 result, and the cursor should be exhausted.
uassert(ErrorCodes::InternalError,
str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().getHostAndPort()
+ << shardResult.front().hostAndPort
<< ", id: "
<< cursor.getCursorId(),
cursor.getCursorId() == 0);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index fa70221c4fd..708f52ac576 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -585,7 +585,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss
}
uassertStatusOK(statusGetDb);
- boost::optional<std::int64_t> batchSize;
+ boost::optional<long long> batchSize;
if (ntoreturn) {
batchSize = ntoreturn;
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index aa9a1713255..d9148577f5a 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -86,7 +86,6 @@ env.Library(
source=[
"async_results_merger.cpp",
"establish_cursors.cpp",
- env.Idlc('async_results_merger_params.idl')[0],
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/command_request_response",
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 45403399e8c..59d9a133d72 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,27 +82,20 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- AsyncResultsMergerParams params)
+ ClusterClientCursorParams* params)
: _opCtx(opCtx),
_executor(executor),
- // This strange initialization is to work around the fact that the IDL does not currently
- // support a default value for an enum. The default tailable mode should be 'kNormal', but
- // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
- _tailableMode(params.getTailableMode() ? *params.getTailableMode()
- : TailableModeEnum::kNormal),
- _params(std::move(params)),
- _mergeQueue(MergingComparator(_remotes,
- _params.getSort() ? *_params.getSort() : BSONObj(),
- _params.getCompareWholeSortKey())) {
+ _params(params),
+ _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) {
size_t remoteIndex = 0;
- for (const auto& remote : _params.getRemotes()) {
- _remotes.emplace_back(remote.getHostAndPort(),
- remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ for (const auto& remote : _params->remotes) {
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse);
++remoteIndex;
}
}
@@ -130,7 +123,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) {
+ if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -140,9 +133,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
// recent optimes, which allows us to return sorted $changeStream results even if some shards
// are yet to provide a batch of data. If the timeout specified by the client is greater than
// 1000ms, then it will be enforced elsewhere.
- _awaitDataTimeout =
- (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000})
- : awaitDataTimeout);
+ _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u
+ ? std::min(awaitDataTimeout, Milliseconds{1000})
+ : awaitDataTimeout);
return Status::OK();
}
@@ -168,12 +161,13 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+void AsyncResultsMerger::addNewShardCursors(
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (auto&& remote : newCursors) {
- _remotes.emplace_back(remote.getHostAndPort(),
- remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
}
}
@@ -196,15 +190,16 @@ bool AsyncResultsMerger::_ready(WithLock lk) {
}
}
- return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk);
+ const bool hasSort = !_params->sort.isEmpty();
+ return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
}
bool AsyncResultsMerger::_readySorted(WithLock lk) {
- if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (_params->tailableMode == TailableMode::kTailableAndAwaitData) {
return _readySortedTailable(lk);
}
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_tailableMode == TailableModeEnum::kNormal);
+ invariant(_params->tailableMode == TailableMode::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -223,14 +218,13 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestRemote = _mergeQueue.top();
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
- extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
+ extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey);
for (const auto& remote : _remotes) {
if (!remote.promisedMinSortKey) {
// In order to merge sorted tailable cursors, we need this value to be populated.
return false;
}
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
- 0) {
+ if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) {
// The key we want to return is not guaranteed to be smaller than future results from
// this remote, so we can't yet return it.
return false;
@@ -270,12 +264,13 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
return {ClusterQueryResult()};
}
- return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
+ const bool hasSort = !_params->sort.isEmpty();
+ return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_tailableMode != TailableModeEnum::kTailable);
+ invariant(_params->tailableMode != TailableMode::kTailable);
if (_mergeQueue.empty()) {
return {};
@@ -309,7 +304,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_tailableMode == TailableModeEnum::kTailable &&
+ if (_params->tailableMode == TailableMode::kTailable &&
!_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
@@ -339,9 +334,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- auto adjustedBatchSize = _params.getBatchSize();
- if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) {
- adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount;
+ auto adjustedBatchSize = _params->batchSize;
+ if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
+ adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
}
BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
@@ -353,7 +348,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
.toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
@@ -453,8 +448,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->cursorId = response.getCursorId();
if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
// We only expect to see this for change streams.
- invariant(_params.getSort());
- invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() ==
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
change_stream_constants::kSortSpec));
auto newLatestTimestamp = *response.getLastOplogTimestamp();
@@ -481,7 +475,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
auto maxSortKeyFromResponse =
(response.getBatch().empty()
? BSONObj()
- : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey()));
+ : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey));
remote->promisedMinSortKey =
(compareSortKeys(
@@ -531,7 +525,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
remote.status = std::move(status);
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params.getAllowPartialResults()) {
+ if (_params->isAllowPartialResults) {
remote.status = Status::OK();
// Clear the results buffer and cursor id.
@@ -571,7 +565,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
// through to the client as-is.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
+ if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
} else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
@@ -588,7 +582,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
updateRemoteMetadata(&remote, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
- if (_params.getSort()) {
+ if (!_params->sort.isEmpty()) {
auto key = obj[AsyncResultsMerger::kSortKeyField];
if (!key) {
remote.status =
@@ -597,7 +591,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
<< "' in document: "
<< obj);
return false;
- } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) {
+ } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
remote.status =
Status(ErrorCodes::InternalError,
str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
@@ -612,9 +606,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
++remote.fetchedCount;
}
- // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge
- // queue.
- if (_params.getSort() && !response.getBatch().empty()) {
+ // If we're doing a sorted merge, then we have to make sure to put this remote onto the
+ // merge queue.
+ if (!_params->sort.isEmpty() && !response.getBatch().empty()) {
_mergeQueue.push(remoteIndex);
}
return true;
@@ -644,10 +638,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx)
for (const auto& remote : _remotes) {
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
- BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON();
+ BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx);
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
// Send kill request; discard callback handle, if any, or failure report, if not.
_executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore();
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 1653374b5bc..e2551a3c7dd 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -37,7 +37,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_query_result.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -98,7 +98,7 @@ public:
*/
AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- AsyncResultsMergerParams params);
+ ClusterClientCursorParams* params);
/**
* In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have
@@ -195,11 +195,7 @@ public:
* Adds the specified shard cursors to the set of cursors to be merged. The results from the
* new cursors will be returned as normal through nextReady().
*/
- void addNewShardCursors(std::vector<RemoteCursor>&& newCursors);
-
- std::size_t getNumRemotes() const {
- return _remotes.size();
- }
+ void addNewShardCursors(const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors);
/**
* Starts shutting down this ARM by canceling all pending requests and scheduling killCursors
@@ -297,7 +293,7 @@ private:
private:
const std::vector<RemoteCursorData>& _remotes;
- const BSONObj _sort;
+ const BSONObj& _sort;
// When '_compareWholeSortKey' is true, $sortKey is a scalar value, rather than an object.
// We extract the sort key {$sortKey: <value>}. The sort key pattern '_sort' is verified to
@@ -405,8 +401,7 @@ private:
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
- TailableModeEnum _tailableMode;
- AsyncResultsMergerParams _params;
+ ClusterClientCursorParams* _params;
// Must be acquired before accessing any data members (other than _params, which is read-only).
stdx::mutex _mutex;
diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl
deleted file mode 100644
index dafc9b53c1c..00000000000
--- a/src/mongo/s/query/async_results_merger_params.idl
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (C) 2018 MongoDB Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3,
-# as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the GNU Affero General Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-
-global:
- cpp_namespace: "mongo"
- cpp_includes:
- - "mongo/s/shard_id.h"
- - "mongo/util/net/hostandport.h"
- - "mongo/db/query/cursor_response.h"
-
-imports:
- - "mongo/db/query/tailable_mode.idl"
- - "mongo/idl/basic_types.idl"
- - "mongo/util/net/hostandport.idl"
-
-types:
- CursorResponse:
- bson_serialization_type: object
- description: The first batch returned after establishing cursors on a shard.
- cpp_type: CursorResponse
- serializer: CursorResponse::toBSONAsInitialResponse
- deserializer: CursorResponse::parseFromBSONThrowing
-
-structs:
- RemoteCursor:
- description: A description of a cursor opened on a remote server.
- fields:
- shardId:
- type: string
- description: The shardId of the shard on which the cursor resides.
- hostAndPort:
- type: HostAndPort
- description: The exact host (within the shard) on which the cursor resides.
- cursorResponse:
- type: CursorResponse
- description: The response after establishing a cursor on the remote shard, including
- the first batch.
-
- AsyncResultsMergerParams:
- description: The parameters needed to establish an AsyncResultsMerger.
- fields:
- sort:
- type: object
- description: The sort requested on the merging operation. Empty if there is no sort.
- optional: true
- compareWholeSortKey:
- type: bool
- default: false
- description: >-
- When 'compareWholeSortKey' is true, $sortKey is a scalar value, rather than an
- object. We extract the sort key {$sortKey: <value>}. The sort key pattern is
- verified to be {$sortKey: 1}.
- remotes: array<RemoteCursor>
- tailableMode:
- type: TailableMode
- optional: true
- description: If set, the tailability mode of this cursor.
- batchSize:
- type: safeInt64
- optional: true
- description: The batch size for this cursor.
- nss: namespacestring
- allowPartialResults:
- type: bool
- default: false
- description: If set, error responses are ignored.
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 6fd81715e90..4c0e32cba51 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -62,11 +62,9 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host",
HostAndPort("FakeShard2Host", 12345),
HostAndPort("FakeShard3Host", 12345)};
-const NamespaceString kTestNss("testdb.testcoll");
-
class AsyncResultsMergerTest : public ShardingTestFixture {
public:
- AsyncResultsMergerTest() {}
+ AsyncResultsMergerTest() : _nss("testdb.testcoll") {}
void setUp() override {
ShardingTestFixture::setUp();
@@ -97,48 +95,42 @@ public:
void tearDown() override {
ShardingTestFixture::tearDown();
+ // Reset _params only after shutting down the network interface (through
+ // ShardingTestFixture::tearDown()), because shutting down the network interface will
+ // deliver blackholed responses to the AsyncResultsMerger, and the AsyncResultsMerger's
+ // callback may still access _params.
+ _params.reset();
}
protected:
/**
* Constructs an ARM with the given vector of existing cursors.
*
- * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
- * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
+ * If 'findCmd' is not set, the default ClusterClientCursorParams are used.
+ * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams.
*
* 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
* initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
*/
- std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
- std::vector<RemoteCursor> remoteCursors,
+ void makeCursorFromExistingCursors(
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors,
boost::optional<BSONObj> findCmd = boost::none,
- boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- params.setRemotes(std::move(remoteCursors));
-
+ boost::optional<long long> getMoreBatchSize = boost::none) {
+ _params = stdx::make_unique<ClusterClientCursorParams>(_nss);
+ _params->remotes = std::move(remoteCursors);
if (findCmd) {
const auto qr = unittest::assertGet(
- QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
- if (!qr->getSort().isEmpty()) {
- params.setSort(qr->getSort().getOwned());
- }
-
- if (getMoreBatchSize) {
- params.setBatchSize(getMoreBatchSize);
- } else {
- params.setBatchSize(qr->getBatchSize()
- ? boost::optional<std::int64_t>(
- static_cast<std::int64_t>(*qr->getBatchSize()))
- : boost::none);
- }
- params.setTailableMode(qr->getTailableMode());
- params.setAllowPartialResults(qr->isAllowPartialResults());
+ QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */));
+ _params->sort = qr->getSort();
+ _params->limit = qr->getLimit();
+ _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
+ _params->skip = qr->getSkip();
+ _params->tailableMode = qr->getTailableMode();
+ _params->isAllowPartialResults = qr->isAllowPartialResults();
}
- return stdx::make_unique<AsyncResultsMerger>(
- operationContext(), executor(), std::move(params));
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get());
}
/**
@@ -228,6 +220,11 @@ protected:
net->blackHole(net->getNextReadyRequest());
net->exitNetwork();
}
+
+ const NamespaceString _nss;
+ std::unique_ptr<ClusterClientCursorParams> _params;
+
+ std::unique_ptr<AsyncResultsMerger> arm;
};
void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
@@ -243,19 +240,10 @@ void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
ASSERT_EQ(numCursors, 1u);
}
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
-
TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -271,7 +259,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -297,10 +285,9 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -316,7 +303,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -341,12 +328,10 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
}
TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -363,7 +348,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -391,7 +376,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
responses.clear();
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -407,20 +392,18 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -437,7 +420,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
fromjson("{$sortKey: {'': 6}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -451,7 +434,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
fromjson("{$sortKey: {'': 9}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -473,19 +456,17 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"),
*unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -498,7 +479,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(5), batch1);
+ responses.emplace_back(_nss, CursorId(5), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -527,7 +508,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
responses.clear();
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -555,7 +536,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
responses.clear();
std::vector<BSONObj> batch3 = {
fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -571,22 +552,19 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 7, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 7, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
// Schedule requests.
ASSERT_FALSE(arm->ready());
@@ -597,13 +575,13 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"),
fromjson("{$sortKey: {'': 4, '': 20}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 10, '': 11}}"),
fromjson("{$sortKey: {'': 4, '': 4}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
fromjson("{$sortKey: {'': 5, '': 9}}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -629,18 +607,17 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"),
*unittest::assertGet(arm->nextReady()).getResult());
- // After returning all the buffered results, the ARM returns EOF immediately because both shards
- // cursors were exhausted.
+ // After returning all the buffered results, the ARM returns EOF immediately because both
+ // shards cursors were exhausted.
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -649,7 +626,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
// Parsing the batch results in an error because the sort key is missing.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -667,10 +644,10 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
std::vector<BSONObj> firstBatch = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- std::vector<RemoteCursor> cursors;
- cursors.push_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch))));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
+ makeCursorFromExistingCursors(std::move(cursors));
// Because there was firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
@@ -698,7 +675,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -725,12 +702,11 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
std::vector<BSONObj> firstBatch = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
- std::vector<RemoteCursor> cursors;
- cursors.push_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch))));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 0, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch)));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 0, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Because there was firstBatch, ARM is immediately ready to return results.
ASSERT_TRUE(arm->ready());
@@ -758,7 +734,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
// Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -783,12 +759,10 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
}
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -797,9 +771,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// Both shards respond with the first batch.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(2), batch2);
+ responses.emplace_back(_nss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -821,7 +795,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// never responds.
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(1), batch3);
+ responses.emplace_back(_nss, CursorId(1), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
blackHoleNextRequest();
@@ -839,7 +813,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// We can continue to return results from first shard, while second shard remains unresponsive.
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -854,15 +828,12 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// the network interface.
auto killEvent = arm->kill(operationContext());
ASSERT_TRUE(killEvent.isValid());
- executor()->shutdown();
- executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -870,7 +841,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
- responses.emplace_back(kTestNss, CursorId(456), batch);
+ responses.emplace_back(_nss, CursorId(456), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -884,25 +855,22 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
}
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 789, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 789, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- BSONObj response1 = CursorResponse(kTestNss, CursorId(123), batch1)
+ BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1)
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
BSONObj response2 = fromjson("{foo: 'bar'}");
std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
- BSONObj response3 = CursorResponse(kTestNss, CursorId(789), batch3)
+ BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3)
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
scheduleNetworkResponseObjs({response1, response2, response3});
runReadyCallbacks();
@@ -917,14 +885,11 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -932,9 +897,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(2), batch2);
+ responses.emplace_back(_nss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -953,10 +918,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
}
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -966,7 +930,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -984,10 +948,9 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
}
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
executor()->shutdown();
ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus());
@@ -996,10 +959,9 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
}
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
@@ -1017,10 +979,9 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
}
TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto killedEvent = arm->kill(operationContext());
@@ -1035,14 +996,11 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
}
TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1050,11 +1008,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1069,14 +1027,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
}
TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1084,12 +1039,12 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
// Cursor 3 is not exhausted.
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
- responses.emplace_back(kTestNss, CursorId(123), batch3);
+ responses.emplace_back(_nss, CursorId(123), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1104,14 +1059,11 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
}
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1119,7 +1071,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(0), batch1);
+ responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1139,10 +1091,9 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
}
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1150,7 +1101,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1163,10 +1114,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
}
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
auto killedEvent1 = arm->kill(operationContext());
ASSERT(killedEvent1.isValid());
auto killedEvent2 = arm->kill(operationContext());
@@ -1177,10 +1127,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
TEST_F(AsyncResultsMergerTest, TailableBasic) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1188,7 +1137,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(123), batch1);
+ responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1209,7 +1158,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(123), batch2);
+ responses.emplace_back(_nss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1227,10 +1176,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1239,7 +1187,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
// Remote responds with an empty batch and a non-zero cursor id.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
- responses.emplace_back(kTestNss, CursorId(123), batch);
+ responses.emplace_back(_nss, CursorId(123), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1256,10 +1204,9 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1268,7 +1215,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
// Remote responds with an empty batch and a zero cursor id.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1282,10 +1229,9 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1293,7 +1239,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(1), batch1);
+ responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1308,7 +1254,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch2);
+ responses.emplace_back(_nss, CursorId(0), batch2);
readyEvent = unittest::assertGet(arm->nextEvent());
BSONObj scheduledCmd = getNthPendingRequest(0).cmdObj;
@@ -1328,14 +1274,11 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 97, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 98, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 99, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 97, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 98, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 99, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1349,9 +1292,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
// remaining shards.
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(kTestNss, CursorId(98), batch1);
+ responses.emplace_back(_nss, CursorId(98), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(99), batch2);
+ responses.emplace_back(_nss, CursorId(99), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1372,7 +1315,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(99), batch3);
+ responses.emplace_back(_nss, CursorId(99), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1387,7 +1330,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
// Once the last reachable shard indicates that its cursor is closed, we're done.
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1398,10 +1341,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 98, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 98, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1409,7 +1351,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(98), batch);
+ responses.emplace_back(_nss, CursorId(98), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1432,12 +1374,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1446,7 +1386,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
// First host returns single result
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
- responses.emplace_back(kTestNss, CursorId(0), batch);
+ responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1464,12 +1404,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1494,10 +1432,9 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1505,7 +1442,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
- responses.emplace_back(kTestNss, CursorId(123), batch1);
+ responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1527,7 +1464,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
- responses.emplace_back(kTestNss, CursorId(123), batch2);
+ responses.emplace_back(_nss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1548,24 +1485,20 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
// Clean up.
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1577,7 +1510,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1590,7 +1523,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1610,40 +1543,37 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest,
SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(makeRemoteCursor(
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
kTestShardIds[0],
kTestShardHosts[0],
CursorResponse(
- kTestNss,
+ _nss,
123,
{fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
boost::none,
- Timestamp(1, 5))));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1],
+ Timestamp(1, 5)));
+ cursors.emplace_back(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, Timestamp())));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ CursorResponse(_nss, 456, {}, boost::none, Timestamp()));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1651,7 +1581,7 @@ TEST_F(AsyncResultsMergerTest,
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
+ responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(unittest::assertGet(arm->nextEvent()));
@@ -1667,34 +1597,31 @@ TEST_F(AsyncResultsMergerTest,
// Clean up.
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
Timestamp tooLow = Timestamp(1, 2);
- cursors.push_back(makeRemoteCursor(
+ cursors.emplace_back(
kTestShardIds[0],
kTestShardHosts[0],
CursorResponse(
- kTestNss,
+ _nss,
123,
{fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
boost::none,
- Timestamp(1, 5))));
- cursors.push_back(makeRemoteCursor(kTestShardIds[1],
- kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLow)));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ Timestamp(1, 5)));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1702,7 +1629,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
// Clean up the cursors.
std::vector<CursorResponse> responses;
- responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{});
+ responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{});
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
auto killEvent = arm->kill(operationContext());
@@ -1710,16 +1637,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1731,7 +1655,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1739,10 +1663,9 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
ASSERT_TRUE(arm->ready());
// Add the new shard.
- std::vector<RemoteCursor> newCursors;
- newCursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- arm->addNewShardCursors(std::move(newCursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> newCursors;
+ newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ arm->addNewShardCursors(newCursors);
// Now shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1754,7 +1677,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1774,27 +1697,24 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- params.setRemotes(std::move(cursors));
- params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
- params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"));
- auto arm =
- stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+ auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1806,7 +1726,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
@@ -1814,10 +1734,9 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
ASSERT_TRUE(arm->ready());
// Add the new shard.
- std::vector<RemoteCursor> newCursors;
- newCursors.push_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
- arm->addNewShardCursors(std::move(newCursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> newCursors;
+ newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ arm->addNewShardCursors(newCursors);
// Now shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1831,7 +1750,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// The last observed time should still be later than the first shard, so we can get the data
// from it.
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
executor()->waitForEvent(readyEvent);
@@ -1851,22 +1770,21 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// Clean up the cursors.
responses.clear();
std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
+ responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
responses.clear();
std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
+ responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(operationContext());
@@ -1875,10 +1793,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill(operationContext());
@@ -1887,10 +1804,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1904,10 +1820,9 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
}
TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) {
- std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -1935,10 +1850,9 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
}
TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -1957,7 +1871,7 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
// exhausted.
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["getMore"]);
- return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ return CursorResponse(_nss, 0LL, {BSON("x" << 1)})
.toBSON(CursorResponse::ResponseType::SubsequentResponse);
});
@@ -1965,10 +1879,9 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
}
TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -1992,10 +1905,9 @@ TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
}
TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
+ makeCursorFromExistingCursors(std::move(cursors));
// Before any requests are scheduled, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 9c01c013ce6..653599def7e 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -113,7 +113,7 @@ public:
/**
* Returns a reference to the vector of remote hosts involved in this operation.
*/
- virtual const std::size_t getNumRemotes() const = 0;
+ virtual const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const = 0;
/**
* Returns the number of result documents returned so far by this cursor via the next() method.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1a4be45f1be..58484e87bfa 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,7 +30,6 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -136,19 +135,20 @@ OperationContext* ClusterClientCursorImpl::getCurrentOperationContext() const {
}
bool ClusterClientCursorImpl::isTailable() const {
- return _params.tailableMode != TailableModeEnum::kNormal;
+ return _params.tailableMode != TailableMode::kNormal;
}
bool ClusterClientCursorImpl::isTailableAndAwaitData() const {
- return _params.tailableMode == TailableModeEnum::kTailableAndAwaitData;
+ return _params.tailableMode == TailableMode::kTailableAndAwaitData;
}
BSONObj ClusterClientCursorImpl::getOriginatingCommand() const {
return _params.originatingCommandObj;
}
-const std::size_t ClusterClientCursorImpl::getNumRemotes() const {
- return _root->getNumRemotes();
+const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorImpl::getRemotes()
+ const {
+ return _params.remotes;
}
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
@@ -181,6 +181,30 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
namespace {
+/**
+ * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the
+ * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ */
+boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
+ // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
+ auto frontSort = mergePipeline->popFrontWithCriteria(
+ DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
+ return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
+ });
+
+ if (frontSort) {
+ auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
+ if (auto sortLimit = sortStage->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ mergePipeline->addInitialSource(sortLimit);
+ }
+ return sortStage
+ ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ }
+ return boost::none;
+}
+
bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
dynamic_cast<DocumentSourceSkip*>(stage.get()));
@@ -226,10 +250,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
// instead.
while (!pipeline->getSources().empty()) {
invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
+ if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) {
root = stdx::make_unique<RouterStageSkip>(
opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
+ } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) {
root = stdx::make_unique<RouterStageLimit>(
opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
}
@@ -246,8 +270,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
const auto skip = params->skip;
const auto limit = params->limit;
if (params->mergePipeline) {
- if (auto sort =
- cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
+ if (auto sort = extractLeadingSort(params->mergePipeline.get())) {
params->sort = *sort;
}
return buildPipelinePlan(executor, params);
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index d3c9349233b..c685a383307 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -105,7 +105,7 @@ public:
BSONObj getOriginatingCommand() const final;
- const std::size_t getNumRemotes() const final;
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final;
long long getNumReturnedSoFar() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 6e624f36b84..0e3a3fd6731 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -68,7 +68,8 @@ BSONObj ClusterClientCursorMock::getOriginatingCommand() const {
return _originatingCommand;
}
-const std::size_t ClusterClientCursorMock::getNumRemotes() const {
+const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorMock::getRemotes()
+ const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 1c50403c3ae..7364240112d 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -69,7 +69,7 @@ public:
BSONObj getOriginatingCommand() const final;
- const std::size_t getNumRemotes() const final;
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final;
long long getNumReturnedSoFar() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 71a7f17c282..116634bcee8 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -42,7 +42,6 @@
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/client/shard.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -62,6 +61,22 @@ class RouterExecStage;
* this cursor have been processed.
*/
struct ClusterClientCursorParams {
+ struct RemoteCursor {
+ RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse)
+ : shardId(std::move(shardId)),
+ hostAndPort(std::move(hostAndPort)),
+ cursorResponse(std::move(cursorResponse)) {}
+
+ // The shardId of the shard on which the cursor resides.
+ ShardId shardId;
+
+ // The exact host (within the shard) on which the cursor resides.
+ HostAndPort hostAndPort;
+
+ // Encompasses the state of the established cursor.
+ CursorResponse cursorResponse;
+ };
+
ClusterClientCursorParams(NamespaceString nss,
boost::optional<ReadPreferenceSetting> readPref = boost::none)
: nsString(std::move(nss)) {
@@ -70,24 +85,6 @@ struct ClusterClientCursorParams {
}
}
- /**
- * Extracts the subset of fields here needed by the AsyncResultsMerger. The returned
- * AsyncResultsMergerParams will assume ownership of 'remotes'.
- */
- AsyncResultsMergerParams extractARMParams() {
- AsyncResultsMergerParams armParams;
- if (!sort.isEmpty()) {
- armParams.setSort(sort);
- }
- armParams.setCompareWholeSortKey(compareWholeSortKey);
- armParams.setRemotes(std::move(remotes));
- armParams.setTailableMode(tailableMode);
- armParams.setBatchSize(batchSize);
- armParams.setNss(nsString);
- armParams.setAllowPartialResults(isAllowPartialResults);
- return armParams;
- }
-
// Namespace against which the cursors exist.
NamespaceString nsString;
@@ -111,7 +108,7 @@ struct ClusterClientCursorParams {
// The number of results per batch. Optional. If specified, will be specified as the batch for
// each getMore.
- boost::optional<std::int64_t> batchSize;
+ boost::optional<long long> batchSize;
// Limits the number of results returned by the ClusterClientCursor to this many. Optional.
// Should be forwarded to the remote hosts in 'cmdObj'.
@@ -122,7 +119,7 @@ struct ClusterClientCursorParams {
// Whether this cursor is tailing a capped collection, and whether it has the awaitData option
// set.
- TailableModeEnum tailableMode = TailableModeEnum::kNormal;
+ TailableMode tailableMode = TailableMode::kNormal;
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index ca66ee04e08..cd5ce0f9bc8 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -147,9 +147,10 @@ BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const {
return _cursor->getOriginatingCommand();
}
-const std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const {
+const std::vector<ClusterClientCursorParams::RemoteCursor>&
+ClusterCursorManager::PinnedCursor::getRemotes() const {
invariant(_cursor);
- return _cursor->getNumRemotes();
+ return _cursor->getRemotes();
}
CursorId ClusterCursorManager::PinnedCursor::getCursorId() const {
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index cf6dc54f21e..b9b4ee81ef6 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -198,7 +198,7 @@ public:
/**
* Returns a reference to the vector of remote hosts involved in this operation.
*/
- const std::size_t getNumRemotes() const;
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const;
/**
* Returns the cursor id for the underlying cursor, or zero if no cursor is owned.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 8c9e654f5bb..f8faaabaa96 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -346,7 +346,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
}
// Fill out query exec properties.
- CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes();
+ CurOp::get(opCtx)->debug().nShards = ccc->getRemotes().size();
CurOp::get(opCtx)->debug().nreturned = results->size();
// If the cursor is exhausted, then there are no more results to return and we don't need to
@@ -490,7 +490,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
// Set the originatingCommand object and the cursorID in CurOp.
{
- CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getNumRemotes();
+ CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getRemotes().size();
CurOp::get(opCtx)->debug().cursorid = request.cursorid;
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setOriginatingCommand_inlock(
diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp
index 26a944ed5cc..4e144751dcb 100644
--- a/src/mongo/s/query/document_source_router_adapter.cpp
+++ b/src/mongo/s/query/document_source_router_adapter.cpp
@@ -67,10 +67,6 @@ Value DocumentSourceRouterAdapter::serialize(
return Value(); // Return the empty value to hide this stage from explain output.
}
-std::size_t DocumentSourceRouterAdapter::getNumRemotes() const {
- return _child->getNumRemotes();
-}
-
bool DocumentSourceRouterAdapter::remotesExhausted() {
return _child->remotesExhausted();
}
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
index a7db7734539..5c1a6a0935c 100644
--- a/src/mongo/s/query/document_source_router_adapter.h
+++ b/src/mongo/s/query/document_source_router_adapter.h
@@ -59,7 +59,6 @@ public:
void detachFromOperationContext() final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
bool remotesExhausted();
- std::size_t getNumRemotes() const;
void setExecContext(RouterExecStage::ExecContext execContext) {
_execContext = execContext;
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 08ce5a2cb5b..f186bcb1bb5 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -47,12 +47,13 @@
namespace mongo {
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults) {
+std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
@@ -67,23 +68,20 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
readPref,
Shard::RetryPolicy::kIdempotent);
- std::vector<RemoteCursor> remoteCursors;
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
try {
// Get the responses
while (!ars.done()) {
try {
auto response = ars.next();
- // Note the shardHostAndPort may not be populated if there was an error, so be sure
- // to do this after parsing the cursor response to ensure the response was ok.
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we
- // have a valid cursor, since the error handling path will attempt to clean up
- // anything in 'remoteCursors'
- RemoteCursor cursor;
- cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing(
+
+ // uasserts must happen before attempting to access the optional shardHostAndPort.
+ auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(
uassertStatusOK(std::move(response.swResponse)).data));
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- remoteCursors.push_back(std::move(cursor));
+
+ remoteCursors.emplace_back(std::move(response.shardId),
+ std::move(*response.shardHostAndPort),
+ std::move(cursorResponse));
} catch (const DBException& ex) {
// Retriable errors are swallowed if 'allowPartialResults' is true.
if (allowPartialResults &&
@@ -116,21 +114,18 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
: response.swResponse.getStatus());
if (swCursorResponse.isOK()) {
- RemoteCursor cursor;
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
- remoteCursors.push_back(std::move(cursor));
+ remoteCursors.emplace_back(std::move(response.shardId),
+ *response.shardHostAndPort,
+ std::move(swCursorResponse.getValue()));
}
}
// Schedule killCursors against all cursors that were established.
for (const auto& remoteCursor : remoteCursors) {
BSONObj cmdObj =
- KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()})
- .toBSON();
+ KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON();
executor::RemoteCommandRequest request(
- remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx);
+ remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx);
// We do not process the response to the killCursors request (we make a good-faith
// attempt at cleaning up the cursors, but ignore any returned errors).
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index e88ddc2682e..b75a750d7b7 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -36,7 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger_params_gen.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -61,11 +61,12 @@ class CursorResponse;
* on reachable hosts are returned.
*
*/
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults);
+std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults);
} // namespace mongo
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 515da5a358c..418419fdbef 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -89,14 +89,6 @@ public:
}
/**
- * Returns the number of remote hosts involved in this execution plan.
- */
- virtual std::size_t getNumRemotes() const {
- invariant(_child); // The default implementation forwards to the child stage.
- return _child->getNumRemotes();
- }
-
- /**
* Returns whether or not all the remote cursors are exhausted.
*/
virtual bool remotesExhausted() {
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 48abb1452ec..4f17927483b 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -40,21 +40,18 @@ namespace mongo {
RouterStageMerge::RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params)
- : RouterExecStage(opCtx),
- _executor(executor),
- _params(params),
- _arm(opCtx, executor, params->extractARMParams()) {}
+ : RouterExecStage(opCtx), _executor(executor), _params(params), _arm(opCtx, executor, params) {}
StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
// Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
// cursors wait for ready() only until a specified time limit is exceeded.
- return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData
+ return (_params->tailableMode == TailableMode::kTailableAndAwaitData
? awaitNextWithTimeout(execCtx)
: _arm.blockingNext());
}
StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
// If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
// ready, we don't block. Fall straight through to the return statement.
while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) {
@@ -88,7 +85,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex
StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
// If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
if (_leftoverEventFromLastTimeout) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
auto event = _leftoverEventFromLastTimeout;
_leftoverEventFromLastTimeout = EventHandle();
return event;
@@ -105,16 +102,14 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
-std::size_t RouterStageMerge::getNumRemotes() const {
- return _arm.getNumRemotes();
-}
-
Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) {
- _arm.addNewShardCursors(std::move(newShards));
+void RouterStageMerge::addNewShardCursors(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards) {
+ _arm.addNewShardCursors(newShards);
+ std::move(newShards.begin(), newShards.end(), std::back_inserter(_params->remotes));
}
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index b6bfee146b6..efd397b8c7e 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -57,12 +57,10 @@ public:
bool remotesExhausted() final;
- std::size_t getNumRemotes() const final;
-
/**
* Adds the cursors in 'newShards' to those being merged by the ARM.
*/
- void addNewShardCursors(std::vector<RemoteCursor>&& newShards);
+ void addNewShardCursors(std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards);
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 5e94274b9ac..97febc62173 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -84,10 +84,6 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
_mergePipeline->dispose(opCtx);
}
-std::size_t RouterStagePipeline::getNumRemotes() const {
- return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
-}
-
bool RouterStagePipeline::remotesExhausted() {
return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index c14ddf9f80b..e876dc816a2 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -51,8 +51,6 @@ public:
bool remotesExhausted() final;
- std::size_t getNumRemotes() const final;
-
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
index 61fa2a9176d..451a1ee9699 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
@@ -56,11 +56,9 @@ bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards)
: RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
_params(params),
- _shardIds(std::move(shardIds)),
_cmdToRunOnNewShards(cmdToRunOnNewShards) {}
StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
@@ -75,12 +73,18 @@ StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
}
void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
+ std::vector<ShardId> existingShardIds;
+ for (const auto& remote : _params->remotes) {
+ existingShardIds.push_back(remote.shardId);
+ }
checked_cast<RouterStageMerge*>(getChildStage())
- ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
+ ->addNewShardCursors(
+ establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj));
}
-std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(
- const BSONObj& newShardDetectedObj) {
+std::vector<ClusterClientCursorParams::RemoteCursor>
+RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
+ const BSONObj& newShardDetectedObj) {
auto* opCtx = getOpCtx();
// Reload the shard registry. We need to ensure a reload initiated after calling this method
// caused the reload, otherwise we aren't guaranteed to get all the new shards.
@@ -94,12 +98,12 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe
std::vector<ShardId> shardIds, newShardIds;
shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardIds.begin(), _shardIds.end());
+ std::sort(existingShardIds.begin(), existingShardIds.end());
std::sort(shardIds.begin(), shardIds.end());
std::set_difference(shardIds.begin(),
shardIds.end(),
- _shardIds.begin(),
- _shardIds.end(),
+ existingShardIds.begin(),
+ existingShardIds.end(),
std::back_inserter(newShardIds));
auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
@@ -108,7 +112,6 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : newShardIds) {
requests.emplace_back(shardId, cmdObj);
- _shardIds.push_back(shardId);
}
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h
index 00ee921e2af..1128dc83430 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.h
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.h
@@ -44,7 +44,6 @@ public:
RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards);
StatusWith<ClusterQueryResult> next(ExecContext) final;
@@ -59,10 +58,10 @@ private:
/**
* Open the cursors on the new shards.
*/
- std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj);
+ std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursorsOnNewShards(
+ std::vector<ShardId> existingShardIds, const BSONObj& newShardDetectedObj);
ClusterClientCursorParams* _params;
- std::vector<ShardId> _shardIds;
BSONObj _cmdToRunOnNewShards;
};
}
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9a754364412..bdf4283c104 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -38,7 +38,6 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
-#include "mongo/s/shard_id.h"
namespace mongo {
@@ -49,7 +48,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
ClusterCursorManager* cursorManager,
- TailableModeEnum tailableMode) {
+ TailableMode tailableMode) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -72,13 +71,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
}
ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS());
- params.remotes.emplace_back();
- auto& remoteCursor = params.remotes.back();
- remoteCursor.setShardId(shardId.toString());
- remoteCursor.setHostAndPort(server);
- remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(),
- incomingCursorResponse.getValue().getCursorId(),
- {}));
+ params.remotes.emplace_back(shardId,
+ server,
+ CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = tailableMode;
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index b9756be44f7..75a4e76bf24 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -74,6 +74,6 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
ClusterCursorManager* cursorManager,
- TailableModeEnum tailableMode = TailableModeEnum::kNormal);
+ TailableMode tailableMode = TailableMode::kNormal);
} // namespace mongo