diff options
author | Martin Neupauer <martin.neupauer@10gen.com> | 2018-06-20 13:33:34 -0400 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-07-06 15:02:32 -0400 |
commit | fda766f6be1a20fa28ce361511bc62e5c995186b (patch) | |
tree | 4289bb82ff7ce5ca55bc9f33915ff3205431dedf /src/mongo/db/commands/run_aggregate.cpp | |
parent | 5b2739dbff77811dbbfbccbc8a7ca8b973c8525f (diff) | |
download | mongo-fda766f6be1a20fa28ce361511bc62e5c995186b.tar.gz |
SERVER-35894 The initial implementation of the producer document source
for the exchange operator.
SERVER-35940 Remove IntrusiveCounter in favor of RefCountable.
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 137 |
1 files changed, 103 insertions, 34 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index f4a97b7cfef..5e00d3f867c 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -45,6 +45,7 @@ #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_exchange.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" @@ -88,14 +89,48 @@ namespace { */ bool handleCursorCommand(OperationContext* opCtx, const NamespaceString& nsForCursor, - ClientCursor* cursor, + std::vector<ClientCursor*> cursors, const AggregationRequest& request, BSONObjBuilder& result) { - invariant(cursor); - + invariant(!cursors.empty()); long long batchSize = request.getBatchSize(); + if (cursors.size() > 1) { + + uassert( + ErrorCodes::BadValue, "the exchange initial batch size must be zero", batchSize == 0); + + BSONArrayBuilder cursorsBuilder; + for (size_t idx = 0; idx < cursors.size(); ++idx) { + invariant(cursors[idx]); + + BSONObjBuilder cursorResult; + appendCursorResponseObject( + cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult); + cursorResult.appendBool("ok", 1); + + cursorsBuilder.append(cursorResult.obj()); + + // If a time limit was set on the pipeline, remaining time is "rolled over" to the + // cursor (for use by future getmore ops). + cursors[idx]->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + + // Cursor needs to be in a saved state while we yield locks for getmore. State + // will be restored in getMore(). + cursors[idx]->getExecutor()->saveState(); + cursors[idx]->getExecutor()->detachFromOperationContext(); + } + + result.appendArray("cursors", cursorsBuilder.obj()); + + return true; + } + CursorResponseBuilder responseBuilder(true, &result); + + ClientCursor* cursor = cursors[0]; + invariant(cursor); + BSONObj next; for (int objCount = 0; objCount < batchSize; objCount++) { // The initial getNext() on a PipelineProxyStage may be very expensive so we don't @@ -322,9 +357,8 @@ Status runAggregate(OperationContext* opCtx, // streams, this will be the UUID of the original namespace instead of the oplog namespace. boost::optional<UUID> uuid; - unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; + std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; boost::intrusive_ptr<ExpressionContext> expCtx; - Pipeline* unownedPipeline; auto curOp = CurOp::get(opCtx); { const LiteParsedPipeline liteParsedPipeline(request); @@ -498,22 +532,43 @@ Status runAggregate(OperationContext* opCtx, // this process uses the correct collation if it does any string comparisons. pipeline->optimizePipeline(); - // Transfer ownership of the Pipeline to the PipelineProxyStage. - unownedPipeline = pipeline.get(); - auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get()); + std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> pipelines; - // This PlanExecutor will simply forward requests to the Pipeline, so does not need to - // yield or to be registered with any collection's CursorManager to receive invalidations. - // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are* - // registered with their respective collection's CursorManager - auto statusWithPlanExecutor = - PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD); - invariant(statusWithPlanExecutor.isOK()); - exec = std::move(statusWithPlanExecutor.getValue()); + pipelines.emplace_back(std::move(pipeline)); + + auto exchange = + dynamic_cast<DocumentSourceExchange*>(pipelines[0]->getSources().back().get()); + if (exchange) { + for (size_t idx = 1; idx < exchange->getConsumers(); ++idx) { + auto sources = pipelines[0]->getSources(); + sources.back() = new DocumentSourceExchange(expCtx, exchange->getExchange(), idx); + pipelines.emplace_back( + uassertStatusOK(Pipeline::create(std::move(sources), expCtx))); + } + } + + // TODO we will revisit the current vector of pipelines design when we will implement + // plan summaries, explains, etc. + for (size_t idx = 0; idx < pipelines.size(); ++idx) { + // Transfer ownership of the Pipeline to the PipelineProxyStage. + auto ws = make_unique<WorkingSet>(); + auto proxy = + make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()); + + // This PlanExecutor will simply forward requests to the Pipeline, so does not need to + // yield or to be registered with any collection's CursorManager to receive + // invalidations. The Pipeline may contain PlanExecutors which *are* yielding + // PlanExecutors and which *are* registered with their respective collection's + // CursorManager + + auto statusWithPlanExecutor = PlanExecutor::make( + opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD); + invariant(statusWithPlanExecutor.isOK()); + execs.emplace_back(std::move(statusWithPlanExecutor.getValue())); + } { - auto planSummary = Explain::getPlanSummary(exec.get()); + auto planSummary = Explain::getPlanSummary(execs[0].get()); stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(std::move(planSummary)); } @@ -524,30 +579,44 @@ Status runAggregate(OperationContext* opCtx, // cursor manager. The global cursor manager does not deliver invalidations or kill // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving // invalidations and kill notifications themselves, not the cursor we create here. - ClientCursorParams cursorParams( - std::move(exec), - origNss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - repl::ReadConcernArgs::get(opCtx).getLevel(), - cmdObj); - if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { - cursorParams.setTailable(true); - cursorParams.setAwaitData(true); - } - auto pin = - CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams)); + std::vector<ClientCursorPin> pins; + std::vector<ClientCursor*> cursors; + + ScopeGuard cursorFreer = MakeGuard( + [](std::vector<ClientCursorPin>* pins) { + for (auto& p : *pins) { + p.deleteUnderlying(); + } + }, + &pins); + + for (size_t idx = 0; idx < execs.size(); ++idx) { + ClientCursorParams cursorParams( + std::move(execs[idx]), + origNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + repl::ReadConcernArgs::get(opCtx).getLevel(), + cmdObj); + if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + cursorParams.setTailable(true); + cursorParams.setAwaitData(true); + } - ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin); + auto pin = + CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams)); + cursors.emplace_back(pin.getCursor()); + pins.emplace_back(std::move(pin)); + } // If both explain and cursor are specified, explain wins. if (expCtx->explain) { Explain::explainPipelineExecutor( - pin.getCursor()->getExecutor(), *(expCtx->explain), &result); + pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result); } else { // Cursor must be specified, if explain is not. const bool keepCursor = - handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result); + handleCursorCommand(opCtx, origNss, std::move(cursors), request, result); if (keepCursor) { cursorFreer.Dismiss(); } @@ -555,7 +624,7 @@ Status runAggregate(OperationContext* opCtx, if (!expCtx->explain) { PlanSummaryStats stats; - Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats); + Explain::getSummaryStats(*(pins[0].getCursor()->getExecutor()), &stats); curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; } |