From 4eee17a5fdc14af2c3770b01cc4f906fa3620fe5 Mon Sep 17 00:00:00 2001 From: Ian Boros Date: Tue, 4 Dec 2018 14:46:24 -0500 Subject: SERVER-37499 prevent deadlock within Exchange during transaction --- src/mongo/db/commands/run_aggregate.cpp | 4 +- src/mongo/db/pipeline/document_source_exchange.cpp | 61 +++++++- src/mongo/db/pipeline/document_source_exchange.h | 28 +++- .../db/pipeline/document_source_exchange_test.cpp | 153 ++++++++++++++++++--- 4 files changed, 219 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index affce5b9600..e62c2bfd10a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -544,8 +544,8 @@ Status runAggregate(OperationContext* opCtx, // Create a new pipeline for the consumer consisting of a single // DocumentSourceExchange. - boost::intrusive_ptr consumer = - new DocumentSourceExchange(expCtx, exchange, idx); + boost::intrusive_ptr consumer = new DocumentSourceExchange( + expCtx, exchange, idx, expCtx->mongoProcessInterface->getResourceYielder()); pipelines.emplace_back(uassertStatusOK(Pipeline::create({consumer}, expCtx))); } } else { diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp index c2de6307031..0256a306ae6 100644 --- a/src/mongo/db/pipeline/document_source_exchange.cpp +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -36,6 +36,7 @@ #include #include +#include "mongo/db/curop.h" #include "mongo/db/hasher.h" #include "mongo/db/pipeline/document_source_exchange.h" #include "mongo/db/storage/key_string.h" @@ -45,6 +46,45 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(exchangeFailLoadNextBatch); +class MutexAndResourceLock { + OperationContext* _opCtx; + ResourceYielder* _resourceYielder; + stdx::unique_lock _lock; + +public: + // Must be constructed with the mutex held. 'yielder' may be null if there are no resources + // which need to be yielded while waiting. + MutexAndResourceLock(OperationContext* opCtx, + stdx::unique_lock m, + ResourceYielder* yielder) + : _opCtx(opCtx), _resourceYielder(yielder), _lock(std::move(m)) { + invariant(_lock.owns_lock()); + } + + void lock() { + // Acquire the operation-wide resources, then the mutex. + if (_resourceYielder) { + _resourceYielder->unyield(_opCtx); + } + _lock.lock(); + } + void unlock() { + _lock.unlock(); + if (_resourceYielder) { + _resourceYielder->yield(_opCtx); + } + } + + /** + * Releases ownership of the lock to the caller. May only be called when the mutex is held + * (after a call to unlock(), for example). + */ + stdx::unique_lock releaseLockOwnership() { + invariant(_lock.owns_lock()); + return std::move(_lock); + } +}; + constexpr size_t Exchange::kMaxBufferSize; constexpr size_t Exchange::kMaxNumberConsumers; @@ -59,11 +99,15 @@ Value DocumentSourceExchange::serialize(boost::optional& expCtx, const boost::intrusive_ptr exchange, - size_t consumerId) - : DocumentSource(expCtx), _exchange(exchange), _consumerId(consumerId) {} + size_t consumerId, + std::unique_ptr yielder) + : DocumentSource(expCtx), + _exchange(exchange), + _consumerId(consumerId), + _resourceYielder(std::move(yielder)) {} DocumentSource::GetNextResult DocumentSourceExchange::getNext() { - return _exchange->getNext(pExpCtx->opCtx, _consumerId); + return _exchange->getNext(pExpCtx->opCtx, _consumerId, _resourceYielder.get()); } Exchange::Exchange(ExchangeSpec spec, std::unique_ptr pipeline) @@ -236,11 +280,16 @@ void Exchange::unblockLoading(size_t consumerId) { _haveBufferSpace.notify_all(); } } -DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t consumerId) { +DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, + size_t consumerId, + ResourceYielder* resourceYielder) { // Grab a lock. stdx::unique_lock lk(_mutex); for (;;) { + // Guard against some of the trickiness we do with moving the lock to/from the + // MutexAndResourceLock. + invariant(lk.owns_lock()); // Execute only in case we have not encountered an error. if (!_errorInLoadNextBatch.isOK()) { uasserted(ErrorCodes::ExchangePassthrough, @@ -296,7 +345,9 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t } else { // Some other consumer is already loading the buffers. There is nothing else we can do // but wait. - _haveBufferSpace.wait(lk); + MutexAndResourceLock mutexAndResourceLock(opCtx, std::move(lk), resourceYielder); + _haveBufferSpace.wait(mutexAndResourceLock); + lk = mutexAndResourceLock.releaseLockOwnership(); } } } diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 50d84344932..071c04ec32d 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -72,8 +72,19 @@ class Exchange : public RefCountable { static std::vector extractKeyPaths(const BSONObj& keyPattern); public: + /** + * Create an exchange. 'pipeline' represents the input to the exchange operator and must not be + * nullptr. + **/ Exchange(ExchangeSpec spec, std::unique_ptr pipeline); - DocumentSource::GetNextResult getNext(OperationContext* opCtx, size_t consumerId); + + /** + * Interface for retrieving the next document. 'resourceYielder' is optional, and if provided, + * will be used to give up resources while waiting for other threads to empty their buffers. + */ + DocumentSource::GetNextResult getNext(OperationContext* opCtx, + size_t consumerId, + ResourceYielder* resourceYielder); size_t getConsumers() const { return _consumers.size(); @@ -161,7 +172,7 @@ private: // Synchronization. stdx::mutex _mutex; - stdx::condition_variable _haveBufferSpace; + stdx::condition_variable_any _haveBufferSpace; // A thread that is currently loading the exchange buffers. size_t _loadingThreadId{kInvalidThreadId}; @@ -181,9 +192,16 @@ private: class DocumentSourceExchange final : public DocumentSource { public: + /** + * Create an Exchange consumer. 'resourceYielder' is so the exchange may temporarily yield + * resources (such as the Session) while waiting for other threads to do + * work. 'resourceYielder' may be nullptr if there are no resources which need to be given up + * while waiting. + */ DocumentSourceExchange(const boost::intrusive_ptr& expCtx, const boost::intrusive_ptr exchange, - size_t consumerId); + size_t consumerId, + std::unique_ptr yielder); GetNextResult getNext() final; @@ -230,6 +248,10 @@ private: boost::intrusive_ptr _exchange; const size_t _consumerId; + + // While waiting for another thread to make room in its buffer, we may want to yield certain + // resources (such as the Session). Through this interface we can do that. + std::unique_ptr _resourceYielder; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp index 6ab1fbc918f..37ac0c65853 100644 --- a/src/mongo/db/pipeline/document_source_exchange_test.cpp +++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp @@ -125,10 +125,10 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) { boost::intrusive_ptr ex = new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - auto input = ex->getNext(getExpCtx()->opCtx, 0); + auto input = ex->getNext(getExpCtx()->opCtx, 0, nullptr); size_t docs = 0; - for (; input.isAdvanced(); input = ex->getNext(getExpCtx()->opCtx, 0)) { + for (; input.isAdvanced(); input = ex->getNext(getExpCtx()->opCtx, 0, nullptr)) { ++docs; } @@ -154,7 +154,7 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) { std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; @@ -201,7 +201,7 @@ TEST_F(DocumentSourceExchangeTest, ExchangeNConsumerEarlyout) { std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; @@ -255,7 +255,7 @@ TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) { std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; @@ -300,13 +300,13 @@ TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr ex = new Exchange( + std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; @@ -366,13 +366,13 @@ TEST_F(DocumentSourceExchangeTest, RangeShardingExchangeNConsumer) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr ex = new Exchange( + std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; @@ -423,13 +423,13 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr ex = new Exchange( + std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; @@ -469,6 +469,125 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) { ASSERT_EQ(nDocs, processedDocs.load()); } +TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) { + const size_t nDocs = 500; + auto source = getRandomMockSource(nDocs, getNewSeed()); + + const std::vector boundaries = { + BSON("a" << MINKEY), BSON("a" << 500), BSON("a" << MAXKEY)}; + + const size_t nConsumers = boundaries.size() - 1; + + ASSERT(nDocs % nConsumers == 0); + + ExchangeSpec spec; + spec.setPolicy(ExchangePolicyEnum::kKeyRange); + spec.setKey(BSON("a" << 1)); + spec.setBoundaries(boundaries); + spec.setConsumers(nConsumers); + + // Tiny buffer so if there are deadlocks possible they reproduce more often. + spec.setBufferSize(64); + + // An "artifical" mutex that's not actually necessary for thread safety. We enforce that each + // thread holds this while it calls getNext(). This is to simulate the case where a thread may + // hold some "real" resources which need to be yielded while waiting, such as the Session, or + // the locks held in a transaction. + stdx::mutex artificalGlobalMutex; + + boost::intrusive_ptr ex = + new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + + /** + * This class is used for an Exchange consumer to temporarily relinquish control of a mutex + * while it's blocked. + */ + class MutexYielder : public ResourceYielder { + public: + MutexYielder(stdx::mutex* mutex) : _lock(*mutex, std::defer_lock) {} + + void yield(OperationContext* opCtx) override { + _lock.unlock(); + } + + void unyield(OperationContext* opCtx) override { + _lock.lock(); + } + + stdx::unique_lock& getLock() { + return _lock; + } + + private: + stdx::unique_lock _lock; + }; + + /** + * Used to keep track of each client and operation context. + */ + struct ThreadInfo { + ServiceContext::UniqueClient client; + ServiceContext::UniqueOperationContext opCtx; + boost::intrusive_ptr documentSourceExchange; + MutexYielder* yielder; + }; + std::vector threads; + + for (size_t idx = 0; idx < nConsumers; ++idx) { + ServiceContext::UniqueClient client = getServiceContext()->makeClient("exchange client"); + ServiceContext::UniqueOperationContext opCtxOwned = + getServiceContext()->makeOperationContext(client.get()); + OperationContext* opCtx = opCtxOwned.get(); + auto yielder = std::make_unique(&artificalGlobalMutex); + auto yielderRaw = yielder.get(); + + threads.push_back( + ThreadInfo{std::move(client), + std::move(opCtxOwned), + new DocumentSourceExchange( + new ExpressionContext(opCtx, nullptr), ex, idx, std::move(yielder)), + yielderRaw + }); + } + + std::vector handles; + + AtomicWord processedDocs{0}; + + for (size_t id = 0; id < nConsumers; ++id) { + ThreadInfo* threadInfo = &threads[id]; + auto handle = _executor->scheduleWork( + [threadInfo, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) { + + DocumentSourceExchange* exchange = threadInfo->documentSourceExchange.get(); + const auto getNext = [exchange, threadInfo]() { + // Will acquire 'artificalGlobalMutex'. Within getNext() it will be released and + // reacquired by the MutexYielder if the Exchange has to block. + threadInfo->yielder->getLock().lock(); + auto res = exchange->getNext(); + threadInfo->yielder->getLock().unlock(); + return res; + }; + + for (auto input = getNext(); input.isAdvanced(); input = getNext()) { + // This helps randomizing thread scheduling forcing different threads to load + // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% + // reproducibility. + PseudoRandom prng(getNewSeed()); + sleepmillis(prng.nextInt32() % 50 + 1); + processedDocs.fetchAndAdd(1); + } + }); + + handles.emplace_back(std::move(handle.getValue())); + } + + for (auto& h : handles) + _executor->wait(h); + + ASSERT_EQ(nDocs, processedDocs.load()); +} + TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) { const size_t nDocs = 500; auto source = getRandomMockSource(nDocs, getNewSeed()); @@ -491,13 +610,13 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr ex = new Exchange( + std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); std::vector> prods; for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx)); + prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); } std::vector handles; -- cgit v1.2.1