diff options
author | James Wahlin <james.wahlin@10gen.com> | 2016-07-25 16:56:22 -0400 |
---|---|---|
committer | James Wahlin <james.wahlin@10gen.com> | 2016-07-29 15:36:53 -0400 |
commit | dc7f50c520c5129709008568241274cb6d5ec231 (patch) | |
tree | cd38158bf08d17566e706eeb3eb4202d3dfc1044 /src/mongo/s/query/async_results_merger.cpp | |
parent | d305e618162d37ccc16cf574fcc0388a1160af93 (diff) | |
download | mongo-dc7f50c520c5129709008568241274cb6d5ec231.tar.gz |
SERVER-24762 Support for views on sharded collections
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 61 |
1 files changed, 47 insertions, 14 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 2d1919769ad..31d9337b10f 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -182,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() { return allExhausted; } -StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() { +StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { stdx::lock_guard<stdx::mutex> lk(_mutex); dassert(ready_inlock()); if (_lifecycleState != kAlive) { @@ -195,19 +195,19 @@ StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() { if (_eofNext) { _eofNext = false; - return {boost::none}; + return {ClusterQueryResult()}; } const bool hasSort = !_params.sort.isEmpty(); return hasSort ? nextReadySorted() : nextReadyUnsorted(); } -boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() { +ClusterQueryResult AsyncResultsMerger::nextReadySorted() { // Tailable cursors cannot have a sort. invariant(!_params.isTailable); if (_mergeQueue.empty()) { - return boost::none; + return {}; } size_t smallestRemote = _mergeQueue.top(); @@ -216,7 +216,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() { invariant(!_remotes[smallestRemote].docBuffer.empty()); invariant(_remotes[smallestRemote].status.isOK()); - BSONObj front = _remotes[smallestRemote].docBuffer.front(); + ClusterQueryResult front = _remotes[smallestRemote].docBuffer.front(); _remotes[smallestRemote].docBuffer.pop(); // Re-populate the merging queue with the next result from 'smallestRemote', if it has a @@ -228,14 +228,14 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() { return front; } -boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() { +ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { size_t remotesAttempted = 0; while (remotesAttempted < _remotes.size()) { // It is illegal to call this method if there is an error received from any shard. invariant(_remotes[_gettingFromRemote].status.isOK()); if (_remotes[_gettingFromRemote].hasNext()) { - BSONObj front = _remotes[_gettingFromRemote].docBuffer.front(); + ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); if (_params.isTailable && !_remotes[_gettingFromRemote].hasNext()) { @@ -255,7 +255,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() { } } - return boost::none; + return {}; } Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { @@ -436,6 +436,38 @@ void AsyncResultsMerger::handleBatchResponse( : cbData.response.getStatus()); if (!cursorResponseStatus.isOK()) { + // In the case a read is performed against a view, the shard primary can return an error + // indicating that the underlying collection may be sharded. When this occurs the return + // message will include an expanded view definition and collection namespace which we need + // to store. This allows for a second attempt at the read directly against the underlying + // collection. + if (cursorResponseStatus.getStatus() == + ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { + auto& responseObj = cbData.response.getValue().data; + if (!responseObj.hasField("resolvedView")) { + remote.status = Status(ErrorCodes::InternalError, + str::stream() << "Missing field 'resolvedView' in document: " + << responseObj); + return; + } + + auto resolvedViewObj = responseObj.getObjectField("resolvedView"); + if (resolvedViewObj.isEmpty()) { + remote.status = Status(ErrorCodes::InternalError, + str::stream() << "Field 'resolvedView' must be an object: " + << responseObj); + return; + } + + ClusterQueryResult result; + result.setViewDefinition(resolvedViewObj.getOwned()); + + remote.docBuffer.push(result); + remote.cursorId = 0; + remote.status = Status::OK(); + return; + } + auto shard = remote.getShard(); if (!shard) { remote.status = Status(cursorResponseStatus.getStatus().code(), @@ -479,7 +511,7 @@ void AsyncResultsMerger::handleBatchResponse( remote.status = Status::OK(); // Clear the results buffer and cursor id. - std::queue<BSONObj> emptyBuffer; + std::queue<ClusterQueryResult> emptyBuffer; std::swap(remote.docBuffer, emptyBuffer); remote.cursorId = 0; } @@ -504,7 +536,8 @@ void AsyncResultsMerger::handleBatchResponse( return; } - remote.docBuffer.push(obj); + ClusterQueryResult result(obj); + remote.docBuffer.push(result); ++remote.fetchedCount; } @@ -677,11 +710,11 @@ std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() { // bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const size_t& rhs) { - const BSONObj& leftDoc = _remotes[lhs].docBuffer.front(); - const BSONObj& rightDoc = _remotes[rhs].docBuffer.front(); + const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front(); + const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front(); - BSONObj leftDocKey = leftDoc[ClusterClientCursorParams::kSortKeyField].Obj(); - BSONObj rightDocKey = rightDoc[ClusterClientCursorParams::kSortKeyField].Obj(); + BSONObj leftDocKey = (*leftDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj(); + BSONObj rightDocKey = (*rightDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj(); // This does not need to sort with a collator, since mongod has already mapped strings to their // ICU comparison keys as part of the $sortKey meta projection. |