diff options
author | Martin Neupauer <martin.neupauer@10gen.com> | 2018-06-20 13:33:34 -0400 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-07-06 15:02:32 -0400 |
commit | fda766f6be1a20fa28ce361511bc62e5c995186b (patch) | |
tree | 4289bb82ff7ce5ca55bc9f33915ff3205431dedf | |
parent | 5b2739dbff77811dbbfbccbc8a7ca8b973c8525f (diff) | |
download | mongo-fda766f6be1a20fa28ce361511bc62e5c995186b.tar.gz |
SERVER-35894 The initial implementation of the producer document source
for the exchange operator.
SERVER-35940 Remove IntrusiveCounter in favor of RefCountable.
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 137 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.cpp | 274 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.h | 172 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.idl | 71 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange_test.cpp | 329 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 4 |
9 files changed, 958 insertions, 36 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index f4a97b7cfef..5e00d3f867c 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -45,6 +45,7 @@ #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_exchange.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" @@ -88,14 +89,48 @@ namespace { */ bool handleCursorCommand(OperationContext* opCtx, const NamespaceString& nsForCursor, - ClientCursor* cursor, + std::vector<ClientCursor*> cursors, const AggregationRequest& request, BSONObjBuilder& result) { - invariant(cursor); - + invariant(!cursors.empty()); long long batchSize = request.getBatchSize(); + if (cursors.size() > 1) { + + uassert( + ErrorCodes::BadValue, "the exchange initial batch size must be zero", batchSize == 0); + + BSONArrayBuilder cursorsBuilder; + for (size_t idx = 0; idx < cursors.size(); ++idx) { + invariant(cursors[idx]); + + BSONObjBuilder cursorResult; + appendCursorResponseObject( + cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult); + cursorResult.appendBool("ok", 1); + + cursorsBuilder.append(cursorResult.obj()); + + // If a time limit was set on the pipeline, remaining time is "rolled over" to the + // cursor (for use by future getmore ops). + cursors[idx]->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + + // Cursor needs to be in a saved state while we yield locks for getmore. State + // will be restored in getMore(). + cursors[idx]->getExecutor()->saveState(); + cursors[idx]->getExecutor()->detachFromOperationContext(); + } + + result.appendArray("cursors", cursorsBuilder.obj()); + + return true; + } + CursorResponseBuilder responseBuilder(true, &result); + + ClientCursor* cursor = cursors[0]; + invariant(cursor); + BSONObj next; for (int objCount = 0; objCount < batchSize; objCount++) { // The initial getNext() on a PipelineProxyStage may be very expensive so we don't @@ -322,9 +357,8 @@ Status runAggregate(OperationContext* opCtx, // streams, this will be the UUID of the original namespace instead of the oplog namespace. boost::optional<UUID> uuid; - unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; + std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; boost::intrusive_ptr<ExpressionContext> expCtx; - Pipeline* unownedPipeline; auto curOp = CurOp::get(opCtx); { const LiteParsedPipeline liteParsedPipeline(request); @@ -498,22 +532,43 @@ Status runAggregate(OperationContext* opCtx, // this process uses the correct collation if it does any string comparisons. pipeline->optimizePipeline(); - // Transfer ownership of the Pipeline to the PipelineProxyStage. - unownedPipeline = pipeline.get(); - auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get()); + std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> pipelines; - // This PlanExecutor will simply forward requests to the Pipeline, so does not need to - // yield or to be registered with any collection's CursorManager to receive invalidations. - // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are* - // registered with their respective collection's CursorManager - auto statusWithPlanExecutor = - PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD); - invariant(statusWithPlanExecutor.isOK()); - exec = std::move(statusWithPlanExecutor.getValue()); + pipelines.emplace_back(std::move(pipeline)); + + auto exchange = + dynamic_cast<DocumentSourceExchange*>(pipelines[0]->getSources().back().get()); + if (exchange) { + for (size_t idx = 1; idx < exchange->getConsumers(); ++idx) { + auto sources = pipelines[0]->getSources(); + sources.back() = new DocumentSourceExchange(expCtx, exchange->getExchange(), idx); + pipelines.emplace_back( + uassertStatusOK(Pipeline::create(std::move(sources), expCtx))); + } + } + + // TODO we will revisit the current vector of pipelines design when we will implement + // plan summaries, explains, etc. + for (size_t idx = 0; idx < pipelines.size(); ++idx) { + // Transfer ownership of the Pipeline to the PipelineProxyStage. + auto ws = make_unique<WorkingSet>(); + auto proxy = + make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()); + + // This PlanExecutor will simply forward requests to the Pipeline, so does not need to + // yield or to be registered with any collection's CursorManager to receive + // invalidations. The Pipeline may contain PlanExecutors which *are* yielding + // PlanExecutors and which *are* registered with their respective collection's + // CursorManager + + auto statusWithPlanExecutor = PlanExecutor::make( + opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD); + invariant(statusWithPlanExecutor.isOK()); + execs.emplace_back(std::move(statusWithPlanExecutor.getValue())); + } { - auto planSummary = Explain::getPlanSummary(exec.get()); + auto planSummary = Explain::getPlanSummary(execs[0].get()); stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(std::move(planSummary)); } @@ -524,30 +579,44 @@ Status runAggregate(OperationContext* opCtx, // cursor manager. The global cursor manager does not deliver invalidations or kill // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving // invalidations and kill notifications themselves, not the cursor we create here. - ClientCursorParams cursorParams( - std::move(exec), - origNss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - repl::ReadConcernArgs::get(opCtx).getLevel(), - cmdObj); - if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { - cursorParams.setTailable(true); - cursorParams.setAwaitData(true); - } - auto pin = - CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams)); + std::vector<ClientCursorPin> pins; + std::vector<ClientCursor*> cursors; + + ScopeGuard cursorFreer = MakeGuard( + [](std::vector<ClientCursorPin>* pins) { + for (auto& p : *pins) { + p.deleteUnderlying(); + } + }, + &pins); + + for (size_t idx = 0; idx < execs.size(); ++idx) { + ClientCursorParams cursorParams( + std::move(execs[idx]), + origNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + repl::ReadConcernArgs::get(opCtx).getLevel(), + cmdObj); + if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + cursorParams.setTailable(true); + cursorParams.setAwaitData(true); + } - ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin); + auto pin = + CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams)); + cursors.emplace_back(pin.getCursor()); + pins.emplace_back(std::move(pin)); + } // If both explain and cursor are specified, explain wins. if (expCtx->explain) { Explain::explainPipelineExecutor( - pin.getCursor()->getExecutor(), *(expCtx->explain), &result); + pins[0].getCursor()->getExecutor(), *(expCtx->explain), &result); } else { // Cursor must be specified, if explain is not. const bool keepCursor = - handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result); + handleCursorCommand(opCtx, origNss, std::move(cursors), request, result); if (keepCursor) { cursorFreer.Dismiss(); } @@ -555,7 +624,7 @@ Status runAggregate(OperationContext* opCtx, if (!expCtx->explain) { PlanSummaryStats stats; - Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats); + Explain::getSummaryStats(*(pins[0].getCursor()->getExecutor()), &stats); curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; } diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 813a9bd05a1..6ca079e6b8f 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -202,6 +202,7 @@ env.CppUnitTest( 'document_source_check_resume_token_test.cpp', 'document_source_count_test.cpp', 'document_source_current_op_test.cpp', + 'document_source_exchange_test.cpp', 'document_source_geo_near_test.cpp', 'document_source_graph_lookup_test.cpp', 'document_source_group_test.cpp', @@ -277,6 +278,7 @@ pipelineeEnv.Library( 'document_source_coll_stats.cpp', 'document_source_count.cpp', 'document_source_current_op.cpp', + 'document_source_exchange.cpp', 'document_source_facet.cpp', 'document_source_geo_near.cpp', 'document_source_graph_lookup.cpp', @@ -490,6 +492,7 @@ env.Library( env.Idlc('document_source_change_stream.idl')[0], env.Idlc('document_source_list_sessions.idl')[0], env.Idlc('document_source_out.idl')[0], + env.Idlc('document_source_exchange.idl')[0], 'resume_token.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 7555f8a321d..580a2fabc93 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -125,7 +125,7 @@ class Document; return Status::OK(); \ } -class DocumentSource : public IntrusiveCounterUnsigned { +class DocumentSource : public RefCountable { public: using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index aeb16094abb..24f9be39320 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -231,7 +231,7 @@ Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> } void DocumentSourceCursor::detachFromOperationContext() { - if (_exec) { + if (_exec && !_exec->isDetached()) { _exec->detachFromOperationContext(); } } diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp new file mode 100644 index 00000000000..94dc7920b0a --- /dev/null +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -0,0 +1,274 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include <algorithm> +#include <iterator> + +#include "mongo/db/pipeline/document_source_exchange.h" +#include "mongo/db/storage/key_string.h" +#include "mongo/util/log.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(exchange, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceExchange::createFromBson); + +const char* DocumentSourceExchange::getSourceName() const { + return "$exchange"; +} + +boost::intrusive_ptr<DocumentSource> DocumentSourceExchange::createFromBson( + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "$exchange options must be specified in an object, but found: " + << typeName(spec.type()), + spec.type() == BSONType::Object); + + IDLParserErrorContext ctx("$exchange"); + auto parsed = ExchangeSpec::parse(ctx, spec.embeddedObject()); + + boost::intrusive_ptr<Exchange> exchange = new Exchange(parsed); + + return new DocumentSourceExchange(pExpCtx, exchange, 0); +} + +Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(DOC(getSourceName() << _exchange->getSpec().toBSON())); +} + +DocumentSourceExchange::DocumentSourceExchange( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Exchange> exchange, + size_t consumerId) + : DocumentSource(expCtx), _exchange(exchange), _consumerId(consumerId) {} + +DocumentSource::GetNextResult DocumentSourceExchange::getNext() { + return _exchange->getNext(_consumerId); +} + +Exchange::Exchange(const ExchangeSpec& spec) + : _spec(spec), + _keyPattern(spec.getKey().getOwned()), + _boundaries(extractBoundaries(spec.getBoundaries())), + _policy(spec.getPolicy()), + _orderPreserving(spec.getOrderPreserving()), + _maxBufferSize(spec.getBufferSize()) { + for (int idx = 0; idx < spec.getConsumers(); ++idx) { + _consumers.emplace_back(std::make_unique<ExchangeBuffer>()); + } + + if (_policy == ExchangePolicyEnum::kRange) { + uassert(ErrorCodes::BadValue, + "$exchange boundaries do not much number of consumers.", + getConsumers() + 1 == _boundaries.size()); + } else if (_policy == ExchangePolicyEnum::kHash) { + uasserted(ErrorCodes::BadValue, "$exchange hash is not yet implemented."); + } +} + +std::vector<std::string> Exchange::extractBoundaries( + const boost::optional<std::vector<BSONObj>>& obj) { + std::vector<std::string> ret; + + if (!obj) { + return ret; + } + + for (auto& b : *obj) { + // Build the key. + BSONObjBuilder kb; + for (auto elem : b) { + kb << "" << elem; + } + + KeyString key{KeyString::Version::V1, kb.obj(), Ordering::make(BSONObj())}; + std::string keyStr{key.getBuffer(), key.getSize()}; + + ret.emplace_back(std::move(keyStr)); + } + + for (size_t idx = 1; idx < ret.size(); ++idx) { + uassert(ErrorCodes::BadValue, + str::stream() << "$exchange range boundaries are not in ascending order.", + ret[idx - 1] < ret[idx]); + } + return ret; +} + +DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) { + // Grab a lock. + stdx::unique_lock<stdx::mutex> lk(_mutex); + + for (;;) { + // Check if we have a document. + if (!_consumers[consumerId]->isEmpty()) { + auto doc = _consumers[consumerId]->getNext(); + + // See if the loading is blocked on this consumer and if so unblock it. + if (_loadingThreadId == consumerId) { + _loadingThreadId = kInvalidThreadId; + _haveBufferSpace.notify_all(); + } + + return doc; + } + + // There is not any document so try to load more from the source. + if (_loadingThreadId == kInvalidThreadId) { + LOG(3) << "A consumer " << consumerId << " begins loading"; + + // This consumer won the race and will fill the buffers. + _loadingThreadId = consumerId; + + // This will return when some exchange buffer is full and we cannot make any forward + // progress anymore. + // The return value is an index of a full consumer buffer. + size_t fullConsumerId = loadNextBatch(); + + // The loading cannot continue until the consumer with the full buffer consumes some + // documents. + _loadingThreadId = fullConsumerId; + + // Wake up everybody and try to make some progress. + _haveBufferSpace.notify_all(); + } else { + // Some other consumer is already loading the buffers. There is nothing else we can do + // but wait. + _haveBufferSpace.wait(lk); + } + } +} + +size_t Exchange::loadNextBatch() { + auto input = pSource->getNext(); + + for (; input.isAdvanced(); input = pSource->getNext()) { + // We have a document and we will deliver it to a consumer(s) based on the policy. + switch (_policy) { + case ExchangePolicyEnum::kBroadcast: { + bool full = false; + // The document is sent to all consumers. + for (auto& c : _consumers) { + full = c->appendDocument(input, _maxBufferSize); + } + + if (full) + return 0; + } break; + case ExchangePolicyEnum::kRoundRobin: { + size_t target = _roundRobinCounter; + _roundRobinCounter = (_roundRobinCounter + 1) % _consumers.size(); + + if (_consumers[target]->appendDocument(std::move(input), _maxBufferSize)) + return target; + } break; + case ExchangePolicyEnum::kRange: { + size_t target = getTargetConsumer(input.getDocument()); + bool full = _consumers[target]->appendDocument(std::move(input), _maxBufferSize); + if (full && _orderPreserving) { + // TODO send the high watermark here. + } + if (full) + return target; + } break; + case ExchangePolicyEnum::kHash: { + // TODO implement the hash policy. Note that returning 0 is technically correct. + size_t target = 0; + bool full = _consumers[target]->appendDocument(std::move(input), _maxBufferSize); + if (full && _orderPreserving) { + // TODO send the high watermark here. + } + if (full) + return target; + } break; + default: + MONGO_UNREACHABLE; + } + } + + invariant(input.isEOF()); + + // We have reached the end so send EOS to all consumers. + for (auto& c : _consumers) { + c->appendDocument(input, _maxBufferSize); + } + + return kInvalidThreadId; +} + +size_t Exchange::getTargetConsumer(const Document& input) { + // Build the key. + BSONObjBuilder kb; + for (auto elem : _keyPattern) { + auto value = input[elem.fieldName()]; + kb << "" << value; + } + + // TODO implement hash keys for the hash policy. + KeyString key{KeyString::Version::V1, kb.obj(), Ordering::make(BSONObj())}; + std::string keyStr{key.getBuffer(), key.getSize()}; + + // Binary search for the consumer id. + auto it = std::upper_bound(_boundaries.begin(), _boundaries.end(), keyStr); + invariant(it != _boundaries.end()); + + size_t distance = std::distance(_boundaries.begin(), it) - 1; + invariant(distance < _consumers.size()); + + return distance; +} + +DocumentSource::GetNextResult Exchange::ExchangeBuffer::getNext() { + invariant(!_buffer.empty()); + + auto result = std::move(_buffer.front()); + _buffer.pop_front(); + + if (result.isAdvanced()) { + _bytesInBuffer -= result.getDocument().getApproximateSize(); + } + + return result; +} + +bool Exchange::ExchangeBuffer::appendDocument(DocumentSource::GetNextResult input, size_t limit) { + if (input.isAdvanced()) { + _bytesInBuffer += input.getDocument().getApproximateSize(); + } + _buffer.push_back(std::move(input)); + + // The buffer is full. + return _bytesInBuffer >= limit; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h new file mode 100644 index 00000000000..3457cf6e4fa --- /dev/null +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2018 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <deque> +#include <vector> + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_exchange_gen.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { + +class Exchange : public RefCountable { + static constexpr size_t kInvalidThreadId{std::numeric_limits<size_t>::max()}; + + static std::vector<std::string> extractBoundaries( + const boost::optional<std::vector<BSONObj>>& obj); + +public: + explicit Exchange(const ExchangeSpec& spec); + DocumentSource::GetNextResult getNext(size_t consumerId); + + size_t getConsumers() const { + return _consumers.size(); + } + + void setSource(DocumentSource* source) { + pSource = source; + } + + const auto& getSpec() const { + return _spec; + } + +private: + size_t loadNextBatch(); + + size_t getTargetConsumer(const Document& input); + + class ExchangeBuffer { + public: + bool appendDocument(DocumentSource::GetNextResult input, size_t limit); + DocumentSource::GetNextResult getNext(); + bool isEmpty() const { + return _buffer.empty(); + } + + private: + size_t _bytesInBuffer{0}; + std::deque<DocumentSource::GetNextResult> _buffer; + }; + + // Keep a copy of the spec for serialization purposes. + const ExchangeSpec _spec; + + // A pattern for extracting a key from a document used by range and hash policies. + const BSONObj _keyPattern; + + // Range boundaries. + const std::vector<std::string> _boundaries; + + // A policy that tells how to distribute input documents to consumers. + const ExchangePolicyEnum _policy; + + // If set to true then a producer sends special 'high watermark' documents to consumers in order + // to prevent deadlocks. + const bool _orderPreserving; + + // A maximum size of buffer per consumer. + const size_t _maxBufferSize; + + // An input to the exchange operator + DocumentSource* pSource; + + // Synchronization. + stdx::mutex _mutex; + stdx::condition_variable _haveBufferSpace; + + // A thread that is currently loading the exchange buffers. + size_t _loadingThreadId{kInvalidThreadId}; + + size_t _roundRobinCounter{0}; + + std::vector<std::unique_ptr<ExchangeBuffer>> _consumers; +}; + +class DocumentSourceExchange final : public DocumentSource, public NeedsMergerDocumentSource { +public: + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + DocumentSourceExchange(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Exchange> exchange, + size_t consumerId); + + GetNextResult getNext() final; + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; + } + + const char* getSourceName() const final; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return this; + } + std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + // TODO SERVER-35974 we have to revisit this when we implement consumers. + return {this}; + } + + /** + * Set the underlying source this source should use to get Documents from. Must not throw + * exceptions. + */ + void setSource(DocumentSource* source) final { + DocumentSource::setSource(source); + _exchange->setSource(source); + } + + GetNextResult getNext(size_t consumerId); + + size_t getConsumers() const { + return _exchange->getConsumers(); + } + + auto getExchange() const { + return _exchange; + } + +private: + boost::intrusive_ptr<Exchange> _exchange; + + const size_t _consumerId; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_exchange.idl b/src/mongo/db/pipeline/document_source_exchange.idl new file mode 100644 index 00000000000..b2ffd23d41f --- /dev/null +++ b/src/mongo/db/pipeline/document_source_exchange.idl @@ -0,0 +1,71 @@ +# Copyright (C) 2018 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +# Document source exchange stage IDL file + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +enums: + ExchangePolicy: + description: "The type of an exchange distribution policy" + type: string + values: + kBroadcast: "broadcast" + kRoundRobin: "roundrobin" + kRange: "range" + kHash: "hash" + +structs: + ExchangeSpec: + description: "$exchange operator spec" + fields: + policy: + type: ExchangePolicy + description: A string indicating a policy of how documents are distributed to consumers. + consumers: + type: int + description: Number of consumers. + orderPreserving: + type: bool + default: false + description: A flag indicating documents are merged while preserving the order. + bufferSize: + type: int + default: 16777216 + description: The size of exchange buffers. + key: + type: object + default: "BSONObj()" + description: A key used for document distribution to consumers. The same description as sorting/sharding. + boundaries: + type: array<object> + optional: true + description: Range/hash split points. + diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp new file mode 100644 index 00000000000..81d25527fa4 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp @@ -0,0 +1,329 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_exchange.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/storage/key_string.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/platform/random.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/time_support.h" + + +namespace mongo { + +class DocumentSourceExchangeTest : public AggregationContextFixture { +protected: + std::unique_ptr<executor::TaskExecutor> _executor; + virtual void setUp() override { + auto net = executor::makeNetworkInterface("ExchangeTest"); + + ThreadPool::Options options; + auto pool = std::make_unique<ThreadPool>(options); + + _executor = + std::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _executor->startup(); + } + + virtual void tearDown() override { + _executor->shutdown(); + _executor.reset(); + } + + auto getMockSource(int cnt) { + auto source = DocumentSourceMock::create(); + for (int i = 0; i < cnt; ++i) + source->queue.emplace_back(Document{{"a", i}, {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}}); + + return source; + } + + static auto getNewSeed() { + auto seed = Date_t::now().asInt64(); + unittest::log() << "Generated new seed is " << seed; + + return seed; + } + + auto getRandomMockSource(size_t cnt, int64_t seed) { + PseudoRandom prng(seed); + + auto source = DocumentSourceMock::create(); + for (size_t i = 0; i < cnt; ++i) + source->queue.emplace_back(Document{{"a", static_cast<int>(prng.nextInt32() % cnt)}, + {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}}); + + return source; + } +}; + +TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) { + const size_t nDocs = 500; + + auto source = getMockSource(nDocs); + + ExchangeSpec spec; + spec.setPolicy(ExchangePolicyEnum::kRoundRobin); + spec.setConsumers(1); + spec.setBufferSize(1024); + + boost::intrusive_ptr<Exchange> ex = new Exchange(spec); + + ex->setSource(source.get()); + + auto input = ex->getNext(0); + + size_t docs = 0; + for (; input.isAdvanced(); input = ex->getNext(0)) { + ++docs; + } + + ASSERT_EQ(docs, nDocs); +} + +TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) { + const size_t nDocs = 500; + auto source = getMockSource(500); + + const size_t nConsumers = 5; + + ASSERT_EQ(nDocs % nConsumers, 0u); + + ExchangeSpec spec; + spec.setPolicy(ExchangePolicyEnum::kRoundRobin); + spec.setConsumers(nConsumers); + spec.setBufferSize(1024); + + boost::intrusive_ptr<Exchange> ex = new Exchange(spec); + + std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; + + for (size_t idx = 0; idx < nConsumers; ++idx) { + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.back()->setSource(source.get()); + } + + std::vector<executor::TaskExecutor::CallbackHandle> handles; + + for (size_t id = 0; id < nConsumers; ++id) { + auto handle = _executor->scheduleWork( + [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) { + PseudoRandom prng(getNewSeed()); + + auto input = prods[id]->getNext(); + + size_t docs = 0; + + for (; input.isAdvanced(); input = prods[id]->getNext()) { + sleepmillis(prng.nextInt32() % 20 + 1); + ++docs; + } + ASSERT_EQ(docs, nDocs / nConsumers); + }); + + handles.emplace_back(std::move(handle.getValue())); + } + + for (auto& h : handles) + _executor->wait(h); +} + +TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) { + const size_t nDocs = 500; + auto source = getMockSource(nDocs); + + const size_t nConsumers = 5; + + ExchangeSpec spec; + spec.setPolicy(ExchangePolicyEnum::kBroadcast); + spec.setConsumers(nConsumers); + spec.setBufferSize(1024); + + boost::intrusive_ptr<Exchange> ex = new Exchange(spec); + + std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; + + for (size_t idx = 0; idx < nConsumers; ++idx) { + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.back()->setSource(source.get()); + } + + std::vector<executor::TaskExecutor::CallbackHandle> handles; + + for (size_t id = 0; id < nConsumers; ++id) { + auto handle = _executor->scheduleWork( + [prods, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) { + size_t docs = 0; + for (auto input = prods[id]->getNext(); input.isAdvanced(); + input = prods[id]->getNext()) { + ++docs; + } + ASSERT_EQ(docs, nDocs); + }); + + handles.emplace_back(std::move(handle.getValue())); + } + + for (auto& h : handles) + _executor->wait(h); +} + +TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) { + const size_t nDocs = 500; + auto source = getMockSource(nDocs); + + std::vector<BSONObj> boundaries; + boundaries.push_back(BSON("a" << MINKEY)); + boundaries.push_back(BSON("a" << 100)); + boundaries.push_back(BSON("a" << 200)); + boundaries.push_back(BSON("a" << 300)); + boundaries.push_back(BSON("a" << 400)); + boundaries.push_back(BSON("a" << MAXKEY)); + + const size_t nConsumers = boundaries.size() - 1; + + ASSERT(nDocs % nConsumers == 0); + + ExchangeSpec spec; + spec.setPolicy(ExchangePolicyEnum::kRange); + spec.setKey(BSON("a" << 1)); + spec.setBoundaries(boundaries); + spec.setConsumers(nConsumers); + spec.setBufferSize(1024); + + boost::intrusive_ptr<Exchange> ex = new Exchange(spec); + + std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; + + for (size_t idx = 0; idx < nConsumers; ++idx) { + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.back()->setSource(source.get()); + } + + std::vector<executor::TaskExecutor::CallbackHandle> handles; + + for (size_t id = 0; id < nConsumers; ++id) { + auto handle = _executor->scheduleWork( + [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) { + size_t docs = 0; + for (auto input = prods[id]->getNext(); input.isAdvanced(); + input = prods[id]->getNext()) { + size_t value = input.getDocument()["a"].getInt(); + + ASSERT(value >= id * 100); + ASSERT(value < (id + 1) * 100); + + ++docs; + } + + ASSERT_EQ(docs, nDocs / nConsumers); + }); + + handles.emplace_back(std::move(handle.getValue())); + } + + for (auto& h : handles) + _executor->wait(h); +} + +TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) { + const size_t nDocs = 500; + auto source = getRandomMockSource(nDocs, getNewSeed()); + + std::vector<BSONObj> boundaries; + boundaries.push_back(BSON("a" << MINKEY)); + boundaries.push_back(BSON("a" << 100)); + boundaries.push_back(BSON("a" << 200)); + boundaries.push_back(BSON("a" << 300)); + boundaries.push_back(BSON("a" << 400)); + boundaries.push_back(BSON("a" << MAXKEY)); + + const size_t nConsumers = boundaries.size() - 1; + + ASSERT(nDocs % nConsumers == 0); + + ExchangeSpec spec; + spec.setPolicy(ExchangePolicyEnum::kRange); + spec.setKey(BSON("a" << 1)); + spec.setBoundaries(boundaries); + spec.setConsumers(nConsumers); + spec.setBufferSize(1024); + + boost::intrusive_ptr<Exchange> ex = new Exchange(spec); + + std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; + + for (size_t idx = 0; idx < nConsumers; ++idx) { + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.back()->setSource(source.get()); + } + + std::vector<executor::TaskExecutor::CallbackHandle> handles; + + AtomicWord<size_t> processedDocs{0}; + + for (size_t id = 0; id < nConsumers; ++id) { + auto handle = _executor->scheduleWork( + [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) { + PseudoRandom prng(getNewSeed()); + + auto input = prods[id]->getNext(); + + size_t docs = 0; + for (; input.isAdvanced(); input = prods[id]->getNext()) { + size_t value = input.getDocument()["a"].getInt(); + + ASSERT(value >= id * 100); + ASSERT(value < (id + 1) * 100); + + ++docs; + + // This helps randomizing thread scheduling forcing different threads to load + // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% + // reproducibility. + sleepmillis(prng.nextInt32() % 50 + 1); + } + processedDocs.fetchAndAdd(docs); + }); + + handles.emplace_back(std::move(handle.getValue())); + } + + for (auto& h : handles) + _executor->wait(h); + + ASSERT_EQ(nDocs, processedDocs.load()); +} +} // namespace mongo diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index e94bf14e61e..43510d651f5 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -461,6 +461,10 @@ public: return _currentState == kDisposed; } + bool isDetached() const { + return _currentState == kDetached; + } + /** * If the last oplog timestamp is being tracked for this PlanExecutor, return it. * Otherwise return a null timestamp. |