diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-05 18:46:43 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-05 21:28:45 -0400 |
commit | 41f13212be110fc2360804fc04982273e43910f4 (patch) | |
tree | 1c75cad7feed3e2592320e2204163236bd6b568e /src/mongo/s/query/async_results_merger.cpp | |
parent | 7a48a263485a585dac1e1289c830eafd35a3d54b (diff) | |
download | mongo-41f13212be110fc2360804fc04982273e43910f4.tar.gz |
SERVER-33323 Use the IDL to serialize the ARM
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 90 |
1 file changed, 48 insertions, 42 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 59d9a133d72..45403399e8c 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -82,20 +82,27 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, executor::TaskExecutor* executor, - ClusterClientCursorParams* params) + AsyncResultsMergerParams params) : _opCtx(opCtx), _executor(executor), - _params(params), - _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) { + // This strange initialization is to work around the fact that the IDL does not currently + // support a default value for an enum. The default tailable mode should be 'kNormal', but + // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. + _tailableMode(params.getTailableMode() ? *params.getTailableMode() + : TailableModeEnum::kNormal), + _params(std::move(params)), + _mergeQueue(MergingComparator(_remotes, + _params.getSort() ? *_params.getSort() : BSONObj(), + _params.getCompareWholeSortKey())) { size_t remoteIndex = 0; - for (const auto& remote : _params->remotes) { - _remotes.emplace_back(remote.hostAndPort, - remote.cursorResponse.getNSS(), - remote.cursorResponse.getCursorId()); + for (const auto& remote : _params.getRemotes()) { + _remotes.emplace_back(remote.getHostAndPort(), + remote.getCursorResponse().getNSS(), + remote.getCursorResponse().getCursorId()); // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. 
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse); + _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); ++remoteIndex; } } @@ -123,7 +130,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) { Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_params->tailableMode != TailableMode::kTailableAndAwaitData) { + if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) { return Status(ErrorCodes::BadValue, "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); } @@ -133,9 +140,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { // recent optimes, which allows us to return sorted $changeStream results even if some shards // are yet to provide a batch of data. If the timeout specified by the client is greater than // 1000ms, then it will be enforced elsewhere. - _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u - ? std::min(awaitDataTimeout, Milliseconds{1000}) - : awaitDataTimeout); + _awaitDataTimeout = + (_params.getSort() && _remotes.size() > 1u ? 
std::min(awaitDataTimeout, Milliseconds{1000}) + : awaitDataTimeout); return Status::OK(); } @@ -161,13 +168,12 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { _opCtx = opCtx; } -void AsyncResultsMerger::addNewShardCursors( - const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) { +void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { stdx::lock_guard<stdx::mutex> lk(_mutex); for (auto&& remote : newCursors) { - _remotes.emplace_back(remote.hostAndPort, - remote.cursorResponse.getNSS(), - remote.cursorResponse.getCursorId()); + _remotes.emplace_back(remote.getHostAndPort(), + remote.getCursorResponse().getNSS(), + remote.getCursorResponse().getCursorId()); } } @@ -190,16 +196,15 @@ bool AsyncResultsMerger::_ready(WithLock lk) { } } - const bool hasSort = !_params->sort.isEmpty(); - return hasSort ? _readySorted(lk) : _readyUnsorted(lk); + return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk); } bool AsyncResultsMerger::_readySorted(WithLock lk) { - if (_params->tailableMode == TailableMode::kTailableAndAwaitData) { + if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { return _readySortedTailable(lk); } // Tailable non-awaitData cursors cannot have a sort. 
- invariant(_params->tailableMode == TailableMode::kNormal); + invariant(_tailableMode == TailableModeEnum::kNormal); for (const auto& remote : _remotes) { if (!remote.hasNext() && !remote.exhausted()) { @@ -218,13 +223,14 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) { auto smallestRemote = _mergeQueue.top(); auto smallestResult = _remotes[smallestRemote].docBuffer.front(); auto keyWeWantToReturn = - extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey); + extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey()); for (const auto& remote : _remotes) { if (!remote.promisedMinSortKey) { // In order to merge sorted tailable cursors, we need this value to be populated. return false; } - if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) { + if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) > + 0) { // The key we want to return is not guaranteed to be smaller than future results from // this remote, so we can't yet return it. return false; @@ -264,13 +270,12 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { return {ClusterQueryResult()}; } - const bool hasSort = !_params->sort.isEmpty(); - return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); + return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); } ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { // Tailable non-awaitData cursors cannot have a sort. 
- invariant(_params->tailableMode != TailableMode::kTailable); + invariant(_tailableMode != TailableModeEnum::kTailable); if (_mergeQueue.empty()) { return {}; @@ -304,7 +309,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); - if (_params->tailableMode == TailableMode::kTailable && + if (_tailableMode == TailableModeEnum::kTailable && !_remotes[_gettingFromRemote].hasNext()) { // The cursor is tailable and we're about to return the last buffered result. This // means that the next value returned should be boost::none to indicate the end of @@ -334,9 +339,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { // request to fetch the remaining docs only. If the remote node has a plan with OR for top k and // a full sort as is the case for the OP_QUERY find then this optimization will prevent // switching to the full sort plan branch. 
- auto adjustedBatchSize = _params->batchSize; - if (_params->batchSize && *_params->batchSize > remote.fetchedCount) { - adjustedBatchSize = *_params->batchSize - remote.fetchedCount; + auto adjustedBatchSize = _params.getBatchSize(); + if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) { + adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount; } BSONObj cmdObj = GetMoreRequest(remote.cursorNss, @@ -348,7 +353,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { .toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx); + remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { @@ -448,7 +453,8 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, remote->cursorId = response.getCursorId(); if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) { // We only expect to see this for change streams. - invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort == + invariant(_params.getSort()); + invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() == change_stream_constants::kSortSpec)); auto newLatestTimestamp = *response.getLastOplogTimestamp(); @@ -475,7 +481,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, auto maxSortKeyFromResponse = (response.getBatch().empty() ? BSONObj() - : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey)); + : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey())); remote->promisedMinSortKey = (compareSortKeys( @@ -525,7 +531,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remote.status = std::move(status); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. 
We // remove the unreachable host entirely from consideration by marking it as exhausted. - if (_params->isAllowPartialResults) { + if (_params.getAllowPartialResults()) { remote.status = Status::OK(); // Clear the results buffer and cursor id. @@ -565,7 +571,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, // through to the client as-is. // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from // one shard means the end of the overall batch). - if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) { + if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) { invariant(_remotes.size() == 1); _eofNext = true; } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) { @@ -582,7 +588,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, updateRemoteMetadata(&remote, response); for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. - if (!_params->sort.isEmpty()) { + if (_params.getSort()) { auto key = obj[AsyncResultsMerger::kSortKeyField]; if (!key) { remote.status = @@ -591,7 +597,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, << "' in document: " << obj); return false; - } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) { + } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) { remote.status = Status(ErrorCodes::InternalError, str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField @@ -606,9 +612,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, ++remote.fetchedCount; } - // If we're doing a sorted merge, then we have to make sure to put this remote onto the - // merge queue. - if (!_params->sort.isEmpty() && !response.getBatch().empty()) { + // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge + // queue. 
+ if (_params.getSort() && !response.getBatch().empty()) { _mergeQueue.push(remoteIndex); } return true; @@ -638,10 +644,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) for (const auto& remote : _remotes) { if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) { - BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON(); + BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx); + remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx); // Send kill request; discard callback handle, if any, or failure report, if not. _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore(); |