diff options
author | Jason Carey <jcarey@argv.me> | 2016-08-19 16:24:09 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2016-08-22 19:55:09 -0400 |
commit | f4c253f8e3b143e569e7d23ab2bd3462c02cb131 (patch) | |
tree | 286e0784994ce42f9ecc874f18a50f11b1d52bbd | |
parent | adc0df4ccce9d4d1ed9e723e1101134f816a67b4 (diff) | |
download | mongo-f4c253f8e3b143e569e7d23ab2bd3462c02cb131.tar.gz |
SERVER-25465 Fix crasher in connection pool ASIO (cpasio) setup timeout
Under enough load, asio can get behind in timeout management. Make sure to bump the timer generation so
that the timeout callback is never invoked after _impl is no longer available.
(cherry picked from commit 226a65c73b821760053c58a174e06aa769c59a2d)
-rw-r--r-- | src/mongo/executor/connection_pool_asio.cpp | 23 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio.h | 1 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio_integration_test.cpp | 65 |
3 files changed, 85 insertions, 4 deletions
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 4fda6c80cdf..9786b0bddcf 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -113,6 +113,13 @@ ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation _impl(makeAsyncOp(this)), _timer(&_impl->strand()) {} +ASIOConnection::~ASIOConnection() { + if (_impl) { + stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); + _impl->_access->id++; + } +} + void ASIOConnection::indicateSuccess() { _status = Status::OK(); } @@ -187,20 +194,24 @@ void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) { cb(ptr, status); }; + // Capturing the shared access pad and generation before calling setTimeout gives us enough + // information to avoid calling the timer if we shouldn't without needing any other + // resources that might have been cleaned up. + decltype(_impl->_access) access; std::size_t generation; { stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); - generation = _impl->_access->id; + access = _impl->_access; + generation = access->id; } // Actually timeout setup setTimeout(timeout, - [this, generation] { + [this, access, generation] { // For operations that time out immediately, we can't simply cancel the // connection, because it may not have been initialized. - stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); + stdx::lock_guard<stdx::mutex> lk(access->mutex); - auto access = _impl->_access; if (generation != access->id) { // The operation has been cleaned up, do not access. 
return; @@ -276,6 +287,10 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { } std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() { + { + stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); + _impl->_access->id++; + } return std::move(_impl); } diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h index 2924331d61c..4cdcd14ec90 100644 --- a/src/mongo/executor/connection_pool_asio.h +++ b/src/mongo/executor/connection_pool_asio.h @@ -70,6 +70,7 @@ private: class ASIOConnection final : public ConnectionPool::ConnectionInterface { public: ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global); + ~ASIOConnection(); void indicateSuccess() override; void indicateUsed() override; diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp index 0569c212d32..f69d1f5d4ce 100644 --- a/src/mongo/executor/connection_pool_asio_integration_test.cpp +++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp @@ -256,6 +256,71 @@ TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) { ASSERT_EQ(cps.totalCreated, 1u); } +/** + * Tests that thrashing the timer while calls to setup are occurring doesn't crash. 
This could + * occur if a setup connection goes through, but the timer thread is too over-worked to cancel the + * timers before they're invoked + */ +TEST(ConnectionPoolASIO, ConnSetupSurvivesFailure) { + auto fixture = unittest::getFixtureConnectionString(); + + NetworkInterfaceASIO::Options options; + options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); + options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); + options.connectionPoolOptions.refreshTimeout = Seconds(1); + NetworkInterfaceASIO net{std::move(options)}; + + net.startup(); + auto guard = MakeGuard([&] { net.shutdown(); }); + + const int kNumThreads = 8; + const int kNumOps = 1000; + + AtomicWord<size_t> unfinished(kNumThreads * kNumOps); + AtomicWord<size_t> unstarted(kNumThreads * kNumOps); + + std::array<stdx::thread, kNumThreads> threads; + stdx::mutex mutex; + stdx::condition_variable condvar; + + for (auto& thread : threads) { + thread = stdx::thread([&] { + for (int i = 0; i < kNumOps; i++) { + RemoteCommandRequest request{fixture.getServers()[0], + "admin", + BSON("sleep" << 1 << "lock" + << "none" + << "secs" << 3), + BSONObj()}; + net.startCommand(makeCallbackHandle(), + request, + [&](StatusWith<RemoteCommandResponse> resp) { + if (!unfinished.subtractAndFetch(1)) { + condvar.notify_one(); + } + }); + unstarted.subtractAndFetch(1); + } + }); + } + + stdx::thread timerThrasher([&] { + while (unstarted.load()) { + net.setAlarm(Date_t::now() + Seconds(1), [] {}); + } + }); + + + for (auto& thread : threads) { + thread.join(); + } + + stdx::unique_lock<stdx::mutex> lk(mutex); + condvar.wait(lk, [&] { return !unfinished.load(); }); + + timerThrasher.join(); +} + } // namespace } // namespace executor } // namespace mongo |