summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-04-05 18:46:43 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-04-05 21:28:45 -0400
commit41f13212be110fc2360804fc04982273e43910f4 (patch)
tree1c75cad7feed3e2592320e2204163236bd6b568e /src/mongo/s/query/async_results_merger.cpp
parent7a48a263485a585dac1e1289c830eafd35a3d54b (diff)
downloadmongo-41f13212be110fc2360804fc04982273e43910f4.tar.gz
SERVER-33323 Use the IDL to serialize the ARM
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp90
1 files changed, 48 insertions, 42 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 59d9a133d72..45403399e8c 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,20 +82,27 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- ClusterClientCursorParams* params)
+ AsyncResultsMergerParams params)
: _opCtx(opCtx),
_executor(executor),
- _params(params),
- _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) {
+ // This strange initialization is to work around the fact that the IDL does not currently
+ // support a default value for an enum. The default tailable mode should be 'kNormal', but
+ // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
+ _tailableMode(params.getTailableMode() ? *params.getTailableMode()
+ : TailableModeEnum::kNormal),
+ _params(std::move(params)),
+ _mergeQueue(MergingComparator(_remotes,
+ _params.getSort() ? *_params.getSort() : BSONObj(),
+ _params.getCompareWholeSortKey())) {
size_t remoteIndex = 0;
- for (const auto& remote : _params->remotes) {
- _remotes.emplace_back(remote.hostAndPort,
- remote.cursorResponse.getNSS(),
- remote.cursorResponse.getCursorId());
+ for (const auto& remote : _params.getRemotes()) {
+ _remotes.emplace_back(remote.getHostAndPort(),
+ remote.getCursorResponse().getNSS(),
+ remote.getCursorResponse().getCursorId());
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse);
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
++remoteIndex;
}
}
@@ -123,7 +130,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
+ if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -133,9 +140,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
// recent optimes, which allows us to return sorted $changeStream results even if some shards
// are yet to provide a batch of data. If the timeout specified by the client is greater than
// 1000ms, then it will be enforced elsewhere.
- _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u
- ? std::min(awaitDataTimeout, Milliseconds{1000})
- : awaitDataTimeout);
+ _awaitDataTimeout =
+ (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000})
+ : awaitDataTimeout);
return Status::OK();
}
@@ -161,13 +168,12 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-void AsyncResultsMerger::addNewShardCursors(
- const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) {
+void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (auto&& remote : newCursors) {
- _remotes.emplace_back(remote.hostAndPort,
- remote.cursorResponse.getNSS(),
- remote.cursorResponse.getCursorId());
+ _remotes.emplace_back(remote.getHostAndPort(),
+ remote.getCursorResponse().getNSS(),
+ remote.getCursorResponse().getCursorId());
}
}
@@ -190,16 +196,15 @@ bool AsyncResultsMerger::_ready(WithLock lk) {
}
}
- const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
+ return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk);
}
bool AsyncResultsMerger::_readySorted(WithLock lk) {
- if (_params->tailableMode == TailableMode::kTailableAndAwaitData) {
+ if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
return _readySortedTailable(lk);
}
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_params->tailableMode == TailableMode::kNormal);
+ invariant(_tailableMode == TailableModeEnum::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -218,13 +223,14 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestRemote = _mergeQueue.top();
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
- extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey);
+ extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
for (const auto& remote : _remotes) {
if (!remote.promisedMinSortKey) {
// In order to merge sorted tailable cursors, we need this value to be populated.
return false;
}
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) {
+ if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
+ 0) {
// The key we want to return is not guaranteed to be smaller than future results from
// this remote, so we can't yet return it.
return false;
@@ -264,13 +270,12 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
return {ClusterQueryResult()};
}
- const bool hasSort = !_params->sort.isEmpty();
- return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
+ return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_params->tailableMode != TailableMode::kTailable);
+ invariant(_tailableMode != TailableModeEnum::kTailable);
if (_mergeQueue.empty()) {
return {};
@@ -304,7 +309,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_params->tailableMode == TailableMode::kTailable &&
+ if (_tailableMode == TailableModeEnum::kTailable &&
!_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
@@ -334,9 +339,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- auto adjustedBatchSize = _params->batchSize;
- if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
- adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
+ auto adjustedBatchSize = _params.getBatchSize();
+ if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) {
+ adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount;
}
BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
@@ -348,7 +353,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
.toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
@@ -448,7 +453,8 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->cursorId = response.getCursorId();
if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
// We only expect to see this for change streams.
- invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
+ invariant(_params.getSort());
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() ==
change_stream_constants::kSortSpec));
auto newLatestTimestamp = *response.getLastOplogTimestamp();
@@ -475,7 +481,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
auto maxSortKeyFromResponse =
(response.getBatch().empty()
? BSONObj()
- : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey));
+ : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey()));
remote->promisedMinSortKey =
(compareSortKeys(
@@ -525,7 +531,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
remote.status = std::move(status);
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params->isAllowPartialResults) {
+ if (_params.getAllowPartialResults()) {
remote.status = Status::OK();
// Clear the results buffer and cursor id.
@@ -565,7 +571,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
// through to the client as-is.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) {
+ if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
} else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
@@ -582,7 +588,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
updateRemoteMetadata(&remote, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
- if (!_params->sort.isEmpty()) {
+ if (_params.getSort()) {
auto key = obj[AsyncResultsMerger::kSortKeyField];
if (!key) {
remote.status =
@@ -591,7 +597,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
<< "' in document: "
<< obj);
return false;
- } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
+ } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) {
remote.status =
Status(ErrorCodes::InternalError,
str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
@@ -606,9 +612,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
++remote.fetchedCount;
}
- // If we're doing a sorted merge, then we have to make sure to put this remote onto the
- // merge queue.
- if (!_params->sort.isEmpty() && !response.getBatch().empty()) {
+ // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge
+ // queue.
+ if (_params.getSort() && !response.getBatch().empty()) {
_mergeQueue.push(remoteIndex);
}
return true;
@@ -638,10 +644,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx)
for (const auto& remote : _remotes) {
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
- BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON();
+ BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
+ remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx);
// Send kill request; discard callback handle, if any, or failure report, if not.
_executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore();