summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_find.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-08-07 15:07:47 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-08-08 19:04:30 +0100
commit9062f71621d053e32beec1061f708e3fd08b0158 (patch)
tree82fd151004fc4192126d71628e765e9257128feb /src/mongo/s/query/cluster_find.cpp
parent1dabf89e81b06b4d93a447e1fa5f6742b2f7afa1 (diff)
downloadmongo-9062f71621d053e32beec1061f708e3fd08b0158.tar.gz
SERVER-22760 Sharded aggregation pipelines which involve taking a simple union should merge on mongos
Diffstat (limited to 'src/mongo/s/query/cluster_find.cpp')
-rw-r--r--src/mongo/s/query/cluster_find.cpp22
1 files changed, 18 insertions, 4 deletions
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 0f7630d5dc8..b7a5ecc6b34 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -257,6 +257,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
return swCursors.getStatus();
}
+ // Determine whether the cursor we may eventually register will be single- or multi-target.
+
+ const auto cursorType = swCursors.getValue().size() > 1
+ ? ClusterCursorManager::CursorType::MultiTarget
+ : ClusterCursorManager::CursorType::SingleTarget;
+
// Transfer the established cursors to a ClusterClientCursor.
params.remotes = std::move(swCursors.getValue());
@@ -267,8 +273,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
int bytesBuffered = 0;
+
+ ccc->reattachToOperationContext(opCtx);
+
while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
- auto next = ccc->next(opCtx);
+ auto next = ccc->next();
if (!next.isOK()) {
return next.getStatus();
@@ -300,6 +309,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
results->push_back(std::move(nextObj));
}
+ ccc->detachFromOperationContext();
+
if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) {
cursorState = ClusterCursorManager::CursorState::Exhausted;
}
@@ -313,8 +324,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
// Register the cursor with the cursor manager for subsequent getMore's.
auto cursorManager = Grid::get(opCtx)->getCursorManager();
- const auto cursorType = chunkManager ? ClusterCursorManager::CursorType::NamespaceSharded
- : ClusterCursorManager::CursorType::NamespaceNotSharded;
const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout()
? ClusterCursorManager::CursorLifetime::Immortal
: ClusterCursorManager::CursorLifetime::Mortal;
@@ -427,8 +436,11 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
long long batchSize = request.batchSize.value_or(0);
long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+
+ pinnedCursor.getValue().reattachToOperationContext(opCtx);
+
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
- auto next = pinnedCursor.getValue().next(opCtx);
+ auto next = pinnedCursor.getValue().next();
if (!next.isOK()) {
return next.getStatus();
}
@@ -454,6 +466,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
batch.push_back(std::move(*next.getValue().getResult()));
}
+ pinnedCursor.getValue().detachFromOperationContext();
+
// Transfer ownership of the cursor back to the cursor manager.
pinnedCursor.getValue().returnCursor(cursorState);