summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger.cpp
diff options
context:
space:
mode:
authorJames Wahlin <james.wahlin@10gen.com>2016-07-25 16:56:22 -0400
committerJames Wahlin <james.wahlin@10gen.com>2016-07-29 15:36:53 -0400
commitdc7f50c520c5129709008568241274cb6d5ec231 (patch)
treecd38158bf08d17566e706eeb3eb4202d3dfc1044 /src/mongo/s/query/async_results_merger.cpp
parentd305e618162d37ccc16cf574fcc0388a1160af93 (diff)
downloadmongo-dc7f50c520c5129709008568241274cb6d5ec231.tar.gz
SERVER-24762 Support for views on sharded collections
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp61
1 files changed, 47 insertions, 14 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 2d1919769ad..31d9337b10f 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -182,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() {
return allExhausted;
}
-StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() {
+StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
dassert(ready_inlock());
if (_lifecycleState != kAlive) {
@@ -195,19 +195,19 @@ StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() {
if (_eofNext) {
_eofNext = false;
- return {boost::none};
+ return {ClusterQueryResult()};
}
const bool hasSort = !_params.sort.isEmpty();
return hasSort ? nextReadySorted() : nextReadyUnsorted();
}
-boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() {
+ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
// Tailable cursors cannot have a sort.
invariant(!_params.isTailable);
if (_mergeQueue.empty()) {
- return boost::none;
+ return {};
}
size_t smallestRemote = _mergeQueue.top();
@@ -216,7 +216,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() {
invariant(!_remotes[smallestRemote].docBuffer.empty());
invariant(_remotes[smallestRemote].status.isOK());
- BSONObj front = _remotes[smallestRemote].docBuffer.front();
+ ClusterQueryResult front = _remotes[smallestRemote].docBuffer.front();
_remotes[smallestRemote].docBuffer.pop();
// Re-populate the merging queue with the next result from 'smallestRemote', if it has a
@@ -228,14 +228,14 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() {
return front;
}
-boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() {
+ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
size_t remotesAttempted = 0;
while (remotesAttempted < _remotes.size()) {
// It is illegal to call this method if there is an error received from any shard.
invariant(_remotes[_gettingFromRemote].status.isOK());
if (_remotes[_gettingFromRemote].hasNext()) {
- BSONObj front = _remotes[_gettingFromRemote].docBuffer.front();
+ ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
if (_params.isTailable && !_remotes[_gettingFromRemote].hasNext()) {
@@ -255,7 +255,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() {
}
}
- return boost::none;
+ return {};
}
Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
@@ -436,6 +436,38 @@ void AsyncResultsMerger::handleBatchResponse(
: cbData.response.getStatus());
if (!cursorResponseStatus.isOK()) {
+ // In the case a read is performed against a view, the shard primary can return an error
+ // indicating that the underlying collection may be sharded. When this occurs the return
+ // message will include an expanded view definition and collection namespace which we need
+ // to store. This allows for a second attempt at the read directly against the underlying
+ // collection.
+ if (cursorResponseStatus.getStatus() ==
+ ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
+ auto& responseObj = cbData.response.getValue().data;
+ if (!responseObj.hasField("resolvedView")) {
+ remote.status = Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field 'resolvedView' in document: "
+ << responseObj);
+ return;
+ }
+
+ auto resolvedViewObj = responseObj.getObjectField("resolvedView");
+ if (resolvedViewObj.isEmpty()) {
+ remote.status = Status(ErrorCodes::InternalError,
+ str::stream() << "Field 'resolvedView' must be an object: "
+ << responseObj);
+ return;
+ }
+
+ ClusterQueryResult result;
+ result.setViewDefinition(resolvedViewObj.getOwned());
+
+ remote.docBuffer.push(result);
+ remote.cursorId = 0;
+ remote.status = Status::OK();
+ return;
+ }
+
auto shard = remote.getShard();
if (!shard) {
remote.status = Status(cursorResponseStatus.getStatus().code(),
@@ -479,7 +511,7 @@ void AsyncResultsMerger::handleBatchResponse(
remote.status = Status::OK();
// Clear the results buffer and cursor id.
- std::queue<BSONObj> emptyBuffer;
+ std::queue<ClusterQueryResult> emptyBuffer;
std::swap(remote.docBuffer, emptyBuffer);
remote.cursorId = 0;
}
@@ -504,7 +536,8 @@ void AsyncResultsMerger::handleBatchResponse(
return;
}
- remote.docBuffer.push(obj);
+ ClusterQueryResult result(obj);
+ remote.docBuffer.push(result);
++remote.fetchedCount;
}
@@ -677,11 +710,11 @@ std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() {
//
bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const size_t& rhs) {
- const BSONObj& leftDoc = _remotes[lhs].docBuffer.front();
- const BSONObj& rightDoc = _remotes[rhs].docBuffer.front();
+ const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front();
+ const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front();
- BSONObj leftDocKey = leftDoc[ClusterClientCursorParams::kSortKeyField].Obj();
- BSONObj rightDocKey = rightDoc[ClusterClientCursorParams::kSortKeyField].Obj();
+ BSONObj leftDocKey = (*leftDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj();
+ BSONObj rightDocKey = (*rightDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj();
// This does not need to sort with a collator, since mongod has already mapped strings to their
// ICU comparison keys as part of the $sortKey meta projection.