summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp137
-rw-r--r--src/mongo/db/pipeline/SConscript3
-rw-r--r--src/mongo/db/pipeline/document_source.h2
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.cpp274
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.h172
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.idl71
-rw-r--r--src/mongo/db/pipeline/document_source_exchange_test.cpp329
-rw-r--r--src/mongo/db/query/plan_executor.h4
9 files changed, 958 insertions, 36 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index f4a97b7cfef..5e00d3f867c 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_exchange.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -88,14 +89,48 @@ namespace {
*/
bool handleCursorCommand(OperationContext* opCtx,
const NamespaceString& nsForCursor,
- ClientCursor* cursor,
+ std::vector<ClientCursor*> cursors,
const AggregationRequest& request,
BSONObjBuilder& result) {
- invariant(cursor);
-
+ invariant(!cursors.empty());
long long batchSize = request.getBatchSize();
+ if (cursors.size() > 1) {
+
+ uassert(
+ ErrorCodes::BadValue, "the exchange initial batch size must be zero", batchSize == 0);
+
+ BSONArrayBuilder cursorsBuilder;
+ for (size_t idx = 0; idx < cursors.size(); ++idx) {
+ invariant(cursors[idx]);
+
+ BSONObjBuilder cursorResult;
+ appendCursorResponseObject(
+ cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult);
+ cursorResult.appendBool("ok", 1);
+
+ cursorsBuilder.append(cursorResult.obj());
+
+ // If a time limit was set on the pipeline, remaining time is "rolled over" to the
+ // cursor (for use by future getmore ops).
+ cursors[idx]->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
+
+ // Cursor needs to be in a saved state while we yield locks for getmore. State
+ // will be restored in getMore().
+ cursors[idx]->getExecutor()->saveState();
+ cursors[idx]->getExecutor()->detachFromOperationContext();
+ }
+
+ result.appendArray("cursors", cursorsBuilder.obj());
+
+ return true;
+ }
+
CursorResponseBuilder responseBuilder(true, &result);
+
+ ClientCursor* cursor = cursors[0];
+ invariant(cursor);
+
BSONObj next;
for (int objCount = 0; objCount < batchSize; objCount++) {
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
@@ -322,9 +357,8 @@ Status runAggregate(OperationContext* opCtx,
// streams, this will be the UUID of the original namespace instead of the oplog namespace.
boost::optional<UUID> uuid;
- unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
+ std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
boost::intrusive_ptr<ExpressionContext> expCtx;
- Pipeline* unownedPipeline;
auto curOp = CurOp::get(opCtx);
{
const LiteParsedPipeline liteParsedPipeline(request);
@@ -498,22 +532,43 @@ Status runAggregate(OperationContext* opCtx,
// this process uses the correct collation if it does any string comparisons.
pipeline->optimizePipeline();
- // Transfer ownership of the Pipeline to the PipelineProxyStage.
- unownedPipeline = pipeline.get();
- auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get());
+ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> pipelines;
- // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
- // yield or to be registered with any collection's CursorManager to receive invalidations.
- // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are*
- // registered with their respective collection's CursorManager
- auto statusWithPlanExecutor =
- PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD);
- invariant(statusWithPlanExecutor.isOK());
- exec = std::move(statusWithPlanExecutor.getValue());
+ pipelines.emplace_back(std::move(pipeline));
+
+ auto exchange =
+ dynamic_cast<DocumentSourceExchange*>(pipelines[0]->getSources().back().get());
+ if (exchange) {
+ for (size_t idx = 1; idx < exchange->getConsumers(); ++idx) {
+ auto sources = pipelines[0]->getSources();
+ sources.back() = new DocumentSourceExchange(expCtx, exchange->getExchange(), idx);
+ pipelines.emplace_back(
+ uassertStatusOK(Pipeline::create(std::move(sources), expCtx)));
+ }
+ }
+
+ // TODO we will revisit the current vector of pipelines design when we will implement
+ // plan summaries, explains, etc.
+ for (size_t idx = 0; idx < pipelines.size(); ++idx) {
+ // Transfer ownership of the Pipeline to the PipelineProxyStage.
+ auto ws = make_unique<WorkingSet>();
+ auto proxy =
+ make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
+
+ // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
+ // yield or to be registered with any collection's CursorManager to receive
+ // invalidations. The Pipeline may contain PlanExecutors which *are* yielding
+ // PlanExecutors and which *are* registered with their respective collection's
+ // CursorManager
+
+ auto statusWithPlanExecutor = PlanExecutor::make(
+ opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD);
+ invariant(statusWithPlanExecutor.isOK());
+ execs.emplace_back(std::move(statusWithPlanExecutor.getValue()));
+ }
{
- auto planSummary = Explain::getPlanSummary(exec.get());
+ auto planSummary = Explain::getPlanSummary(execs[0].get());
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(std::move(planSummary));
}
@@ -524,30 +579,44 @@ Status runAggregate(OperationContext* opCtx,
// cursor manager. The global cursor manager does not deliver invalidations or kill
// notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
// invalidations and kill notifications themselves, not the cursor we create here.
- ClientCursorParams cursorParams(
- std::move(exec),
- origNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- repl::ReadConcernArgs::get(opCtx).getLevel(),
- cmdObj);
- if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
- cursorParams.setTailable(true);
- cursorParams.setAwaitData(true);
- }
- auto pin =
- CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+ std::vector<ClientCursorPin> pins;
+ std::vector<ClientCursor*> cursors;
+
+ ScopeGuard cursorFreer = MakeGuard(
+ [](std::vector<ClientCursorPin>* pins) {
+ for (auto& p : *pins) {
+ p.deleteUnderlying();
+ }
+ },
+ &pins);
+
+ for (size_t idx = 0; idx < execs.size(); ++idx) {
+ ClientCursorParams cursorParams(
+ std::move(execs[idx]),
+ origNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ repl::ReadConcernArgs::get(opCtx).getLevel(),
+ cmdObj);
+ if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ cursorParams.setTailable(true);
+ cursorParams.setAwaitData(true);
+ }
- ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
+ auto pin =
+ CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+ cursors.emplace_back(pin.getCursor());
+ pins.emplace_back(std::move(pin));
+ }
// If both explain and cursor are specified, explain wins.
if (expCtx->explain) {
Explain::explainPipelineExecutor(
- pin.getCursor()->getExecutor(), *(expCtx->explain), &result);
+ pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result);
} else {
// Cursor must be specified, if explain is not.
const bool keepCursor =
- handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result);
+ handleCursorCommand(opCtx, origNss, std::move(cursors), request, result);
if (keepCursor) {
cursorFreer.Dismiss();
}
@@ -555,7 +624,7 @@ Status runAggregate(OperationContext* opCtx,
if (!expCtx->explain) {
PlanSummaryStats stats;
- Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats);
+ Explain::getSummaryStats(*(pins[0].getCursor()->getExecutor()), &stats);
curOp->debug().setPlanSummaryMetrics(stats);
curOp->debug().nreturned = stats.nReturned;
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 813a9bd05a1..6ca079e6b8f 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -202,6 +202,7 @@ env.CppUnitTest(
'document_source_check_resume_token_test.cpp',
'document_source_count_test.cpp',
'document_source_current_op_test.cpp',
+ 'document_source_exchange_test.cpp',
'document_source_geo_near_test.cpp',
'document_source_graph_lookup_test.cpp',
'document_source_group_test.cpp',
@@ -277,6 +278,7 @@ pipelineeEnv.Library(
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_current_op.cpp',
+ 'document_source_exchange.cpp',
'document_source_facet.cpp',
'document_source_geo_near.cpp',
'document_source_graph_lookup.cpp',
@@ -490,6 +492,7 @@ env.Library(
env.Idlc('document_source_change_stream.idl')[0],
env.Idlc('document_source_list_sessions.idl')[0],
env.Idlc('document_source_out.idl')[0],
+ env.Idlc('document_source_exchange.idl')[0],
'resume_token.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 7555f8a321d..580a2fabc93 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -125,7 +125,7 @@ class Document;
return Status::OK(); \
}
-class DocumentSource : public IntrusiveCounterUnsigned {
+class DocumentSource : public RefCountable {
public:
using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index aeb16094abb..24f9be39320 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -231,7 +231,7 @@ Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity>
}
void DocumentSourceCursor::detachFromOperationContext() {
- if (_exec) {
+ if (_exec && !_exec->isDetached()) {
_exec->detachFromOperationContext();
}
}
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
new file mode 100644
index 00000000000..94dc7920b0a
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
+#include <iterator>
+
+#include "mongo/db/pipeline/document_source_exchange.h"
+#include "mongo/db/storage/key_string.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(exchange,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceExchange::createFromBson);
+
+const char* DocumentSourceExchange::getSourceName() const {
+ return "$exchange";
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceExchange::createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$exchange options must be specified in an object, but found: "
+ << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ IDLParserErrorContext ctx("$exchange");
+ auto parsed = ExchangeSpec::parse(ctx, spec.embeddedObject());
+
+ boost::intrusive_ptr<Exchange> exchange = new Exchange(parsed);
+
+ return new DocumentSourceExchange(pExpCtx, exchange, 0);
+}
+
+Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(DOC(getSourceName() << _exchange->getSpec().toBSON()));
+}
+
+DocumentSourceExchange::DocumentSourceExchange(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Exchange> exchange,
+ size_t consumerId)
+ : DocumentSource(expCtx), _exchange(exchange), _consumerId(consumerId) {}
+
+DocumentSource::GetNextResult DocumentSourceExchange::getNext() {
+ return _exchange->getNext(_consumerId);
+}
+
+Exchange::Exchange(const ExchangeSpec& spec)
+ : _spec(spec),
+ _keyPattern(spec.getKey().getOwned()),
+ _boundaries(extractBoundaries(spec.getBoundaries())),
+ _policy(spec.getPolicy()),
+ _orderPreserving(spec.getOrderPreserving()),
+ _maxBufferSize(spec.getBufferSize()) {
+ for (int idx = 0; idx < spec.getConsumers(); ++idx) {
+ _consumers.emplace_back(std::make_unique<ExchangeBuffer>());
+ }
+
+ if (_policy == ExchangePolicyEnum::kRange) {
+ uassert(ErrorCodes::BadValue,
+ "$exchange boundaries do not much number of consumers.",
+ getConsumers() + 1 == _boundaries.size());
+ } else if (_policy == ExchangePolicyEnum::kHash) {
+ uasserted(ErrorCodes::BadValue, "$exchange hash is not yet implemented.");
+ }
+}
+
+std::vector<std::string> Exchange::extractBoundaries(
+ const boost::optional<std::vector<BSONObj>>& obj) {
+ std::vector<std::string> ret;
+
+ if (!obj) {
+ return ret;
+ }
+
+ for (auto& b : *obj) {
+ // Build the key.
+ BSONObjBuilder kb;
+ for (auto elem : b) {
+ kb << "" << elem;
+ }
+
+ KeyString key{KeyString::Version::V1, kb.obj(), Ordering::make(BSONObj())};
+ std::string keyStr{key.getBuffer(), key.getSize()};
+
+ ret.emplace_back(std::move(keyStr));
+ }
+
+ for (size_t idx = 1; idx < ret.size(); ++idx) {
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "$exchange range boundaries are not in ascending order.",
+ ret[idx - 1] < ret[idx]);
+ }
+ return ret;
+}
+
+DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) {
+ // Grab a lock.
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ for (;;) {
+ // Check if we have a document.
+ if (!_consumers[consumerId]->isEmpty()) {
+ auto doc = _consumers[consumerId]->getNext();
+
+ // See if the loading is blocked on this consumer and if so unblock it.
+ if (_loadingThreadId == consumerId) {
+ _loadingThreadId = kInvalidThreadId;
+ _haveBufferSpace.notify_all();
+ }
+
+ return doc;
+ }
+
+ // There is not any document so try to load more from the source.
+ if (_loadingThreadId == kInvalidThreadId) {
+ LOG(3) << "A consumer " << consumerId << " begins loading";
+
+ // This consumer won the race and will fill the buffers.
+ _loadingThreadId = consumerId;
+
+ // This will return when some exchange buffer is full and we cannot make any forward
+ // progress anymore.
+ // The return value is an index of a full consumer buffer.
+ size_t fullConsumerId = loadNextBatch();
+
+ // The loading cannot continue until the consumer with the full buffer consumes some
+ // documents.
+ _loadingThreadId = fullConsumerId;
+
+ // Wake up everybody and try to make some progress.
+ _haveBufferSpace.notify_all();
+ } else {
+ // Some other consumer is already loading the buffers. There is nothing else we can do
+ // but wait.
+ _haveBufferSpace.wait(lk);
+ }
+ }
+}
+
+size_t Exchange::loadNextBatch() {
+ auto input = pSource->getNext();
+
+ for (; input.isAdvanced(); input = pSource->getNext()) {
+ // We have a document and we will deliver it to a consumer(s) based on the policy.
+ switch (_policy) {
+ case ExchangePolicyEnum::kBroadcast: {
+ bool full = false;
+ // The document is sent to all consumers.
+ for (auto& c : _consumers) {
+ full = c->appendDocument(input, _maxBufferSize);
+ }
+
+ if (full)
+ return 0;
+ } break;
+ case ExchangePolicyEnum::kRoundRobin: {
+ size_t target = _roundRobinCounter;
+ _roundRobinCounter = (_roundRobinCounter + 1) % _consumers.size();
+
+ if (_consumers[target]->appendDocument(std::move(input), _maxBufferSize))
+ return target;
+ } break;
+ case ExchangePolicyEnum::kRange: {
+ size_t target = getTargetConsumer(input.getDocument());
+ bool full = _consumers[target]->appendDocument(std::move(input), _maxBufferSize);
+ if (full && _orderPreserving) {
+ // TODO send the high watermark here.
+ }
+ if (full)
+ return target;
+ } break;
+ case ExchangePolicyEnum::kHash: {
+ // TODO implement the hash policy. Note that returning 0 is technically correct.
+ size_t target = 0;
+ bool full = _consumers[target]->appendDocument(std::move(input), _maxBufferSize);
+ if (full && _orderPreserving) {
+ // TODO send the high watermark here.
+ }
+ if (full)
+ return target;
+ } break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+
+ invariant(input.isEOF());
+
+ // We have reached the end so send EOS to all consumers.
+ for (auto& c : _consumers) {
+ c->appendDocument(input, _maxBufferSize);
+ }
+
+ return kInvalidThreadId;
+}
+
+size_t Exchange::getTargetConsumer(const Document& input) {
+ // Build the key.
+ BSONObjBuilder kb;
+ for (auto elem : _keyPattern) {
+ auto value = input[elem.fieldName()];
+ kb << "" << value;
+ }
+
+ // TODO implement hash keys for the hash policy.
+ KeyString key{KeyString::Version::V1, kb.obj(), Ordering::make(BSONObj())};
+ std::string keyStr{key.getBuffer(), key.getSize()};
+
+ // Binary search for the consumer id.
+ auto it = std::upper_bound(_boundaries.begin(), _boundaries.end(), keyStr);
+ invariant(it != _boundaries.end());
+
+ size_t distance = std::distance(_boundaries.begin(), it) - 1;
+ invariant(distance < _consumers.size());
+
+ return distance;
+}
+
+DocumentSource::GetNextResult Exchange::ExchangeBuffer::getNext() {
+ invariant(!_buffer.empty());
+
+ auto result = std::move(_buffer.front());
+ _buffer.pop_front();
+
+ if (result.isAdvanced()) {
+ _bytesInBuffer -= result.getDocument().getApproximateSize();
+ }
+
+ return result;
+}
+
+bool Exchange::ExchangeBuffer::appendDocument(DocumentSource::GetNextResult input, size_t limit) {
+ if (input.isAdvanced()) {
+ _bytesInBuffer += input.getDocument().getApproximateSize();
+ }
+ _buffer.push_back(std::move(input));
+
+ // The buffer is full.
+ return _bytesInBuffer >= limit;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
new file mode 100644
index 00000000000..3457cf6e4fa
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2018 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+#include <vector>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_exchange_gen.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class Exchange : public RefCountable {
+ static constexpr size_t kInvalidThreadId{std::numeric_limits<size_t>::max()};
+
+ static std::vector<std::string> extractBoundaries(
+ const boost::optional<std::vector<BSONObj>>& obj);
+
+public:
+ explicit Exchange(const ExchangeSpec& spec);
+ DocumentSource::GetNextResult getNext(size_t consumerId);
+
+ size_t getConsumers() const {
+ return _consumers.size();
+ }
+
+ void setSource(DocumentSource* source) {
+ pSource = source;
+ }
+
+ const auto& getSpec() const {
+ return _spec;
+ }
+
+private:
+ size_t loadNextBatch();
+
+ size_t getTargetConsumer(const Document& input);
+
+ class ExchangeBuffer {
+ public:
+ bool appendDocument(DocumentSource::GetNextResult input, size_t limit);
+ DocumentSource::GetNextResult getNext();
+ bool isEmpty() const {
+ return _buffer.empty();
+ }
+
+ private:
+ size_t _bytesInBuffer{0};
+ std::deque<DocumentSource::GetNextResult> _buffer;
+ };
+
+ // Keep a copy of the spec for serialization purposes.
+ const ExchangeSpec _spec;
+
+ // A pattern for extracting a key from a document used by range and hash policies.
+ const BSONObj _keyPattern;
+
+ // Range boundaries.
+ const std::vector<std::string> _boundaries;
+
+ // A policy that tells how to distribute input documents to consumers.
+ const ExchangePolicyEnum _policy;
+
+ // If set to true then a producer sends special 'high watermark' documents to consumers in order
+ // to prevent deadlocks.
+ const bool _orderPreserving;
+
+ // A maximum size of buffer per consumer.
+ const size_t _maxBufferSize;
+
+ // An input to the exchange operator
+ DocumentSource* pSource;
+
+ // Synchronization.
+ stdx::mutex _mutex;
+ stdx::condition_variable _haveBufferSpace;
+
+ // A thread that is currently loading the exchange buffers.
+ size_t _loadingThreadId{kInvalidThreadId};
+
+ size_t _roundRobinCounter{0};
+
+ std::vector<std::unique_ptr<ExchangeBuffer>> _consumers;
+};
+
+class DocumentSourceExchange final : public DocumentSource, public NeedsMergerDocumentSource {
+public:
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ DocumentSourceExchange(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Exchange> exchange,
+ size_t consumerId);
+
+ GetNextResult getNext() final;
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
+ }
+
+ const char* getSourceName() const final;
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return this;
+ }
+ std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ // TODO SERVER-35974 we have to revisit this when we implement consumers.
+ return {this};
+ }
+
+ /**
+ * Set the underlying source this source should use to get Documents from. Must not throw
+ * exceptions.
+ */
+ void setSource(DocumentSource* source) final {
+ DocumentSource::setSource(source);
+ _exchange->setSource(source);
+ }
+
+ GetNextResult getNext(size_t consumerId);
+
+ size_t getConsumers() const {
+ return _exchange->getConsumers();
+ }
+
+ auto getExchange() const {
+ return _exchange;
+ }
+
+private:
+ boost::intrusive_ptr<Exchange> _exchange;
+
+ const size_t _consumerId;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_exchange.idl b/src/mongo/db/pipeline/document_source_exchange.idl
new file mode 100644
index 00000000000..b2ffd23d41f
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_exchange.idl
@@ -0,0 +1,71 @@
+# Copyright (C) 2018 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# Document source exchange stage IDL file
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+enums:
+ ExchangePolicy:
+ description: "The type of an exchange distribution policy"
+ type: string
+ values:
+ kBroadcast: "broadcast"
+ kRoundRobin: "roundrobin"
+ kRange: "range"
+ kHash: "hash"
+
+structs:
+ ExchangeSpec:
+ description: "$exchange operator spec"
+ fields:
+ policy:
+ type: ExchangePolicy
+ description: A string indicating a policy of how documents are distributed to consumers.
+ consumers:
+ type: int
+ description: Number of consumers.
+ orderPreserving:
+ type: bool
+ default: false
+ description: A flag indicating documents are merged while preserving the order.
+ bufferSize:
+ type: int
+ default: 16777216
+ description: The size of exchange buffers.
+ key:
+ type: object
+ default: "BSONObj()"
+ description: A key used for document distribution to consumers. The same description as sorting/sharding.
+ boundaries:
+ type: array<object>
+ optional: true
+ description: Range/hash split points.
+
diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp
new file mode 100644
index 00000000000..81d25527fa4
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp
@@ -0,0 +1,329 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_exchange.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/storage/key_string.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/platform/random.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/time_support.h"
+
+
+namespace mongo {
+
+class DocumentSourceExchangeTest : public AggregationContextFixture {
+protected:
+ std::unique_ptr<executor::TaskExecutor> _executor;
+ virtual void setUp() override {
+ auto net = executor::makeNetworkInterface("ExchangeTest");
+
+ ThreadPool::Options options;
+ auto pool = std::make_unique<ThreadPool>(options);
+
+ _executor =
+ std::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _executor->startup();
+ }
+
+ virtual void tearDown() override {
+ _executor->shutdown();
+ _executor.reset();
+ }
+
+ auto getMockSource(int cnt) {
+ auto source = DocumentSourceMock::create();
+ for (int i = 0; i < cnt; ++i)
+ source->queue.emplace_back(Document{{"a", i}, {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
+
+ return source;
+ }
+
+ static auto getNewSeed() {
+ auto seed = Date_t::now().asInt64();
+ unittest::log() << "Generated new seed is " << seed;
+
+ return seed;
+ }
+
+ auto getRandomMockSource(size_t cnt, int64_t seed) {
+ PseudoRandom prng(seed);
+
+ auto source = DocumentSourceMock::create();
+ for (size_t i = 0; i < cnt; ++i)
+ source->queue.emplace_back(Document{{"a", static_cast<int>(prng.nextInt32() % cnt)},
+ {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
+
+ return source;
+ }
+};
+
+TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) {
+ const size_t nDocs = 500;
+
+ auto source = getMockSource(nDocs);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRoundRobin);
+ spec.setConsumers(1);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ ex->setSource(source.get());
+
+ auto input = ex->getNext(0);
+
+ size_t docs = 0;
+ for (; input.isAdvanced(); input = ex->getNext(0)) {
+ ++docs;
+ }
+
+ ASSERT_EQ(docs, nDocs);
+}
+
+TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getMockSource(500);
+
+ const size_t nConsumers = 5;
+
+ ASSERT_EQ(nDocs % nConsumers, 0u);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRoundRobin);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
+
+ auto input = prods[id]->getNext();
+
+ size_t docs = 0;
+
+ for (; input.isAdvanced(); input = prods[id]->getNext()) {
+ sleepmillis(prng.nextInt32() % 20 + 1);
+ ++docs;
+ }
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+}
+
+TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getMockSource(nDocs);
+
+ const size_t nConsumers = 5;
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kBroadcast);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) {
+ size_t docs = 0;
+ for (auto input = prods[id]->getNext(); input.isAdvanced();
+ input = prods[id]->getNext()) {
+ ++docs;
+ }
+ ASSERT_EQ(docs, nDocs);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+}
+
+TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getMockSource(nDocs);
+
+ std::vector<BSONObj> boundaries;
+ boundaries.push_back(BSON("a" << MINKEY));
+ boundaries.push_back(BSON("a" << 100));
+ boundaries.push_back(BSON("a" << 200));
+ boundaries.push_back(BSON("a" << 300));
+ boundaries.push_back(BSON("a" << 400));
+ boundaries.push_back(BSON("a" << MAXKEY));
+
+ const size_t nConsumers = boundaries.size() - 1;
+
+ ASSERT(nDocs % nConsumers == 0);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRange);
+ spec.setKey(BSON("a" << 1));
+ spec.setBoundaries(boundaries);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
+ size_t docs = 0;
+ for (auto input = prods[id]->getNext(); input.isAdvanced();
+ input = prods[id]->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
+
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
+
+ ++docs;
+ }
+
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+}
+
+TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getRandomMockSource(nDocs, getNewSeed());
+
+ std::vector<BSONObj> boundaries;
+ boundaries.push_back(BSON("a" << MINKEY));
+ boundaries.push_back(BSON("a" << 100));
+ boundaries.push_back(BSON("a" << 200));
+ boundaries.push_back(BSON("a" << 300));
+ boundaries.push_back(BSON("a" << 400));
+ boundaries.push_back(BSON("a" << MAXKEY));
+
+ const size_t nConsumers = boundaries.size() - 1;
+
+ ASSERT(nDocs % nConsumers == 0);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRange);
+ spec.setKey(BSON("a" << 1));
+ spec.setBoundaries(boundaries);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ AtomicWord<size_t> processedDocs{0};
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
+
+ auto input = prods[id]->getNext();
+
+ size_t docs = 0;
+ for (; input.isAdvanced(); input = prods[id]->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
+
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
+
+ ++docs;
+
+ // This helps randomizing thread scheduling forcing different threads to load
+ // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
+ // reproducibility.
+ sleepmillis(prng.nextInt32() % 50 + 1);
+ }
+ processedDocs.fetchAndAdd(docs);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+
+ ASSERT_EQ(nDocs, processedDocs.load());
+}
+} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index e94bf14e61e..43510d651f5 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -461,6 +461,10 @@ public:
return _currentState == kDisposed;
}
+ bool isDetached() const {
+ return _currentState == kDetached;
+ }
+
/**
* If the last oplog timestamp is being tracked for this PlanExecutor, return it.
* Otherwise return a null timestamp.