diff options
-rw-r--r-- | jstests/noPassthroughWithMongod/exchangeProducer.js | 25 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.h | 2 |
3 files changed, 44 insertions, 0 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js index 024e8d67a2e..f05f9db7e29 100644 --- a/jstests/noPassthroughWithMongod/exchangeProducer.js +++ b/jstests/noPassthroughWithMongod/exchangeProducer.js @@ -9,6 +9,8 @@ TestData.disableImplicitSessions = true; (function() { "use strict"; + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + const coll = db.testCollection; coll.drop(); @@ -42,6 +44,29 @@ TestData.disableImplicitSessions = true; // For simplicity we assume that we can evenly distribute documents among consumers. assert.eq(0, numDocs % numConsumers); + (function testParameterValidation() { + const tooManyConsumers = 101; + assertErrorCode(coll, [], 50950, "Expected too many consumers", { + exchange: { + policy: "roundrobin", + consumers: NumberInt(tooManyConsumers), + bufferSize: NumberInt(1024) + }, + cursor: {batchSize: 0} + }); + + const bufferTooLarge = 200 * 1024 * 1024; // 200 MB + assertErrorCode(coll, [], 50951, "Expected buffer too large", { + exchange: { + policy: "roundrobin", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(bufferTooLarge) + }, + cursor: {batchSize: 0} + }); + + })(); + /** * RoundRobin - evenly distribute documents to consumers. */ diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp index 49de073af73..3e990f65e43 100644 --- a/src/mongo/db/pipeline/document_source_exchange.cpp +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -41,6 +41,9 @@ namespace mongo { +constexpr size_t Exchange::kMaxBufferSize; +constexpr size_t Exchange::kMaxNumberConsumers; + const char* DocumentSourceExchange::getSourceName() const { return "$_internalExchange"; } @@ -71,6 +74,13 @@ Exchange::Exchange(ExchangeSpec spec, std::unique_ptr<Pipeline, PipelineDeleter> _pipeline(std::move(pipeline)) { uassert(50901, "Exchange must have at least one consumer", _spec.getConsumers() > 0); + uassert(50951, + str::stream() << "Specified exchange buffer size (" << _maxBufferSize + << ") exceeds the maximum allowable amount (" + << kMaxBufferSize + << ").", + _maxBufferSize <= kMaxBufferSize); + for (int idx = 0; idx < _spec.getConsumers(); ++idx) { _consumers.emplace_back(std::make_unique<ExchangeBuffer>()); } @@ -120,6 +130,13 @@ std::vector<std::string> Exchange::extractBoundaries( std::vector<size_t> Exchange::extractConsumerIds( const boost::optional<std::vector<std::int32_t>>& consumerIds, size_t nConsumers) { + uassert(50950, + str::stream() << "Specified number of exchange consumers (" << nConsumers + << ") exceeds the maximum allowable amount (" + << kMaxNumberConsumers + << ").", + nConsumers <= kMaxNumberConsumers); + std::vector<size_t> ret; if (!consumerIds) { diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 46277075bec..fbb982dd2cc 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -41,6 +41,8 @@ namespace mongo { class Exchange : public RefCountable { static constexpr size_t kInvalidThreadId{std::numeric_limits<size_t>::max()}; + static constexpr size_t kMaxBufferSize = 100 * 1024 * 1024; // 100 MB + static constexpr size_t kMaxNumberConsumers = 100; /** * Convert the BSON representation of boundaries (as deserialized off the wire) to the internal |