diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_exchange.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.cpp | 88 |
1 files changed, 45 insertions, 43 deletions
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp index dddc3d74f0c..49de073af73 100644 --- a/src/mongo/db/pipeline/document_source_exchange.cpp +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -41,27 +41,8 @@ namespace mongo { -REGISTER_DOCUMENT_SOURCE(exchange, - LiteParsedDocumentSourceDefault::parse, - DocumentSourceExchange::createFromBson); - const char* DocumentSourceExchange::getSourceName() const { - return "$exchange"; -} - -boost::intrusive_ptr<DocumentSource> DocumentSourceExchange::createFromBson( - BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { - uassert(ErrorCodes::FailedToParse, - str::stream() << "$exchange options must be specified in an object, but found: " - << typeName(spec.type()), - spec.type() == BSONType::Object); - - IDLParserErrorContext ctx("$exchange"); - auto parsed = ExchangeSpec::parse(ctx, spec.embeddedObject()); - - boost::intrusive_ptr<Exchange> exchange = new Exchange(parsed); - - return new DocumentSourceExchange(pExpCtx, exchange, 0); + return "$_internalExchange"; } Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { @@ -75,31 +56,36 @@ DocumentSourceExchange::DocumentSourceExchange( : DocumentSource(expCtx), _exchange(exchange), _consumerId(consumerId) {} DocumentSource::GetNextResult DocumentSourceExchange::getNext() { - return _exchange->getNext(_consumerId); + return _exchange->getNext(pExpCtx->opCtx, _consumerId); } -Exchange::Exchange(const ExchangeSpec& spec) - : _spec(spec), - _keyPattern(spec.getKey().getOwned()), +Exchange::Exchange(ExchangeSpec spec, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) + : _spec(std::move(spec)), + _keyPattern(_spec.getKey().getOwned()), _ordering(extractOrdering(_keyPattern)), - _boundaries(extractBoundaries(spec.getBoundaries())), - _consumerIds(extractConsumerIds(spec.getConsumerids(), spec.getConsumers())), - _policy(spec.getPolicy()), - _orderPreserving(spec.getOrderPreserving()), - _maxBufferSize(spec.getBufferSize()) { - uassert(50901, "$exchange must have at least one consumer", spec.getConsumers() > 0); - - for (int idx = 0; idx < spec.getConsumers(); ++idx) { + _boundaries(extractBoundaries(_spec.getBoundaries())), + _consumerIds(extractConsumerIds(_spec.getConsumerids(), _spec.getConsumers())), + _policy(_spec.getPolicy()), + _orderPreserving(_spec.getOrderPreserving()), + _maxBufferSize(_spec.getBufferSize()), + _pipeline(std::move(pipeline)) { + uassert(50901, "Exchange must have at least one consumer", _spec.getConsumers() > 0); + + for (int idx = 0; idx < _spec.getConsumers(); ++idx) { _consumers.emplace_back(std::make_unique<ExchangeBuffer>()); } if (_policy == ExchangePolicyEnum::kRange || _policy == ExchangePolicyEnum::kHash) { uassert(50900, - "$exchange boundaries do not match number of consumers.", + "Exchange boundaries do not match number of consumers.", _boundaries.size() == _consumerIds.size() + 1); } else { - uassert(50899, "$exchange boundaries must not be specified.", _boundaries.empty()); + uassert(50899, "Exchange boundaries must not be specified.", _boundaries.empty()); } + + // We will manually detach and reattach when iterating '_pipeline', we expect it to start in the + // detached state. + _pipeline->detachFromOperationContext(); } std::vector<std::string> Exchange::extractBoundaries( @@ -125,7 +111,7 @@ std::vector<std::string> Exchange::extractBoundaries( for (size_t idx = 1; idx < ret.size(); ++idx) { uassert(50893, - str::stream() << "$exchange range boundaries are not in ascending order.", + str::stream() << "Exchange range boundaries are not in ascending order.", ret[idx - 1] < ret[idx]); } return ret; @@ -151,7 +137,7 @@ std::vector<size_t> Exchange::extractConsumerIds( } uassert(50894, - str::stream() << "$exchange consumers ids are invalid.", + str::stream() << "Exchange consumers ids are invalid.", nConsumers > 0 && validation.size() == nConsumers && *validation.begin() == 0 && *validation.rbegin() == nConsumers - 1); } @@ -165,29 +151,29 @@ Ordering Exchange::extractOrdering(const BSONObj& obj) { for (const auto& element : obj) { if (element.type() == BSONType::String) { uassert(50895, - str::stream() << "$exchange key description is invalid: " << element, + str::stream() << "Exchange key description is invalid: " << element, element.valueStringData() == "hashed"_sd); hasHashKey = true; } else if (element.isNumber()) { auto num = element.number(); if (!(num == 1 || num == -1)) { uasserted(50896, - str::stream() << "$exchange key description is invalid: " << element); + str::stream() << "Exchange key description is invalid: " << element); } hasOrderKey = true; } else { - uasserted(50897, str::stream() << "$exchange key description is invalid: " << element); + uasserted(50897, str::stream() << "Exchange key description is invalid: " << element); } } uassert(50898, - str::stream() << "$exchange hash and order keys cannot be mixed together: " << obj, + str::stream() << "Exchange hash and order keys cannot be mixed together: " << obj, !(hasHashKey && hasOrderKey)); return hasHashKey ? Ordering::make(BSONObj()) : Ordering::make(obj); } -DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) { +DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t consumerId) { // Grab a lock. stdx::unique_lock<stdx::mutex> lk(_mutex); @@ -212,11 +198,15 @@ DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) { // This consumer won the race and will fill the buffers. _loadingThreadId = consumerId; + _pipeline->reattachToOperationContext(opCtx); + // This will return when some exchange buffer is full and we cannot make any forward // progress anymore. // The return value is an index of a full consumer buffer. size_t fullConsumerId = loadNextBatch(); + _pipeline->detachFromOperationContext(); + // The loading cannot continue until the consumer with the full buffer consumes some // documents. _loadingThreadId = fullConsumerId; @@ -232,9 +222,9 @@ DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) { } size_t Exchange::loadNextBatch() { - auto input = pSource->getNext(); + auto input = _pipeline->getSources().back()->getNext(); - for (; input.isAdvanced(); input = pSource->getNext()) { + for (; input.isAdvanced(); input = _pipeline->getSources().back()->getNext()) { // We have a document and we will deliver it to a consumer(s) based on the policy. switch (_policy) { case ExchangePolicyEnum::kBroadcast: { @@ -317,6 +307,18 @@ size_t Exchange::getTargetConsumer(const Document& input) { return cid; } +void Exchange::dispose(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + invariant(_disposeRunDown < getConsumers()); + + ++_disposeRunDown; + + if (_disposeRunDown == getConsumers()) { + _pipeline->dispose(opCtx); + } +} + DocumentSource::GetNextResult Exchange::ExchangeBuffer::getNext() { invariant(!_buffer.empty()); |