summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Ian Boros <ian.boros@10gen.com> 2018-12-04 14:46:24 -0500
committer Ian Boros <ian.boros@10gen.com> 2018-12-19 18:35:53 -0500
commit 4eee17a5fdc14af2c3770b01cc4f906fa3620fe5 (patch)
tree e4284a4756f5b37a118ef7ace2aa566ac1d05606 /src
parent 7dd4db71be216b202ac05546c07329ba2727338d (diff)
download mongo-4eee17a5fdc14af2c3770b01cc4f906fa3620fe5.tar.gz
SERVER-37499 prevent deadlock within Exchange during transaction
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/commands/run_aggregate.cpp 4
-rw-r--r-- src/mongo/db/pipeline/document_source_exchange.cpp 61
-rw-r--r-- src/mongo/db/pipeline/document_source_exchange.h 28
-rw-r--r-- src/mongo/db/pipeline/document_source_exchange_test.cpp 153
4 files changed, 219 insertions, 27 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index affce5b9600..e62c2bfd10a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -544,8 +544,8 @@ Status runAggregate(OperationContext* opCtx,
// Create a new pipeline for the consumer consisting of a single
// DocumentSourceExchange.
- boost::intrusive_ptr<DocumentSource> consumer =
- new DocumentSourceExchange(expCtx, exchange, idx);
+ boost::intrusive_ptr<DocumentSource> consumer = new DocumentSourceExchange(
+ expCtx, exchange, idx, expCtx->mongoProcessInterface->getResourceYielder());
pipelines.emplace_back(uassertStatusOK(Pipeline::create({consumer}, expCtx)));
}
} else {
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index c2de6307031..0256a306ae6 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -36,6 +36,7 @@
#include <iterator>
#include <set>
+#include "mongo/db/curop.h"
#include "mongo/db/hasher.h"
#include "mongo/db/pipeline/document_source_exchange.h"
#include "mongo/db/storage/key_string.h"
@@ -45,6 +46,45 @@ namespace mongo {
MONGO_FAIL_POINT_DEFINE(exchangeFailLoadNextBatch);
+class MutexAndResourceLock {
+ OperationContext* _opCtx;
+ ResourceYielder* _resourceYielder;
+ stdx::unique_lock<stdx::mutex> _lock;
+
+public:
+ // Takes over an already-held mutex lock 'm' (checked by the invariant below). 'yielder' may be
+ // null if there are no resources which need to be yielded while waiting.
+ MutexAndResourceLock(OperationContext* opCtx,
+ stdx::unique_lock<stdx::mutex> m,
+ ResourceYielder* yielder)
+ : _opCtx(opCtx), _resourceYielder(yielder), _lock(std::move(m)) {
+ invariant(_lock.owns_lock());
+ }
+
+ void lock() {
+ // Re-acquire the operation-wide resources first (if there is a yielder), then the mutex.
+ if (_resourceYielder) {
+ _resourceYielder->unyield(_opCtx);
+ }
+ _lock.lock();
+ }
+ void unlock() {
+ _lock.unlock();  // Drop the mutex first, then yield resources: the reverse order of lock().
+ if (_resourceYielder) {
+ _resourceYielder->yield(_opCtx);
+ }
+ }
+
+ /**
+ * Releases ownership of the lock to the caller. May only be called when the mutex is held
+ * (after a call to lock(), for example), which the invariant below enforces.
+ */
+ stdx::unique_lock<stdx::mutex> releaseLockOwnership() {
+ invariant(_lock.owns_lock());
+ return std::move(_lock);
+ }
+};
+
constexpr size_t Exchange::kMaxBufferSize;
constexpr size_t Exchange::kMaxNumberConsumers;
@@ -59,11 +99,15 @@ Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosit
DocumentSourceExchange::DocumentSourceExchange(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Exchange> exchange,
- size_t consumerId)
- : DocumentSource(expCtx), _exchange(exchange), _consumerId(consumerId) {}
+ size_t consumerId,
+ std::unique_ptr<ResourceYielder> yielder)
+ : DocumentSource(expCtx),
+ _exchange(exchange),
+ _consumerId(consumerId),
+ _resourceYielder(std::move(yielder)) {}
DocumentSource::GetNextResult DocumentSourceExchange::getNext() {
- return _exchange->getNext(pExpCtx->opCtx, _consumerId);
+ return _exchange->getNext(pExpCtx->opCtx, _consumerId, _resourceYielder.get());
}
Exchange::Exchange(ExchangeSpec spec, std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
@@ -236,11 +280,16 @@ void Exchange::unblockLoading(size_t consumerId) {
_haveBufferSpace.notify_all();
}
}
-DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t consumerId) {
+DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx,
+ size_t consumerId,
+ ResourceYielder* resourceYielder) {
// Grab a lock.
stdx::unique_lock<stdx::mutex> lk(_mutex);
for (;;) {
+ // Guard against some of the trickiness we do with moving the lock to/from the
+ // MutexAndResourceLock.
+ invariant(lk.owns_lock());
// Execute only in case we have not encountered an error.
if (!_errorInLoadNextBatch.isOK()) {
uasserted(ErrorCodes::ExchangePassthrough,
@@ -296,7 +345,9 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
} else {
// Some other consumer is already loading the buffers. There is nothing else we can do
// but wait.
- _haveBufferSpace.wait(lk);
+ MutexAndResourceLock mutexAndResourceLock(opCtx, std::move(lk), resourceYielder);
+ _haveBufferSpace.wait(mutexAndResourceLock);
+ lk = mutexAndResourceLock.releaseLockOwnership();
}
}
}
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
index 50d84344932..071c04ec32d 100644
--- a/src/mongo/db/pipeline/document_source_exchange.h
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -72,8 +72,19 @@ class Exchange : public RefCountable {
static std::vector<FieldPath> extractKeyPaths(const BSONObj& keyPattern);
public:
+ /**
+ * Create an exchange. 'pipeline' represents the input to the exchange operator and must not be
+ * nullptr.
+ **/
Exchange(ExchangeSpec spec, std::unique_ptr<Pipeline, PipelineDeleter> pipeline);
- DocumentSource::GetNextResult getNext(OperationContext* opCtx, size_t consumerId);
+
+ /**
+ * Interface for retrieving the next document. 'resourceYielder' is optional, and if provided,
+ * will be used to give up resources while waiting for other threads to empty their buffers.
+ */
+ DocumentSource::GetNextResult getNext(OperationContext* opCtx,
+ size_t consumerId,
+ ResourceYielder* resourceYielder);
size_t getConsumers() const {
return _consumers.size();
@@ -161,7 +172,7 @@ private:
// Synchronization.
stdx::mutex _mutex;
- stdx::condition_variable _haveBufferSpace;
+ stdx::condition_variable_any _haveBufferSpace;
// A thread that is currently loading the exchange buffers.
size_t _loadingThreadId{kInvalidThreadId};
@@ -181,9 +192,16 @@ private:
class DocumentSourceExchange final : public DocumentSource {
public:
+ /**
+ * Create an Exchange consumer. 'yielder' is so the exchange may temporarily yield
+ * resources (such as the Session) while waiting for other threads to do
+ * work. 'yielder' may be nullptr if there are no resources which need to be given up
+ * while waiting.
+ */
DocumentSourceExchange(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Exchange> exchange,
- size_t consumerId);
+ size_t consumerId,
+ std::unique_ptr<ResourceYielder> yielder);
GetNextResult getNext() final;
@@ -230,6 +248,10 @@ private:
boost::intrusive_ptr<Exchange> _exchange;
const size_t _consumerId;
+
+ // While waiting for another thread to make room in its buffer, we may want to yield certain
+ // resources (such as the Session). Through this interface we can do that.
+ std::unique_ptr<ResourceYielder> _resourceYielder;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp
index 6ab1fbc918f..37ac0c65853 100644
--- a/src/mongo/db/pipeline/document_source_exchange_test.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp
@@ -125,10 +125,10 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- auto input = ex->getNext(getExpCtx()->opCtx, 0);
+ auto input = ex->getNext(getExpCtx()->opCtx, 0, nullptr);
size_t docs = 0;
- for (; input.isAdvanced(); input = ex->getNext(getExpCtx()->opCtx, 0)) {
+ for (; input.isAdvanced(); input = ex->getNext(getExpCtx()->opCtx, 0, nullptr)) {
++docs;
}
@@ -154,7 +154,7 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) {
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -201,7 +201,7 @@ TEST_F(DocumentSourceExchangeTest, ExchangeNConsumerEarlyout) {
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -255,7 +255,7 @@ TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) {
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -300,13 +300,13 @@ TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(
+ std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -366,13 +366,13 @@ TEST_F(DocumentSourceExchangeTest, RangeShardingExchangeNConsumer) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(
+ std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -423,13 +423,13 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(
+ std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -469,6 +469,125 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) {
ASSERT_EQ(nDocs, processedDocs.load());
}
+TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) {
+ const size_t nDocs = 500;
+ auto source = getRandomMockSource(nDocs, getNewSeed());
+
+ const std::vector<BSONObj> boundaries = {
+ BSON("a" << MINKEY), BSON("a" << 500), BSON("a" << MAXKEY)};
+
+ const size_t nConsumers = boundaries.size() - 1;
+
+ ASSERT(nDocs % nConsumers == 0);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kKeyRange);
+ spec.setKey(BSON("a" << 1));
+ spec.setBoundaries(boundaries);
+ spec.setConsumers(nConsumers);
+
+ // Tiny buffer so if there are deadlocks possible they reproduce more often.
+ spec.setBufferSize(64);
+
+ // An "artificial" mutex that's not actually necessary for thread safety. We enforce that each
+ // thread holds this while it calls getNext(). This is to simulate the case where a thread may
+ // hold some "real" resources which need to be yielded while waiting, such as the Session, or
+ // the locks held in a transaction.
+ stdx::mutex artificalGlobalMutex;
+
+ boost::intrusive_ptr<Exchange> ex =
+ new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+
+ /**
+ * This class is used for an Exchange consumer to temporarily relinquish control of a mutex
+ * while it's blocked: yield() unlocks the mutex and unyield() locks it again.
+ */
+ class MutexYielder : public ResourceYielder {
+ public:
+ MutexYielder(stdx::mutex* mutex) : _lock(*mutex, std::defer_lock) {}
+
+ void yield(OperationContext* opCtx) override {
+ _lock.unlock();
+ }
+
+ void unyield(OperationContext* opCtx) override {
+ _lock.lock();
+ }
+
+ stdx::unique_lock<stdx::mutex>& getLock() {
+ return _lock;
+ }
+
+ private:
+ stdx::unique_lock<stdx::mutex> _lock;
+ };
+
+ /**
+ * Per-consumer state. 'yielder' is a non-owning pointer; the stage owns the MutexYielder.
+ */
+ struct ThreadInfo {
+ ServiceContext::UniqueClient client;
+ ServiceContext::UniqueOperationContext opCtx;
+ boost::intrusive_ptr<DocumentSourceExchange> documentSourceExchange;
+ MutexYielder* yielder;
+ };
+ std::vector<ThreadInfo> threads;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ ServiceContext::UniqueClient client = getServiceContext()->makeClient("exchange client");
+ ServiceContext::UniqueOperationContext opCtxOwned =
+ getServiceContext()->makeOperationContext(client.get());
+ OperationContext* opCtx = opCtxOwned.get();
+ auto yielder = std::make_unique<MutexYielder>(&artificalGlobalMutex);
+ auto yielderRaw = yielder.get();
+
+ threads.push_back(
+ ThreadInfo{std::move(client),
+ std::move(opCtxOwned),
+ new DocumentSourceExchange(
+ new ExpressionContext(opCtx, nullptr), ex, idx, std::move(yielder)),
+ yielderRaw
+ });
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ AtomicWord<size_t> processedDocs{0};
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ ThreadInfo* threadInfo = &threads[id];
+ auto handle = _executor->scheduleWork(
+ [threadInfo, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) {
+
+ DocumentSourceExchange* exchange = threadInfo->documentSourceExchange.get();
+ const auto getNext = [exchange, threadInfo]() {
+ // Will acquire 'artificalGlobalMutex'. Within getNext() it will be released and
+ // reacquired by the MutexYielder if the Exchange has to block.
+ threadInfo->yielder->getLock().lock();
+ auto res = exchange->getNext();
+ threadInfo->yielder->getLock().unlock();
+ return res;
+ };
+
+ for (auto input = getNext(); input.isAdvanced(); input = getNext()) {
+ // This helps randomizing thread scheduling forcing different threads to load
+ // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
+ // reproducibility.
+ PseudoRandom prng(getNewSeed());
+ sleepmillis(prng.nextInt32() % 50 + 1);  // NOTE(review): delay may be <= 0 if nextInt32() can be negative; confirm.
+ processedDocs.fetchAndAdd(1);
+ }
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+
+ ASSERT_EQ(nDocs, processedDocs.load());
+}
+
TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) {
const size_t nDocs = 500;
auto source = getRandomMockSource(nDocs, getNewSeed());
@@ -491,13 +610,13 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(
+ std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
}
std::vector<executor::TaskExecutor::CallbackHandle> handles;