summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger.cpp
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2018-04-05 13:56:47 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2018-04-05 13:59:27 -0400
commite88c6d85036607ddf86105234917b4adfffbd612 (patch)
tree43ff091111fec8836654b0ccc3dcaff6e3d2a518 /src/mongo/s/query/async_results_merger.cpp
parentac6544a9194197b5ab10f563cf1a19dcdc42349c (diff)
downloadmongo-e88c6d85036607ddf86105234917b4adfffbd612.tar.gz
Revert "SERVER-33323 Use the IDL to serialize the ARM"
This reverts commit 7d09f278a2acf9791b36927d6af1d30347d60391.
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp90
1 files changed, 42 insertions, 48 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 45403399e8c..59d9a133d72 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,27 +82,20 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
executor::TaskExecutor* executor,
- AsyncResultsMergerParams params)
+ ClusterClientCursorParams* params)
: _opCtx(opCtx),
_executor(executor),
- // This strange initialization is to work around the fact that the IDL does not currently
- // support a default value for an enum. The default tailable mode should be 'kNormal', but
- // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
- _tailableMode(params.getTailableMode() ? *params.getTailableMode()
- : TailableModeEnum::kNormal),
- _params(std::move(params)),
- _mergeQueue(MergingComparator(_remotes,
- _params.getSort() ? *_params.getSort() : BSONObj(),
- _params.getCompareWholeSortKey())) {
+ _params(params),
+ _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) {
size_t remoteIndex = 0;
- for (const auto& remote : _params.getRemotes()) {
- _remotes.emplace_back(remote.getHostAndPort(),
- remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ for (const auto& remote : _params->remotes) {
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse);
++remoteIndex;
}
}
@@ -130,7 +123,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) {
+ if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -140,9 +133,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
// recent optimes, which allows us to return sorted $changeStream results even if some shards
// are yet to provide a batch of data. If the timeout specified by the client is greater than
// 1000ms, then it will be enforced elsewhere.
- _awaitDataTimeout =
- (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000})
- : awaitDataTimeout);
+ _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u
+ ? std::min(awaitDataTimeout, Milliseconds{1000})
+ : awaitDataTimeout);
return Status::OK();
}
@@ -168,12 +161,13 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
-void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+void AsyncResultsMerger::addNewShardCursors(
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (auto&& remote : newCursors) {
- _remotes.emplace_back(remote.getHostAndPort(),
- remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
}
}
@@ -196,15 +190,16 @@ bool AsyncResultsMerger::_ready(WithLock lk) {
}
}
- return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk);
+ const bool hasSort = !_params->sort.isEmpty();
+ return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
}
bool AsyncResultsMerger::_readySorted(WithLock lk) {
- if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (_params->tailableMode == TailableMode::kTailableAndAwaitData) {
return _readySortedTailable(lk);
}
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_tailableMode == TailableModeEnum::kNormal);
+ invariant(_params->tailableMode == TailableMode::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -223,14 +218,13 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestRemote = _mergeQueue.top();
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
- extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
+ extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey);
for (const auto& remote : _remotes) {
if (!remote.promisedMinSortKey) {
// In order to merge sorted tailable cursors, we need this value to be populated.
return false;
}
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
- 0) {
+ if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) {
// The key we want to return is not guaranteed to be smaller than future results from
// this remote, so we can't yet return it.
return false;
@@ -270,12 +264,13 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
return {ClusterQueryResult()};
}
- return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
+ const bool hasSort = !_params->sort.isEmpty();
+ return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk);
}
ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
// Tailable non-awaitData cursors cannot have a sort.
- invariant(_tailableMode != TailableModeEnum::kTailable);
+ invariant(_params->tailableMode != TailableMode::kTailable);
if (_mergeQueue.empty()) {
return {};
@@ -309,7 +304,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_tailableMode == TailableModeEnum::kTailable &&
+ if (_params->tailableMode == TailableMode::kTailable &&
!_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
@@ -339,9 +334,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- auto adjustedBatchSize = _params.getBatchSize();
- if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) {
- adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount;
+ auto adjustedBatchSize = _params->batchSize;
+ if (_params->batchSize && *_params->batchSize > remote.fetchedCount) {
+ adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
}
BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
@@ -353,7 +348,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
.toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
@@ -453,8 +448,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
remote->cursorId = response.getCursorId();
if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
// We only expect to see this for change streams.
- invariant(_params.getSort());
- invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() ==
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort ==
change_stream_constants::kSortSpec));
auto newLatestTimestamp = *response.getLastOplogTimestamp();
@@ -481,7 +475,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
auto maxSortKeyFromResponse =
(response.getBatch().empty()
? BSONObj()
- : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey()));
+ : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey));
remote->promisedMinSortKey =
(compareSortKeys(
@@ -531,7 +525,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
remote.status = std::move(status);
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params.getAllowPartialResults()) {
+ if (_params->isAllowPartialResults) {
remote.status = Status::OK();
// Clear the results buffer and cursor id.
@@ -571,7 +565,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
// through to the client as-is.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
+ if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
} else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
@@ -588,7 +582,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
updateRemoteMetadata(&remote, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
- if (_params.getSort()) {
+ if (!_params->sort.isEmpty()) {
auto key = obj[AsyncResultsMerger::kSortKeyField];
if (!key) {
remote.status =
@@ -597,7 +591,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
<< "' in document: "
<< obj);
return false;
- } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) {
+ } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
remote.status =
Status(ErrorCodes::InternalError,
str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField
@@ -612,9 +606,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
++remote.fetchedCount;
}
- // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge
- // queue.
- if (_params.getSort() && !response.getBatch().empty()) {
+ // If we're doing a sorted merge, then we have to make sure to put this remote onto the
+ // merge queue.
+ if (!_params->sort.isEmpty() && !response.getBatch().empty()) {
_mergeQueue.push(remoteIndex);
}
return true;
@@ -644,10 +638,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx)
for (const auto& remote : _remotes) {
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
- BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON();
+ BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx);
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx);
// Send kill request; discard callback handle, if any, or failure report, if not.
_executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore();