summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_exchange.cpp
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@mongodb.com>2018-08-09 17:09:08 -0400
committerMartin Neupauer <martin.neupauer@mongodb.com>2018-08-30 14:17:06 -0400
commit47306b9f203abee01f6fc54aa8d7ab8f8e25c8c9 (patch)
tree9d1734d0958b5f07afd6dad4adede420696fba3a /src/mongo/db/pipeline/document_source_exchange.cpp
parentb46de3f6c06fab5cf9b7ea0f4176b32ff544a4bf (diff)
downloadmongo-47306b9f203abee01f6fc54aa8d7ab8f8e25c8c9.tar.gz
SERVER-35905 Plug pieces together to perform a distributed when applicable
Diffstat (limited to 'src/mongo/db/pipeline/document_source_exchange.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.cpp88
1 files changed, 45 insertions, 43 deletions
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index dddc3d74f0c..49de073af73 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -41,27 +41,8 @@
namespace mongo {
-REGISTER_DOCUMENT_SOURCE(exchange,
- LiteParsedDocumentSourceDefault::parse,
- DocumentSourceExchange::createFromBson);
-
const char* DocumentSourceExchange::getSourceName() const {
- return "$exchange";
-}
-
-boost::intrusive_ptr<DocumentSource> DocumentSourceExchange::createFromBson(
- BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "$exchange options must be specified in an object, but found: "
- << typeName(spec.type()),
- spec.type() == BSONType::Object);
-
- IDLParserErrorContext ctx("$exchange");
- auto parsed = ExchangeSpec::parse(ctx, spec.embeddedObject());
-
- boost::intrusive_ptr<Exchange> exchange = new Exchange(parsed);
-
- return new DocumentSourceExchange(pExpCtx, exchange, 0);
+ return "$_internalExchange";
}
Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
@@ -75,31 +56,36 @@ DocumentSourceExchange::DocumentSourceExchange(
: DocumentSource(expCtx), _exchange(exchange), _consumerId(consumerId) {}
DocumentSource::GetNextResult DocumentSourceExchange::getNext() {
- return _exchange->getNext(_consumerId);
+ return _exchange->getNext(pExpCtx->opCtx, _consumerId);
}
-Exchange::Exchange(const ExchangeSpec& spec)
- : _spec(spec),
- _keyPattern(spec.getKey().getOwned()),
+Exchange::Exchange(ExchangeSpec spec, std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
+ : _spec(std::move(spec)),
+ _keyPattern(_spec.getKey().getOwned()),
_ordering(extractOrdering(_keyPattern)),
- _boundaries(extractBoundaries(spec.getBoundaries())),
- _consumerIds(extractConsumerIds(spec.getConsumerids(), spec.getConsumers())),
- _policy(spec.getPolicy()),
- _orderPreserving(spec.getOrderPreserving()),
- _maxBufferSize(spec.getBufferSize()) {
- uassert(50901, "$exchange must have at least one consumer", spec.getConsumers() > 0);
-
- for (int idx = 0; idx < spec.getConsumers(); ++idx) {
+ _boundaries(extractBoundaries(_spec.getBoundaries())),
+ _consumerIds(extractConsumerIds(_spec.getConsumerids(), _spec.getConsumers())),
+ _policy(_spec.getPolicy()),
+ _orderPreserving(_spec.getOrderPreserving()),
+ _maxBufferSize(_spec.getBufferSize()),
+ _pipeline(std::move(pipeline)) {
+ uassert(50901, "Exchange must have at least one consumer", _spec.getConsumers() > 0);
+
+ for (int idx = 0; idx < _spec.getConsumers(); ++idx) {
_consumers.emplace_back(std::make_unique<ExchangeBuffer>());
}
if (_policy == ExchangePolicyEnum::kRange || _policy == ExchangePolicyEnum::kHash) {
uassert(50900,
- "$exchange boundaries do not match number of consumers.",
+ "Exchange boundaries do not match number of consumers.",
_boundaries.size() == _consumerIds.size() + 1);
} else {
- uassert(50899, "$exchange boundaries must not be specified.", _boundaries.empty());
+ uassert(50899, "Exchange boundaries must not be specified.", _boundaries.empty());
}
+
+ // We will manually detach and reattach when iterating '_pipeline', we expect it to start in the
+ // detached state.
+ _pipeline->detachFromOperationContext();
}
std::vector<std::string> Exchange::extractBoundaries(
@@ -125,7 +111,7 @@ std::vector<std::string> Exchange::extractBoundaries(
for (size_t idx = 1; idx < ret.size(); ++idx) {
uassert(50893,
- str::stream() << "$exchange range boundaries are not in ascending order.",
+ str::stream() << "Exchange range boundaries are not in ascending order.",
ret[idx - 1] < ret[idx]);
}
return ret;
@@ -151,7 +137,7 @@ std::vector<size_t> Exchange::extractConsumerIds(
}
uassert(50894,
- str::stream() << "$exchange consumers ids are invalid.",
+ str::stream() << "Exchange consumers ids are invalid.",
nConsumers > 0 && validation.size() == nConsumers && *validation.begin() == 0 &&
*validation.rbegin() == nConsumers - 1);
}
@@ -165,29 +151,29 @@ Ordering Exchange::extractOrdering(const BSONObj& obj) {
for (const auto& element : obj) {
if (element.type() == BSONType::String) {
uassert(50895,
- str::stream() << "$exchange key description is invalid: " << element,
+ str::stream() << "Exchange key description is invalid: " << element,
element.valueStringData() == "hashed"_sd);
hasHashKey = true;
} else if (element.isNumber()) {
auto num = element.number();
if (!(num == 1 || num == -1)) {
uasserted(50896,
- str::stream() << "$exchange key description is invalid: " << element);
+ str::stream() << "Exchange key description is invalid: " << element);
}
hasOrderKey = true;
} else {
- uasserted(50897, str::stream() << "$exchange key description is invalid: " << element);
+ uasserted(50897, str::stream() << "Exchange key description is invalid: " << element);
}
}
uassert(50898,
- str::stream() << "$exchange hash and order keys cannot be mixed together: " << obj,
+ str::stream() << "Exchange hash and order keys cannot be mixed together: " << obj,
!(hasHashKey && hasOrderKey));
return hasHashKey ? Ordering::make(BSONObj()) : Ordering::make(obj);
}
-DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) {
+DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t consumerId) {
// Grab a lock.
stdx::unique_lock<stdx::mutex> lk(_mutex);
@@ -212,11 +198,15 @@ DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) {
// This consumer won the race and will fill the buffers.
_loadingThreadId = consumerId;
+ _pipeline->reattachToOperationContext(opCtx);
+
// This will return when some exchange buffer is full and we cannot make any forward
// progress anymore.
// The return value is an index of a full consumer buffer.
size_t fullConsumerId = loadNextBatch();
+ _pipeline->detachFromOperationContext();
+
// The loading cannot continue until the consumer with the full buffer consumes some
// documents.
_loadingThreadId = fullConsumerId;
@@ -232,9 +222,9 @@ DocumentSource::GetNextResult Exchange::getNext(size_t consumerId) {
}
size_t Exchange::loadNextBatch() {
- auto input = pSource->getNext();
+ auto input = _pipeline->getSources().back()->getNext();
- for (; input.isAdvanced(); input = pSource->getNext()) {
+ for (; input.isAdvanced(); input = _pipeline->getSources().back()->getNext()) {
// We have a document and we will deliver it to a consumer(s) based on the policy.
switch (_policy) {
case ExchangePolicyEnum::kBroadcast: {
@@ -317,6 +307,18 @@ size_t Exchange::getTargetConsumer(const Document& input) {
return cid;
}
+void Exchange::dispose(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ invariant(_disposeRunDown < getConsumers());
+
+ ++_disposeRunDown;
+
+ if (_disposeRunDown == getConsumers()) {
+ _pipeline->dispose(opCtx);
+ }
+}
+
DocumentSource::GetNextResult Exchange::ExchangeBuffer::getNext() {
invariant(!_buffer.empty());