diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2021-01-25 15:10:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-01 17:13:58 +0000 |
commit | fdf53c68a6eceb8d0dffeb8a48479cfd4d01861f (patch) | |
tree | 5920a2fea19d3e26fab54652225bae00f7e01c1b | |
parent | e31f945ddc59c270ba61c44ca792f4d7058c1703 (diff) | |
download | mongo-fdf53c68a6eceb8d0dffeb8a48479cfd4d01861f.tar.gz |
SERVER-46740 establishCursors() must always drain the AsyncRequestsSender::_baton
-rw-r--r-- | src/mongo/db/service_context.cpp | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 29 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 16 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 57 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 37 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 249 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.h | 4 | ||||
-rw-r--r-- | src/mongo/transport/baton.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 76 |
10 files changed, 304 insertions, 173 deletions
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index 7d64f9c5a2a..d966ab5ffc0 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -43,6 +43,7 @@ #include "mongo/db/storage/recovery_unit_noop.h" #include "mongo/stdx/list.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/baton.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer.h" @@ -270,6 +271,11 @@ void ServiceContext::OperationContextDeleter::operator()(OperationContext* opCtx stdx::lock_guard<Client> lk(*client); client->resetOperationContext(); } + + if (auto baton = opCtx->getBaton()) { + baton->detach(); + } + onDestroy(opCtx, service->_clientObservers); delete opCtx; } diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 58d18efe0b4..4858ef5d53e 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -314,14 +314,14 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (baton) { // If we have a baton, we want to get back to the baton thread immediately after we // get a connection - baton->schedule( - [ resolver = std::move(resolver), swConn = std::move(swConn) ]() mutable { - std::move(resolver)(std::move(swConn)); - }); - } else { - // otherwise we're happy to run inline - std::move(resolver)(std::move(swConn)); + if (baton->schedule( + [resolver, swConn]() mutable { std::move(resolver)(std::move(swConn)); })) { + return; + } } + // otherwise we're happy to run inline + std::move(resolver)(std::move(swConn)); + }); return Status::OK(); @@ -472,11 +472,11 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, } if (when <= now()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); + if (baton && baton->schedule(action)) { + return 
Status::OK(); } + + _reactor->schedule(transport::Reactor::kPost, std::move(action)); return Status::OK(); } @@ -511,11 +511,10 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, } if (status.isOK()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); + if (baton && baton->schedule(action)) { + return; } + _reactor->schedule(transport::Reactor::kPost, std::move(action)); } else if (status != ErrorCodes::CallbackCanceled) { warning() << "setAlarm() received an error: " << status; } diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 4fd7347b85e..4b1455f340d 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -589,15 +589,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, } for (const auto& cbState : todo) { - if (cbState->baton) { - cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); }); - } else { - const auto status = - _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); - if (status == ErrorCodes::ShutdownInProgress) - break; - fassert(28735, status); + if (cbState->baton && + cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); })) { + continue; } + + const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + if (status == ErrorCodes::ShutdownInProgress) + break; + fassert(28735, status); } _net->signalWorkAvailable(); } diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 821768c2348..21bb2f9eb57 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -101,6 +101,7 @@ env.Library( "async_requests_sender.cpp", ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/commands/test_commands_enabled', "$BUILD_DIR/mongo/db/query/command_request_response", "$BUILD_DIR/mongo/executor/task_executor_interface", 
"$BUILD_DIR/mongo/s/client/sharding_client", diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 586a76183dd..9cf82f2b49e 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -35,6 +35,7 @@ #include "mongo/s/async_requests_sender.h" #include "mongo/client/remote_command_targeter.h" +#include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/server_parameters.h" #include "mongo/executor/remote_command_request.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -56,6 +57,22 @@ namespace { // Maximum number of retries for network and replication notMaster errors (per host). const int kMaxNumFailedHostRetryAttempts = 3; +// Maximum time to wait for a network operation. +const Seconds kMaxWait = Seconds(20); + +transport::BatonHandle makeBaton(OperationContext* opCtx) { + if (!AsyncRequestsSenderUseBaton.load()) { + return nullptr; + } + + auto tl = opCtx->getServiceContext()->getTransportLayer(); + if (!tl) { + return nullptr; + } + + return tl->makeBaton(opCtx); +} + } // namespace AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, @@ -66,7 +83,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, Shard::RetryPolicy retryPolicy) : _opCtx(opCtx), _executor(executor), - _baton(opCtx), + _baton(makeBaton(_opCtx)), _db(dbName.toString()), _readPreference(readPreference), _retryPolicy(retryPolicy) { @@ -82,6 +99,11 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, } AsyncRequestsSender::~AsyncRequestsSender() { + ON_BLOCK_EXIT([&] { + if (_baton) { + _baton->detach(); + } + }); _cancelPendingRequests(); // Wait on remaining callbacks to run. 
@@ -276,8 +298,21 @@ void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) { // If we're using a baton, we peek the queue, and block on the baton if it's empty if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) { job = std::move(*tryJob); + } else if (opCtx) { + auto didWork = _baton->run(opCtx, opCtx->getDeadline()); + if (!didWork) { + // If we resumed without doing work, then we hit the deadline which is also + // maxTimeMS. Thus we expect checkForInterrupt() to throw in normal operation. If it + // does not throw, then we are likely subject to the maxTimeNeverTimeOut fail point + // and we should wait without a deadline. + opCtx->checkForInterrupt(); + uassert(ErrorCodes::ExceededTimeLimit, + "Experienced an unexpected timeout outside of testing", + getTestCommandsEnabled()); + _baton->run(opCtx, boost::none); + } } else { - _baton->run(opCtx, boost::none); + _baton->run(nullptr, boost::none); } } else { // Otherwise we block on the queue @@ -331,14 +366,15 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( auto clock = ars->_opCtx->getServiceContext()->getFastClockSource(); - auto deadline = clock->now() + Seconds(20); + const auto maxDeadline = clock->now() + kMaxWait; + const auto deadline = std::min(ars->_opCtx->getDeadline(), maxDeadline); auto targeter = shard->getTargeter(); auto findHostStatus = [&] { // If we don't have a baton, just go ahead and block in targeting if (!ars->_baton) { - return targeter->findHostWithMaxWait(readPref, Seconds{20}); + return targeter->findHostWithMaxWait(readPref, kMaxWait); } // If we do have a baton, and we can target quickly, just do that @@ -386,17 +422,4 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() { return Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(shardId); } -AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx) - : _baton(AsyncRequestsSenderUseBaton.load() - ? 
(opCtx->getServiceContext()->getTransportLayer() - ? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx) - : nullptr) - : nullptr) {} - -AsyncRequestsSender::BatonDetacher::~BatonDetacher() { - if (_baton) { - _baton->detach(); - } -} - } // namespace mongo diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index abb6514bd7f..a44101d253f 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -81,7 +81,8 @@ namespace mongo { * Does not throw exceptions. */ class AsyncRequestsSender { - MONGO_DISALLOW_COPYING(AsyncRequestsSender); + AsyncRequestsSender(const AsyncRequestsSender&) = delete; + AsyncRequestsSender& operator=(const AsyncRequestsSender&) = delete; public: /** @@ -215,38 +216,6 @@ private: }; /** - * We have to make sure to detach the baton if we throw in construction. We also need a baton - * that lives longer than this type (because it can end up in callbacks that won't actually - * modify it). - * - * TODO: work out actual lifetime semantics for a baton. For now, leaving this as a wort in ARS - */ - class BatonDetacher { - public: - explicit BatonDetacher(OperationContext* opCtx); - ~BatonDetacher(); - - transport::Baton& operator*() const { - return *_baton; - } - - transport::Baton* operator->() const noexcept { - return _baton.get(); - } - - operator transport::BatonHandle() const { - return _baton; - } - - explicit operator bool() const noexcept { - return static_cast<bool>(_baton); - } - - private: - transport::BatonHandle _baton; - }; - - /** * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag. */ void _cancelPendingRequests(); @@ -292,7 +261,7 @@ private: OperationContext* _opCtx; executor::TaskExecutor* _executor; - BatonDetacher _baton; + transport::BatonHandle _baton; size_t _batonRequests = 0; // The metadata obj to pass along with the command remote. 
Used to indicate that the command is diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 588902151a5..82c12d79204 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -34,6 +34,8 @@ #include "mongo/s/query/establish_cursors.h" +#include <set> + #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/query/cursor_response.h" @@ -49,105 +51,176 @@ namespace mongo { -std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults) { +namespace { + +/** + * This class wraps logic for establishing cursors using an AsyncRequestsSender. + */ +class CursorEstablisher { +public: + CursorEstablisher(OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + bool allowPartialResults) + : _opCtx(opCtx), + _executor{std::move(executor)}, + _nss(nss), + _allowPartialResults(allowPartialResults) {} + + /** + * Make a RequestSender and thus send requests. + */ + void sendRequests(const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + Shard::RetryPolicy retryPolicy); + + /** + * Wait for a single response via the RequestSender. + */ + void waitForResponse() noexcept; + + /** + * Wait for all responses via the RequestSender. + */ + void waitForResponses() noexcept { + while (!_ars->done()) { + waitForResponse(); + } + } + + /** + * If any request received a non-retriable error response and partial results are not allowed, + * cancel any requests that may have succeeded and throw the first such error encountered. + */ + void checkForFailedRequests(); + + /** + * Take all cursors currently tracked by the CursorEstablisher.
+ */ + std::vector<RemoteCursor> takeCursors() { + return std::exchange(_remoteCursors, {}); + }; + +private: + void _handleFailure(const AsyncRequestsSender::Response& response, Status status) noexcept; + + OperationContext* const _opCtx; + executor::TaskExecutor* const _executor; + const NamespaceString _nss; + const bool _allowPartialResults; + + std::unique_ptr<AsyncRequestsSender> _ars; + + boost::optional<Status> _maybeFailure; + std::vector<RemoteCursor> _remoteCursors; +}; + +void CursorEstablisher::sendRequests(const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + Shard::RetryPolicy retryPolicy) { // Construct the requests std::vector<AsyncRequestsSender::Request> requests; for (const auto& remote : remotes) { requests.emplace_back(remote.first, remote.second); } + LOG(3) << "Establishing cursors on remotes {" + << "opId: " << _opCtx->getOpID() << "," + << "numRemotes: " << remotes.size() << "}"; + // Send the requests - AsyncRequestsSender ars(opCtx, - executor, - nss.db().toString(), - std::move(requests), - readPref, - Shard::RetryPolicy::kIdempotent); - - std::vector<RemoteCursor> remoteCursors; + _ars = std::make_unique<AsyncRequestsSender>( + _opCtx, _executor, _nss.db().toString(), std::move(requests), readPref, retryPolicy); +} + +void CursorEstablisher::waitForResponse() noexcept { + auto response = _ars->next(); try { - // Get the responses - while (!ars.done()) { - try { - auto response = ars.next(); - // Note the shardHostAndPort may not be populated if there was an error, so be sure - // to do this after parsing the cursor response to ensure the response was ok. 
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we - // have a valid cursor, since the error handling path will attempt to clean up - // anything in 'remoteCursors' - RemoteCursor cursor; - cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing( - uassertStatusOK(std::move(response.swResponse)).data)); - cursor.setShardId(std::move(response.shardId)); - cursor.setHostAndPort(*response.shardHostAndPort); - remoteCursors.push_back(std::move(cursor)); - } catch (const DBException& ex) { - // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard - // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too. - bool isEligibleException = (ErrorCodes::isRetriableError(ex.code()) || - ex.code() == ErrorCodes::FailedToSatisfyReadPreference); - // Fail if the exception is something other than a retriable or read preference - // error, or if the 'allowPartialResults' query parameter was not enabled. - if (!allowPartialResults || !isEligibleException) { - throw; - } - } - } - return remoteCursors; - } catch (const DBException&) { - // If one of the remotes had an error, we make a best effort to finish retrieving responses - // for other requests that were already sent, so that we can send killCursors to any cursors - // that we know were established. - try { - // Do not schedule any new requests. - ars.stopRetrying(); - - // Collect responses from all requests that were already sent. - while (!ars.done()) { - auto response = ars.next(); - - // Check if the response contains an established cursor, and if so, store it. - StatusWith<CursorResponse> swCursorResponse( - response.swResponse.isOK() - ? 
CursorResponse::parseFromBSON(response.swResponse.getValue().data) - : response.swResponse.getStatus()); - - if (swCursorResponse.isOK()) { - RemoteCursor cursor; - cursor.setShardId(std::move(response.shardId)); - cursor.setHostAndPort(*response.shardHostAndPort); - cursor.setCursorResponse(std::move(swCursorResponse.getValue())); - remoteCursors.push_back(std::move(cursor)); - } - } - - // Schedule killCursors against all cursors that were established. - for (const auto& remoteCursor : remoteCursors) { - BSONObj cmdObj = - KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()}) - .toBSON(); - executor::RemoteCommandRequest request( - remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx); - - // We do not process the response to the killCursors request (we make a good-faith - // attempt at cleaning up the cursors, but ignore any returned errors). - executor - ->scheduleRemoteCommand( - request, - [](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {}) - .status_with_transitional_ignore(); - } - } catch (const DBException&) { - // Ignore the new error and rethrow the original one. + // Note the shardHostAndPort may not be populated if there was an error, so be sure + // to do this after parsing the cursor response to ensure the response was ok. 
+ // Additionally, be careful not to push into 'remoteCursors' until we are sure we + // have a valid cursor, since the error handling path will attempt to clean up + // anything in 'remoteCursors' + auto responseData = uassertStatusOK(std::move(response.swResponse)).data; + auto cursorResponse = CursorResponse::parseFromBSONThrowing(std::move(responseData)); + + RemoteCursor cursor; + cursor.setCursorResponse(std::move(cursorResponse)); + cursor.setShardId(std::move(response.shardId)); + cursor.setHostAndPort(*response.shardHostAndPort); + _remoteCursors.push_back(std::move(cursor)); + } catch (const DBException& ex) { + _handleFailure(response, ex.toStatus()); + } +} + +void CursorEstablisher::checkForFailedRequests() { + if (!_maybeFailure) { + // If we saw no failures, there is nothing to do. + return; + } + + LOG(0) << "Unable to establish remote cursors - {" + << "error: " << *_maybeFailure << ", " + << "numActiveRemotes: " << _remoteCursors.size() << "}"; + + // Schedule killCursors against all cursors that were established. + for (const auto& remoteCursor : _remoteCursors) { + BSONObj cmdObj = + KillCursorsRequest(_nss, {remoteCursor.getCursorResponse().getCursorId()}).toBSON(); + executor::RemoteCommandRequest request( + remoteCursor.getHostAndPort(), _nss.db().toString(), cmdObj, _opCtx); + + // We do not process the response to the killCursors request (we make a good-faith + // attempt at cleaning up the cursors, but ignore any returned errors). + auto swHandle = _executor->scheduleRemoteCommand( + request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {}); + if (!swHandle.isOK()) { + LOG(3) << "Unable to cancel remote cursor - " << swHandle.getStatus(); } + } + - throw; + // Throw our failure. 
+ uassertStatusOK(*_maybeFailure); +} + +void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& response, + Status status) noexcept { + LOG(3) << "Experienced a failure while establishing cursors - " << status; + if (_maybeFailure) { + // If we've already failed, just log and move on. + return; } + + // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica + // sets can also throw FailedToSatisfyReadPreference, so we swallow it too. + bool isEligibleException = (ErrorCodes::isRetriableError(status.code()) || + status.code() == ErrorCodes::FailedToSatisfyReadPreference); + if (_allowPartialResults && isEligibleException) { + // This exception is eligible to be swallowed. + return; + } + + // Do not schedule any new requests. + _ars->stopRetrying(); + _maybeFailure = std::move(status); +} + +} // namespace + +std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults, + Shard::RetryPolicy retryPolicy) { + auto establisher = CursorEstablisher(opCtx, executor, nss, allowPartialResults); + establisher.sendRequests(readPref, remotes, retryPolicy); + establisher.waitForResponses(); + establisher.checkForFailedRequests(); + return establisher.takeCursors(); } } // namespace mongo diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index df73973f9fa..db5a21314db 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -38,6 +38,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" +#include "mongo/s/client/shard.h" #include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" @@ -68,6 +69,7 @@ std::vector<RemoteCursor> 
establishCursors(OperationContext* opCtx, const NamespaceString& nss, const ReadPreferenceSetting readPref, const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults); + bool allowPartialResults, + Shard::RetryPolicy = Shard::RetryPolicy::kIdempotent); } // namespace mongo diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index 549e84b32a6..f1f96b7d4fb 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -84,7 +84,7 @@ public: /** * Executes a callback on the baton. */ - virtual void schedule(stdx::function<void()> func) = 0; + virtual bool schedule(stdx::function<void()> func) = 0; /** * Wakes the Baton up if it is currently blocked, or ensures that the next time it tries to diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 0c64f4a8eb0..47f904b3030 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -56,6 +56,9 @@ namespace transport { * We implement our networking reactor on top of poll + eventfd for wakeups */ class TransportLayerASIO::BatonASIO : public Baton { + static auto makeDetachedError() { + return Status(ErrorCodes::ShutdownInProgress, "Baton detached"); + } /** * RAII type that wraps up an eventfd and reading/writing to it. 
We don't actually need the @@ -110,23 +113,64 @@ public: } void detach() override { + OperationContext* opCtx; + { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_sessions.empty()); - invariant(_scheduled.empty()); - invariant(_timers.empty()); + if (!_opCtx) { + return; + } + + opCtx = std::exchange(_opCtx, nullptr); } { - stdx::lock_guard<Client> lk(*_opCtx->getClient()); - invariant(_opCtx->getBaton().get() == this); - _opCtx->setBaton(nullptr); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + invariant(opCtx->getBaton().get() == this); + opCtx->setBaton(nullptr); + } + + decltype(_scheduled) scheduled; + decltype(_sessions) sessions; + decltype(_timers) timers; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + using std::swap; + swap(_scheduled, scheduled); + swap(_sessions, sessions); + swap(_timers, timers); + _timersById.clear(); + } + + for (auto& job : scheduled) { + try { + job(); + job = {}; + } catch (const DBException& ex) { + LOG(3) << "Job threw during detach: " << ex; + } + } + + for (auto& session : sessions) { + session.second.promise.setError(makeDetachedError()); } - _opCtx = nullptr; + for (auto& timer : timers) { + auto promise = std::move(timer.promise); + promise.setError(makeDetachedError()); + } } Future<void> addSession(Session& session, Type type) override { + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_opCtx) { + return makeDetachedError(); + } + } + auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); auto pf = makePromiseFuture<void>(); @@ -142,6 +186,13 @@ public: } Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_opCtx) { + return makeDetachedError(); + } + } + auto pf = makePromiseFuture<void>(); _safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] { auto pair = _timers.insert({ @@ -191,14 +242,21 @@ public: return true; } - void schedule(stdx::function<void()> func) 
override { + bool schedule(stdx::function<void()> func) override { stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_opCtx) { + // If we're already detached, people have to find somewhere else to run. + return false; + } + _scheduled.push_back(std::move(func)); if (_inPoll) { _efd.notify(); } + + return true; } void notify() noexcept override { @@ -383,7 +441,7 @@ private: */ template <typename Callback> void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { - if (_inPoll) { + if (_inPoll && _opCtx) { _scheduled.push_back([cb, this] { stdx::lock_guard<stdx::mutex> lk(_mutex); cb(); |