summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@10gen.com>2018-06-20 13:33:34 -0400
committerMartin Neupauer <martin.neupauer@mongodb.com>2018-07-06 15:02:32 -0400
commitfda766f6be1a20fa28ce361511bc62e5c995186b (patch)
tree4289bb82ff7ce5ca55bc9f33915ff3205431dedf /src/mongo/db/commands/run_aggregate.cpp
parent5b2739dbff77811dbbfbccbc8a7ca8b973c8525f (diff)
downloadmongo-fda766f6be1a20fa28ce361511bc62e5c995186b.tar.gz
SERVER-35894 The initial implementation of the producer document source
for the exchange operator. SERVER-35940 Remove IntrusiveCounter in favor of RefCountable.
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp137
1 files changed, 103 insertions, 34 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index f4a97b7cfef..5e00d3f867c 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_exchange.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -88,14 +89,48 @@ namespace {
*/
bool handleCursorCommand(OperationContext* opCtx,
const NamespaceString& nsForCursor,
- ClientCursor* cursor,
+ std::vector<ClientCursor*> cursors,
const AggregationRequest& request,
BSONObjBuilder& result) {
- invariant(cursor);
-
+ invariant(!cursors.empty());
long long batchSize = request.getBatchSize();
+ if (cursors.size() > 1) {
+
+ uassert(
+ ErrorCodes::BadValue, "the exchange initial batch size must be zero", batchSize == 0);
+
+ BSONArrayBuilder cursorsBuilder;
+ for (size_t idx = 0; idx < cursors.size(); ++idx) {
+ invariant(cursors[idx]);
+
+ BSONObjBuilder cursorResult;
+ appendCursorResponseObject(
+ cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult);
+ cursorResult.appendBool("ok", 1);
+
+ cursorsBuilder.append(cursorResult.obj());
+
+ // If a time limit was set on the pipeline, remaining time is "rolled over" to the
+ // cursor (for use by future getmore ops).
+ cursors[idx]->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
+
+ // Cursor needs to be in a saved state while we yield locks for getmore. State
+ // will be restored in getMore().
+ cursors[idx]->getExecutor()->saveState();
+ cursors[idx]->getExecutor()->detachFromOperationContext();
+ }
+
+ result.appendArray("cursors", cursorsBuilder.obj());
+
+ return true;
+ }
+
CursorResponseBuilder responseBuilder(true, &result);
+
+ ClientCursor* cursor = cursors[0];
+ invariant(cursor);
+
BSONObj next;
for (int objCount = 0; objCount < batchSize; objCount++) {
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
@@ -322,9 +357,8 @@ Status runAggregate(OperationContext* opCtx,
// streams, this will be the UUID of the original namespace instead of the oplog namespace.
boost::optional<UUID> uuid;
- unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
+ std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
boost::intrusive_ptr<ExpressionContext> expCtx;
- Pipeline* unownedPipeline;
auto curOp = CurOp::get(opCtx);
{
const LiteParsedPipeline liteParsedPipeline(request);
@@ -498,22 +532,43 @@ Status runAggregate(OperationContext* opCtx,
// this process uses the correct collation if it does any string comparisons.
pipeline->optimizePipeline();
- // Transfer ownership of the Pipeline to the PipelineProxyStage.
- unownedPipeline = pipeline.get();
- auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get());
+ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> pipelines;
- // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
- // yield or to be registered with any collection's CursorManager to receive invalidations.
- // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are*
- // registered with their respective collection's CursorManager
- auto statusWithPlanExecutor =
- PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD);
- invariant(statusWithPlanExecutor.isOK());
- exec = std::move(statusWithPlanExecutor.getValue());
+ pipelines.emplace_back(std::move(pipeline));
+
+ auto exchange =
+ dynamic_cast<DocumentSourceExchange*>(pipelines[0]->getSources().back().get());
+ if (exchange) {
+ for (size_t idx = 1; idx < exchange->getConsumers(); ++idx) {
+ auto sources = pipelines[0]->getSources();
+ sources.back() = new DocumentSourceExchange(expCtx, exchange->getExchange(), idx);
+ pipelines.emplace_back(
+ uassertStatusOK(Pipeline::create(std::move(sources), expCtx)));
+ }
+ }
+
+ // TODO we will revisit the current vector of pipelines design when we will implement
+ // plan summaries, explains, etc.
+ for (size_t idx = 0; idx < pipelines.size(); ++idx) {
+ // Transfer ownership of the Pipeline to the PipelineProxyStage.
+ auto ws = make_unique<WorkingSet>();
+ auto proxy =
+ make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
+
+ // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
+ // yield or to be registered with any collection's CursorManager to receive
+ // invalidations. The Pipeline may contain PlanExecutors which *are* yielding
+ // PlanExecutors and which *are* registered with their respective collection's
+ // CursorManager
+
+ auto statusWithPlanExecutor = PlanExecutor::make(
+ opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD);
+ invariant(statusWithPlanExecutor.isOK());
+ execs.emplace_back(std::move(statusWithPlanExecutor.getValue()));
+ }
{
- auto planSummary = Explain::getPlanSummary(exec.get());
+ auto planSummary = Explain::getPlanSummary(execs[0].get());
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(std::move(planSummary));
}
@@ -524,30 +579,44 @@ Status runAggregate(OperationContext* opCtx,
// cursor manager. The global cursor manager does not deliver invalidations or kill
// notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
// invalidations and kill notifications themselves, not the cursor we create here.
- ClientCursorParams cursorParams(
- std::move(exec),
- origNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- repl::ReadConcernArgs::get(opCtx).getLevel(),
- cmdObj);
- if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
- cursorParams.setTailable(true);
- cursorParams.setAwaitData(true);
- }
- auto pin =
- CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+ std::vector<ClientCursorPin> pins;
+ std::vector<ClientCursor*> cursors;
+
+ ScopeGuard cursorFreer = MakeGuard(
+ [](std::vector<ClientCursorPin>* pins) {
+ for (auto& p : *pins) {
+ p.deleteUnderlying();
+ }
+ },
+ &pins);
+
+ for (size_t idx = 0; idx < execs.size(); ++idx) {
+ ClientCursorParams cursorParams(
+ std::move(execs[idx]),
+ origNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ repl::ReadConcernArgs::get(opCtx).getLevel(),
+ cmdObj);
+ if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ cursorParams.setTailable(true);
+ cursorParams.setAwaitData(true);
+ }
- ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
+ auto pin =
+ CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+ cursors.emplace_back(pin.getCursor());
+ pins.emplace_back(std::move(pin));
+ }
// If both explain and cursor are specified, explain wins.
if (expCtx->explain) {
Explain::explainPipelineExecutor(
- pin.getCursor()->getExecutor(), *(expCtx->explain), &result);
+ pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result);
} else {
// Cursor must be specified, if explain is not.
const bool keepCursor =
- handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result);
+ handleCursorCommand(opCtx, origNss, std::move(cursors), request, result);
if (keepCursor) {
cursorFreer.Dismiss();
}
@@ -555,7 +624,7 @@ Status runAggregate(OperationContext* opCtx,
if (!expCtx->explain) {
PlanSummaryStats stats;
- Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats);
+ Explain::getSummaryStats(*(pins[0].getCursor()->getExecutor()), &stats);
curOp->debug().setPlanSummaryMetrics(stats);
curOp->debug().nreturned = stats.nReturned;
}