diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-08-07 15:07:47 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-08-08 19:04:30 +0100 |
commit | 9062f71621d053e32beec1061f708e3fd08b0158 (patch) | |
tree | 82fd151004fc4192126d71628e765e9257128feb /src/mongo/s/query/cluster_find.cpp | |
parent | 1dabf89e81b06b4d93a447e1fa5f6742b2f7afa1 (diff) | |
download | mongo-9062f71621d053e32beec1061f708e3fd08b0158.tar.gz |
SERVER-22760 Sharded aggregation pipelines which involve taking a simple union should merge on mongos
Diffstat (limited to 'src/mongo/s/query/cluster_find.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 0f7630d5dc8..b7a5ecc6b34 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -257,6 +257,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, return swCursors.getStatus(); } + // Determine whether the cursor we may eventually register will be single- or multi-target. + + const auto cursorType = swCursors.getValue().size() > 1 + ? ClusterCursorManager::CursorType::MultiTarget + : ClusterCursorManager::CursorType::SingleTarget; + // Transfer the established cursors to a ClusterClientCursor. params.remotes = std::move(swCursors.getValue()); @@ -267,8 +273,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, auto cursorState = ClusterCursorManager::CursorState::NotExhausted; int bytesBuffered = 0; + + ccc->reattachToOperationContext(opCtx); + while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) { - auto next = ccc->next(opCtx); + auto next = ccc->next(); if (!next.isOK()) { return next.getStatus(); @@ -300,6 +309,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, results->push_back(std::move(nextObj)); } + ccc->detachFromOperationContext(); + if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) { cursorState = ClusterCursorManager::CursorState::Exhausted; } @@ -313,8 +324,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, // Register the cursor with the cursor manager for subsequent getMore's. auto cursorManager = Grid::get(opCtx)->getCursorManager(); - const auto cursorType = chunkManager ? ClusterCursorManager::CursorType::NamespaceSharded - : ClusterCursorManager::CursorType::NamespaceNotSharded; const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout() ? ClusterCursorManager::CursorLifetime::Immortal : ClusterCursorManager::CursorLifetime::Mortal; @@ -427,8 +436,11 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, long long batchSize = request.batchSize.value_or(0); long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar(); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + + pinnedCursor.getValue().reattachToOperationContext(opCtx); + while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { - auto next = pinnedCursor.getValue().next(opCtx); + auto next = pinnedCursor.getValue().next(); if (!next.isOK()) { return next.getStatus(); } @@ -454,6 +466,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, batch.push_back(std::move(*next.getValue().getResult())); } + pinnedCursor.getValue().detachFromOperationContext(); + // Transfer ownership of the cursor back to the cursor manager. pinnedCursor.getValue().returnCursor(cursorState); |