summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/router_stage_pipeline.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-07-02 18:23:25 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-08-15 13:30:12 -0400
commitee06e6cbe5a75775f76836449558be2f6a98ddfd (patch)
treed4dbf37110d25f7f4876337a7b1e11abe251fac5 /src/mongo/s/query/router_stage_pipeline.cpp
parenta5bde2f3e9afc3f72da01788b76829fb29c2f4e7 (diff)
downloadmongo-ee06e6cbe5a75775f76836449558be2f6a98ddfd.tar.gz
SERVER-33323 Refactor agg cursor merging on mongos
This commit makes it so that aggregations will always use a $mergeCursors as a wrapper around a AsyncResultsMerger, which is new behavior for mongos. As part of this refactor, we can delete the concept of a 'merging presorted' $sort stage (which is now handled by the AsyncResultsMerger) and delete the DocumentSourceRouterAdapter stage which talked to a RouterStageMerge, instead directly using a $mergeCursors stage.
Diffstat (limited to 'src/mongo/s/query/router_stage_pipeline.cpp')
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp31
1 files changed, 15 insertions, 16 deletions
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 5e94274b9ac..a5a97bdbdbc 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -35,26 +35,20 @@
#include "mongo/db/pipeline/document_source_list_local_sessions.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
-RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline)
+RouterStagePipeline::RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline)
: RouterExecStage(mergePipeline->getContext()->opCtx),
- _mergePipeline(std::move(mergePipeline)),
- _mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) {
- if (!_mongosOnlyPipeline) {
- // Add an adapter to the front of the pipeline to draw results from 'child'.
- _routerAdapter =
- DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)),
- _mergePipeline->addInitialSource(_routerAdapter);
- }
+ _mergePipeline(std::move(mergePipeline)) {
+ invariant(!_mergePipeline->getSources().empty());
+ _mergeCursorsStage =
+ dynamic_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get());
}
StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecContext execContext) {
- if (_routerAdapter) {
- _routerAdapter->setExecContext(execContext);
+ if (_mergeCursorsStage) {
+ _mergeCursorsStage->setExecContext(execContext);
}
// Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
@@ -85,15 +79,20 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
}
std::size_t RouterStagePipeline::getNumRemotes() const {
- return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
+ if (_mergeCursorsStage) {
+ return _mergeCursorsStage->getNumRemotes();
+ }
+ return 0;
}
bool RouterStagePipeline::remotesExhausted() {
- return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
+ return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted();
}
Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout);
+ invariant(_mergeCursorsStage,
+ "The only cursors which should be tailable are those with remote cursors.");
+ return _mergeCursorsStage->setAwaitDataTimeout(awaitDataTimeout);
}
} // namespace mongo