diff options
author | William Schultz <william.schultz@mongodb.com> | 2018-04-05 13:56:47 -0400 |
---|---|---|
committer | William Schultz <william.schultz@mongodb.com> | 2018-04-05 13:59:27 -0400 |
commit | e88c6d85036607ddf86105234917b4adfffbd612 (patch) | |
tree | 43ff091111fec8836654b0ccc3dcaff6e3d2a518 /src/mongo/s/query/async_results_merger.cpp | |
parent | ac6544a9194197b5ab10f563cf1a19dcdc42349c (diff) | |
download | mongo-e88c6d85036607ddf86105234917b4adfffbd612.tar.gz |
Revert "SERVER-33323 Use the IDL to serialize the ARM"
This reverts commit 7d09f278a2acf9791b36927d6af1d30347d60391.
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 90 |
1 file changed, 42 insertions(+), 48 deletions(-)
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 45403399e8c..59d9a133d72 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -82,27 +82,20 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, executor::TaskExecutor* executor, - AsyncResultsMergerParams params) + ClusterClientCursorParams* params) : _opCtx(opCtx), _executor(executor), - // This strange initialization is to work around the fact that the IDL does not currently - // support a default value for an enum. The default tailable mode should be 'kNormal', but - // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. - _tailableMode(params.getTailableMode() ? *params.getTailableMode() - : TailableModeEnum::kNormal), - _params(std::move(params)), - _mergeQueue(MergingComparator(_remotes, - _params.getSort() ? *_params.getSort() : BSONObj(), - _params.getCompareWholeSortKey())) { + _params(params), + _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) { size_t remoteIndex = 0; - for (const auto& remote : _params.getRemotes()) { - _remotes.emplace_back(remote.getHostAndPort(), - remote.getCursorResponse().getNSS(), - remote.getCursorResponse().getCursorId()); + for (const auto& remote : _params->remotes) { + _remotes.emplace_back(remote.hostAndPort, + remote.cursorResponse.getNSS(), + remote.cursorResponse.getCursorId()); // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. 
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); + _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse); ++remoteIndex; } } @@ -130,7 +123,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) { Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) { + if (_params->tailableMode != TailableMode::kTailableAndAwaitData) { return Status(ErrorCodes::BadValue, "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); } @@ -140,9 +133,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { // recent optimes, which allows us to return sorted $changeStream results even if some shards // are yet to provide a batch of data. If the timeout specified by the client is greater than // 1000ms, then it will be enforced elsewhere. - _awaitDataTimeout = - (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000}) - : awaitDataTimeout); + _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u + ? 
std::min(awaitDataTimeout, Milliseconds{1000}) + : awaitDataTimeout); return Status::OK(); } @@ -168,12 +161,13 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { _opCtx = opCtx; } -void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { +void AsyncResultsMerger::addNewShardCursors( + const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) { stdx::lock_guard<stdx::mutex> lk(_mutex); for (auto&& remote : newCursors) { - _remotes.emplace_back(remote.getHostAndPort(), - remote.getCursorResponse().getNSS(), - remote.getCursorResponse().getCursorId()); + _remotes.emplace_back(remote.hostAndPort, + remote.cursorResponse.getNSS(), + remote.cursorResponse.getCursorId()); } } @@ -196,15 +190,16 @@ bool AsyncResultsMerger::_ready(WithLock lk) { } } - return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk); + const bool hasSort = !_params->sort.isEmpty(); + return hasSort ? _readySorted(lk) : _readyUnsorted(lk); } bool AsyncResultsMerger::_readySorted(WithLock lk) { - if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (_params->tailableMode == TailableMode::kTailableAndAwaitData) { return _readySortedTailable(lk); } // Tailable non-awaitData cursors cannot have a sort. 
- invariant(_tailableMode == TailableModeEnum::kNormal); + invariant(_params->tailableMode == TailableMode::kNormal); for (const auto& remote : _remotes) { if (!remote.hasNext() && !remote.exhausted()) { @@ -223,14 +218,13 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) { auto smallestRemote = _mergeQueue.top(); auto smallestResult = _remotes[smallestRemote].docBuffer.front(); auto keyWeWantToReturn = - extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey()); + extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey); for (const auto& remote : _remotes) { if (!remote.promisedMinSortKey) { // In order to merge sorted tailable cursors, we need this value to be populated. return false; } - if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) > - 0) { + if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) { // The key we want to return is not guaranteed to be smaller than future results from // this remote, so we can't yet return it. return false; @@ -270,12 +264,13 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { return {ClusterQueryResult()}; } - return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); + const bool hasSort = !_params->sort.isEmpty(); + return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); } ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { // Tailable non-awaitData cursors cannot have a sort. 
- invariant(_tailableMode != TailableModeEnum::kTailable); + invariant(_params->tailableMode != TailableMode::kTailable); if (_mergeQueue.empty()) { return {}; @@ -309,7 +304,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); - if (_tailableMode == TailableModeEnum::kTailable && + if (_params->tailableMode == TailableMode::kTailable && !_remotes[_gettingFromRemote].hasNext()) { // The cursor is tailable and we're about to return the last buffered result. This // means that the next value returned should be boost::none to indicate the end of @@ -339,9 +334,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { // request to fetch the remaining docs only. If the remote node has a plan with OR for top k and // a full sort as is the case for the OP_QUERY find then this optimization will prevent // switching to the full sort plan branch. 
- auto adjustedBatchSize = _params.getBatchSize(); - if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) { - adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount; + auto adjustedBatchSize = _params->batchSize; + if (_params->batchSize && *_params->batchSize > remote.fetchedCount) { + adjustedBatchSize = *_params->batchSize - remote.fetchedCount; } BSONObj cmdObj = GetMoreRequest(remote.cursorNss, @@ -353,7 +348,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { .toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx); + remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { @@ -453,8 +448,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, remote->cursorId = response.getCursorId(); if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) { // We only expect to see this for change streams. - invariant(_params.getSort()); - invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() == + invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort == change_stream_constants::kSortSpec)); auto newLatestTimestamp = *response.getLastOplogTimestamp(); @@ -481,7 +475,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, auto maxSortKeyFromResponse = (response.getBatch().empty() ? BSONObj() - : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey())); + : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey)); remote->promisedMinSortKey = (compareSortKeys( @@ -531,7 +525,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remote.status = std::move(status); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. 
We // remove the unreachable host entirely from consideration by marking it as exhausted. - if (_params.getAllowPartialResults()) { + if (_params->isAllowPartialResults) { remote.status = Status::OK(); // Clear the results buffer and cursor id. @@ -571,7 +565,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, // through to the client as-is. // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from // one shard means the end of the overall batch). - if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) { + if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) { invariant(_remotes.size() == 1); _eofNext = true; } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) { @@ -588,7 +582,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, updateRemoteMetadata(&remote, response); for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. - if (_params.getSort()) { + if (!_params->sort.isEmpty()) { auto key = obj[AsyncResultsMerger::kSortKeyField]; if (!key) { remote.status = @@ -597,7 +591,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, << "' in document: " << obj); return false; - } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) { + } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) { remote.status = Status(ErrorCodes::InternalError, str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField @@ -612,9 +606,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, ++remote.fetchedCount; } - // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge - // queue. - if (_params.getSort() && !response.getBatch().empty()) { + // If we're doing a sorted merge, then we have to make sure to put this remote onto the + // merge queue. 
+ if (!_params->sort.isEmpty() && !response.getBatch().empty()) { _mergeQueue.push(remoteIndex); } return true; @@ -644,10 +638,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) for (const auto& remote : _remotes) { if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) { - BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON(); + BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx); + remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx); // Send kill request; discard callback handle, if any, or failure report, if not. _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore(); |