author     Ben Caimano <ben.caimano@10gen.com>              2021-01-25 15:10:30 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-02-01 17:13:58 +0000
commit     fdf53c68a6eceb8d0dffeb8a48479cfd4d01861f (patch)
tree       5920a2fea19d3e26fab54652225bae00f7e01c1b
parent     e31f945ddc59c270ba61c44ca792f4d7058c1703 (diff)
SERVER-46740 establishCursors() must always drain the AsyncRequestsSender::_baton
-rw-r--r--  src/mongo/db/service_context.cpp                 |   6
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp      |  29
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp |  16
-rw-r--r--  src/mongo/s/SConscript                           |   1
-rw-r--r--  src/mongo/s/async_requests_sender.cpp            |  57
-rw-r--r--  src/mongo/s/async_requests_sender.h              |  37
-rw-r--r--  src/mongo/s/query/establish_cursors.cpp          | 249
-rw-r--r--  src/mongo/s/query/establish_cursors.h            |   4
-rw-r--r--  src/mongo/transport/baton.h                      |   2
-rw-r--r--  src/mongo/transport/baton_asio_linux.h           |  76

10 files changed, 304 insertions, 173 deletions
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index 7d64f9c5a2a..d966ab5ffc0 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/storage/recovery_unit_noop.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
@@ -270,6 +271,11 @@ void ServiceContext::OperationContextDeleter::operator()(OperationContext* opCtx
stdx::lock_guard<Client> lk(*client);
client->resetOperationContext();
}
+
+ if (auto baton = opCtx->getBaton()) {
+ baton->detach();
+ }
+
onDestroy(opCtx, service->_clientObservers);
delete opCtx;
}
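
The deleter change above ties baton teardown to OperationContext destruction. A minimal standalone sketch of that ordering, using hypothetical stand-in types (the real code uses transport::Baton, and detaches before client observers run):

    #include <functional>
    #include <memory>
    #include <vector>

    // Hypothetical stand-ins for transport::Baton and OperationContext.
    struct Baton {
        std::vector<std::function<void()>> scheduled;
        void detach() {
            // Drain anything still queued so no work is silently dropped.
            for (auto& job : scheduled)
                job();
            scheduled.clear();
        }
    };

    struct OperationContext {
        std::shared_ptr<Baton> baton;
    };

    // Mirrors the ordering OperationContextDeleter now guarantees: detach the
    // baton before the context is freed, so no queued job can observe a dead
    // OperationContext.
    void destroyOperationContext(OperationContext* opCtx) {
        if (auto baton = opCtx->baton) {
            baton->detach();
        }
        delete opCtx;
    }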
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 58d18efe0b4..4858ef5d53e 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -314,14 +314,14 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (baton) {
// If we have a baton, we want to get back to the baton thread immediately after we
// get a connection
- baton->schedule(
- [ resolver = std::move(resolver), swConn = std::move(swConn) ]() mutable {
- std::move(resolver)(std::move(swConn));
- });
- } else {
- // otherwise we're happy to run inline
- std::move(resolver)(std::move(swConn));
+ if (baton->schedule(
+ [resolver, swConn]() mutable { std::move(resolver)(std::move(swConn)); })) {
+ return;
+ }
}
+ // otherwise we're happy to run inline
+ std::move(resolver)(std::move(swConn));
+
});
return Status::OK();
@@ -472,11 +472,11 @@ Status NetworkInterfaceTL::setAlarm(Date_t when,
}
if (when <= now()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ if (baton && baton->schedule(action)) {
+ return Status::OK();
}
+
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
return Status::OK();
}
@@ -511,11 +511,10 @@ Status NetworkInterfaceTL::setAlarm(Date_t when,
}
if (status.isOK()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ if (baton && baton->schedule(action)) {
+ return;
}
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
} else if (status != ErrorCodes::CallbackCanceled) {
warning() << "setAlarm() received an error: " << status;
}
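
All three hunks above follow the same pattern: schedule() now reports whether the baton accepted the work, and a detached baton falls back to the reactor rather than dropping the task. A self-contained sketch of that pattern, with illustrative stand-ins for the baton and reactor:

    #include <functional>
    #include <iostream>

    // Illustrative stand-ins; the real types are transport::Baton and
    // transport::Reactor.
    struct Baton {
        bool detached = false;
        bool schedule(std::function<void()> task) {
            if (detached)
                return false;  // caller must find somewhere else to run
            task();            // (the real baton queues this for its thread)
            return true;
        }
    };

    struct Reactor {
        void schedule(std::function<void()> task) { task(); }
    };

    // Try the baton first; fall back to the reactor if it is detached or absent.
    void scheduleWithFallback(Baton* baton, Reactor& reactor,
                              std::function<void()> task) {
        if (baton && baton->schedule(task)) {
            return;
        }
        reactor.schedule(std::move(task));
    }

    int main() {
        Baton baton;
        Reactor reactor;
        baton.detached = true;
        scheduleWithFallback(&baton, reactor,
                             [] { std::cout << "ran on fallback\n"; });
    }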
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 4fd7347b85e..4b1455f340d 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -589,15 +589,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
for (const auto& cbState : todo) {
- if (cbState->baton) {
- cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
- } else {
- const auto status =
- _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- if (status == ErrorCodes::ShutdownInProgress)
- break;
- fassert(28735, status);
+ if (cbState->baton &&
+ cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); })) {
+ continue;
}
+
+ const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
}
_net->signalWorkAvailable();
}
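
The executor hunk applies the same baton-first idea inside a drain loop; the distinction worth keeping is between a per-callback baton refusal (fall back to the pool and continue) and pool shutdown (stop draining entirely). A simplified sketch with stand-in types:

    #include <functional>
    #include <vector>

    enum class Status { OK, ShutdownInProgress };

    struct Baton {
        bool detached = false;
        bool schedule(std::function<void()> f) {
            if (detached) return false;
            f();
            return true;
        }
    };

    struct Pool {
        bool shuttingDown = false;
        Status schedule(std::function<void()> f) {
            if (shuttingDown) return Status::ShutdownInProgress;
            f();
            return Status::OK;
        }
    };

    struct Callback {
        Baton* baton = nullptr;  // optional per-callback baton
        std::function<void()> run;
    };

    void drain(std::vector<Callback>& todo, Pool& pool) {
        for (auto& cb : todo) {
            if (cb.baton && cb.baton->schedule(cb.run)) {
                continue;  // the baton accepted this one
            }
            if (pool.schedule(cb.run) == Status::ShutdownInProgress) {
                break;     // the executor is going away; stop draining
            }
        }
    }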
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 821768c2348..21bb2f9eb57 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -101,6 +101,7 @@ env.Library(
"async_requests_sender.cpp",
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
"$BUILD_DIR/mongo/db/query/command_request_response",
"$BUILD_DIR/mongo/executor/task_executor_interface",
"$BUILD_DIR/mongo/s/client/sharding_client",
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 586a76183dd..9cf82f2b49e 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -35,6 +35,7 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/server_parameters.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -56,6 +57,22 @@ namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
const int kMaxNumFailedHostRetryAttempts = 3;
+// Maximum time to wait for a network operation.
+const Seconds kMaxWait = Seconds(20);
+
+transport::BatonHandle makeBaton(OperationContext* opCtx) {
+ if (!AsyncRequestsSenderUseBaton.load()) {
+ return nullptr;
+ }
+
+ auto tl = opCtx->getServiceContext()->getTransportLayer();
+ if (!tl) {
+ return nullptr;
+ }
+
+ return tl->makeBaton(opCtx);
+}
+
} // namespace
AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
@@ -66,7 +83,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
Shard::RetryPolicy retryPolicy)
: _opCtx(opCtx),
_executor(executor),
- _baton(opCtx),
+ _baton(makeBaton(_opCtx)),
_db(dbName.toString()),
_readPreference(readPreference),
_retryPolicy(retryPolicy) {
@@ -82,6 +99,11 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
}
AsyncRequestsSender::~AsyncRequestsSender() {
+ ON_BLOCK_EXIT([&] {
+ if (_baton) {
+ _baton->detach();
+ }
+ });
_cancelPendingRequests();
// Wait on remaining callbacks to run.
@@ -276,8 +298,21 @@ void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
// If we're using a baton, we peek the queue, and block on the baton if it's empty
if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
job = std::move(*tryJob);
+ } else if (opCtx) {
+ auto didWork = _baton->run(opCtx, opCtx->getDeadline());
+ if (!didWork) {
+ // If we resumed without doing work, then we hit the deadline which is also
+ // maxTimeMS. Thus we expect checkForInterrupt() to throw in normal operation. If it
+ // does not throw, then we are likely subject to the maxTimeNeverTimeOut fail point
+ // and we should wait without a deadline.
+ opCtx->checkForInterrupt();
+ uassert(ErrorCodes::ExceededTimeLimit,
+ "Experienced an unexpected timeout outside of testing",
+ getTestCommandsEnabled());
+ _baton->run(opCtx, boost::none);
+ }
} else {
- _baton->run(opCtx, boost::none);
+ _baton->run(nullptr, boost::none);
}
} else {
// Otherwise we block on the queue
@@ -331,14 +366,15 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
auto clock = ars->_opCtx->getServiceContext()->getFastClockSource();
- auto deadline = clock->now() + Seconds(20);
+ const auto maxDeadline = clock->now() + kMaxWait;
+ const auto deadline = std::min(ars->_opCtx->getDeadline(), maxDeadline);
auto targeter = shard->getTargeter();
auto findHostStatus = [&] {
// If we don't have a baton, just go ahead and block in targeting
if (!ars->_baton) {
- return targeter->findHostWithMaxWait(readPref, Seconds{20});
+ return targeter->findHostWithMaxWait(readPref, kMaxWait);
}
// If we do have a baton, and we can target quickly, just do that
@@ -386,17 +422,4 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
return Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(shardId);
}
-AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx)
- : _baton(AsyncRequestsSenderUseBaton.load()
- ? (opCtx->getServiceContext()->getTransportLayer()
- ? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx)
- : nullptr)
- : nullptr) {}
-
-AsyncRequestsSender::BatonDetacher::~BatonDetacher() {
- if (_baton) {
- _baton->detach();
- }
-}
-
} // namespace mongo
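
The subtle part of _makeProgress() above is the deadline handling: the baton runs up to the operation deadline, and resuming without having done work normally means maxTimeMS expired, so the interrupt check throws; only under the maxTimeNeverTimeOut failpoint (test commands enabled) does it wait again with no deadline. A simplified, self-contained sketch of that loop, with stand-in types:

    #include <chrono>
    #include <deque>
    #include <optional>
    #include <stdexcept>

    using Clock = std::chrono::steady_clock;

    struct Baton {
        // Trivial stub for the sketch; the real Baton polls the network and
        // returns true only if it completed work before the deadline.
        bool run(std::optional<Clock::time_point>) { return true; }
    };

    struct Job { int remoteIndex; };

    // Simplified mirror of AsyncRequestsSender::_makeProgress(): peek the
    // queue, and only block on the baton when it is empty.
    std::optional<Job> makeProgress(std::deque<Job>& responseQueue,
                                    Baton& baton,
                                    Clock::time_point opDeadline,
                                    bool testCommandsEnabled) {
        if (!responseQueue.empty()) {
            Job job = responseQueue.front();
            responseQueue.pop_front();
            return job;
        }
        if (!baton.run(opDeadline)) {
            // No work before the deadline: in normal operation the interrupt
            // check throws here (maxTimeMS expired). If it does not, assume
            // the maxTimeNeverTimeOut failpoint and wait with no deadline.
            if (!testCommandsEnabled) {
                throw std::runtime_error("ExceededTimeLimit outside of testing");
            }
            baton.run(std::nullopt);
        }
        return std::nullopt;  // caller loops and re-checks the queue
    }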
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index abb6514bd7f..a44101d253f 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -81,7 +81,8 @@ namespace mongo {
* Does not throw exceptions.
*/
class AsyncRequestsSender {
- MONGO_DISALLOW_COPYING(AsyncRequestsSender);
+ AsyncRequestsSender(const AsyncRequestsSender&) = delete;
+ AsyncRequestsSender& operator=(const AsyncRequestsSender&) = delete;
public:
/**
@@ -215,38 +216,6 @@ private:
};
/**
- * We have to make sure to detach the baton if we throw in construction. We also need a baton
- * that lives longer than this type (because it can end up in callbacks that won't actually
- * modify it).
- *
- * TODO: work out actual lifetime semantics for a baton. For now, leaving this as a wort in ARS
- */
- class BatonDetacher {
- public:
- explicit BatonDetacher(OperationContext* opCtx);
- ~BatonDetacher();
-
- transport::Baton& operator*() const {
- return *_baton;
- }
-
- transport::Baton* operator->() const noexcept {
- return _baton.get();
- }
-
- operator transport::BatonHandle() const {
- return _baton;
- }
-
- explicit operator bool() const noexcept {
- return static_cast<bool>(_baton);
- }
-
- private:
- transport::BatonHandle _baton;
- };
-
- /**
* Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
*/
void _cancelPendingRequests();
@@ -292,7 +261,7 @@ private:
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
- BatonDetacher _baton;
+ transport::BatonHandle _baton;
size_t _batonRequests = 0;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 588902151a5..82c12d79204 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -34,6 +34,8 @@
#include "mongo/s/query/establish_cursors.h"
+#include <set>
+
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/query/cursor_response.h"
@@ -49,105 +51,176 @@
namespace mongo {
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults) {
+namespace {
+
+/**
+ * This class wraps logic for establishing cursors using a MultiStatementTransactionRequestsSender.
+ */
+class CursorEstablisher {
+public:
+ CursorEstablisher(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ bool allowPartialResults)
+ : _opCtx(opCtx),
+ _executor{std::move(executor)},
+ _nss(nss),
+ _allowPartialResults(allowPartialResults) {}
+
+ /**
+ * Make a RequestSender and thus send requests.
+ */
+ void sendRequests(const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ Shard::RetryPolicy retryPolicy);
+
+ /**
+ * Wait for a single response via the RequestSender.
+ */
+ void waitForResponse() noexcept;
+
+ /**
+ * Wait for all responses via the RequestSender.
+ */
+ void waitForResponses() noexcept {
+ while (!_ars->done()) {
+ waitForResponse();
+ }
+ }
+
+ /**
+ * If any request received a non-retriable error response and partial results are not allowed,
+ * cancel any requests that may have succeeded and throw the first such error encountered.
+ */
+ void checkForFailedRequests();
+
+ /**
+ * Take all cursors currently tracked by the CursorEstablisher.
+ */
+ std::vector<RemoteCursor> takeCursors() {
+ return std::exchange(_remoteCursors, {});
+ };
+
+private:
+ void _handleFailure(const AsyncRequestsSender::Response& response, Status status) noexcept;
+
+ OperationContext* const _opCtx;
+ executor::TaskExecutor* const _executor;
+ const NamespaceString _nss;
+ const bool _allowPartialResults;
+
+ std::unique_ptr<AsyncRequestsSender> _ars;
+
+ boost::optional<Status> _maybeFailure;
+ std::vector<RemoteCursor> _remoteCursors;
+};
+
+void CursorEstablisher::sendRequests(const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ Shard::RetryPolicy retryPolicy) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
requests.emplace_back(remote.first, remote.second);
}
+ LOG(3) << "Establishing cursors on remotes {"
+ << "opId: " << _opCtx->getOpID() << ","
+ << "numRemotes: " << remotes.size() << "}";
+
// Send the requests
- AsyncRequestsSender ars(opCtx,
- executor,
- nss.db().toString(),
- std::move(requests),
- readPref,
- Shard::RetryPolicy::kIdempotent);
-
- std::vector<RemoteCursor> remoteCursors;
+ _ars = std::make_unique<AsyncRequestsSender>(
+ _opCtx, _executor, _nss.db().toString(), std::move(requests), readPref, retryPolicy);
+}
+
+void CursorEstablisher::waitForResponse() noexcept {
+ auto response = _ars->next();
try {
- // Get the responses
- while (!ars.done()) {
- try {
- auto response = ars.next();
- // Note the shardHostAndPort may not be populated if there was an error, so be sure
- // to do this after parsing the cursor response to ensure the response was ok.
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we
- // have a valid cursor, since the error handling path will attempt to clean up
- // anything in 'remoteCursors'
- RemoteCursor cursor;
- cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing(
- uassertStatusOK(std::move(response.swResponse)).data));
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- remoteCursors.push_back(std::move(cursor));
- } catch (const DBException& ex) {
- // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard
- // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
- bool isEligibleException = (ErrorCodes::isRetriableError(ex.code()) ||
- ex.code() == ErrorCodes::FailedToSatisfyReadPreference);
- // Fail if the exception is something other than a retriable or read preference
- // error, or if the 'allowPartialResults' query parameter was not enabled.
- if (!allowPartialResults || !isEligibleException) {
- throw;
- }
- }
- }
- return remoteCursors;
- } catch (const DBException&) {
- // If one of the remotes had an error, we make a best effort to finish retrieving responses
- // for other requests that were already sent, so that we can send killCursors to any cursors
- // that we know were established.
- try {
- // Do not schedule any new requests.
- ars.stopRetrying();
-
- // Collect responses from all requests that were already sent.
- while (!ars.done()) {
- auto response = ars.next();
-
- // Check if the response contains an established cursor, and if so, store it.
- StatusWith<CursorResponse> swCursorResponse(
- response.swResponse.isOK()
- ? CursorResponse::parseFromBSON(response.swResponse.getValue().data)
- : response.swResponse.getStatus());
-
- if (swCursorResponse.isOK()) {
- RemoteCursor cursor;
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
- remoteCursors.push_back(std::move(cursor));
- }
- }
-
- // Schedule killCursors against all cursors that were established.
- for (const auto& remoteCursor : remoteCursors) {
- BSONObj cmdObj =
- KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()})
- .toBSON();
- executor::RemoteCommandRequest request(
- remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx);
-
- // We do not process the response to the killCursors request (we make a good-faith
- // attempt at cleaning up the cursors, but ignore any returned errors).
- executor
- ->scheduleRemoteCommand(
- request,
- [](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {})
- .status_with_transitional_ignore();
- }
- } catch (const DBException&) {
- // Ignore the new error and rethrow the original one.
+ // Note the shardHostAndPort may not be populated if there was an error, so be sure
+ // to do this after parsing the cursor response to ensure the response was ok.
+ // Additionally, be careful not to push into 'remoteCursors' until we are sure we
+ // have a valid cursor, since the error handling path will attempt to clean up
+ // anything in 'remoteCursors'
+ auto responseData = uassertStatusOK(std::move(response.swResponse)).data;
+ auto cursorResponse = CursorResponse::parseFromBSONThrowing(std::move(responseData));
+
+ RemoteCursor cursor;
+ cursor.setCursorResponse(std::move(cursorResponse));
+ cursor.setShardId(std::move(response.shardId));
+ cursor.setHostAndPort(*response.shardHostAndPort);
+ _remoteCursors.push_back(std::move(cursor));
+ } catch (const DBException& ex) {
+ _handleFailure(response, ex.toStatus());
+ }
+}
+
+void CursorEstablisher::checkForFailedRequests() {
+ if (!_maybeFailure) {
+ // If we saw no failures, there is nothing to do.
+ return;
+ }
+
+ LOG(0) << "Unable to establish remote cursors - {"
+ << "error: " << *_maybeFailure << ", "
+ << "numActiveRemotes: " << _remoteCursors.size() << "}";
+
+ // Schedule killCursors against all cursors that were established.
+ for (const auto& remoteCursor : _remoteCursors) {
+ BSONObj cmdObj =
+ KillCursorsRequest(_nss, {remoteCursor.getCursorResponse().getCursorId()}).toBSON();
+ executor::RemoteCommandRequest request(
+ remoteCursor.getHostAndPort(), _nss.db().toString(), cmdObj, _opCtx);
+
+ // We do not process the response to the killCursors request (we make a good-faith
+ // attempt at cleaning up the cursors, but ignore any returned errors).
+ auto swHandle = _executor->scheduleRemoteCommand(
+ request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {});
+ if (!swHandle.isOK()) {
+ LOG(3) << "Unable to cancel remote cursor - " << swHandle.getStatus();
}
+ }
+
- throw;
+ // Throw our failure.
+ uassertStatusOK(*_maybeFailure);
+}
+
+void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& response,
+ Status status) noexcept {
+ LOG(3) << "Experienced a failure while establishing cursors - " << status;
+ if (_maybeFailure) {
+ // If we've already failed, just log and move on.
+ return;
}
+
+ // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica
+ // sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
+ bool isEligibleException = (ErrorCodes::isRetriableError(status.code()) ||
+ status.code() == ErrorCodes::FailedToSatisfyReadPreference);
+ if (_allowPartialResults && isEligibleException) {
+ // This exception is eligible to be swallowed.
+ return;
+ }
+
+ // Do not schedule any new requests.
+ _ars->stopRetrying();
+ _maybeFailure = std::move(status);
+}
+
+} // namespace
+
+std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy) {
+ auto establisher = CursorEstablisher(opCtx, executor, nss, allowPartialResults);
+ establisher.sendRequests(readPref, remotes, retryPolicy);
+ establisher.waitForResponses();
+ establisher.checkForFailedRequests();
+ return establisher.takeCursors();
}
} // namespace mongo
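
The failure policy in _handleFailure() is worth isolating: only the first non-swallowed error is recorded, retrying stops as soon as one is seen, and retriable or read-preference errors are swallowed whenever partial results are allowed. A condensed sketch with simplified stand-ins for mongo::Status and the error codes:

    #include <optional>
    #include <string>
    #include <utility>

    enum class Code { HostUnreachable, FailedToSatisfyReadPreference, BadValue };

    struct Status {
        Code code;
        std::string reason;
    };

    bool isRetriable(Code c) {
        return c == Code::HostUnreachable;  // one representative retriable code
    }

    struct Establisher {
        bool allowPartialResults = false;
        bool stoppedRetrying = false;
        std::optional<Status> firstFailure;

        void handleFailure(Status status) {
            if (firstFailure)
                return;  // already failed; later errors are only logged

            const bool eligible = isRetriable(status.code) ||
                status.code == Code::FailedToSatisfyReadPreference;
            if (allowPartialResults && eligible)
                return;  // swallow: this remote simply contributes no cursor

            stoppedRetrying = true;  // _ars->stopRetrying() in the real code
            firstFailure = std::move(status);
        }
    };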
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index df73973f9fa..db5a21314db 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -38,6 +38,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/s/client/shard.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
@@ -68,6 +69,7 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults);
+ bool allowPartialResults,
+ Shard::RetryPolicy = Shard::RetryPolicy::kIdempotent);
} // namespace mongo
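
A hypothetical call site, showing why the new parameter is defaulted: existing callers compile unchanged, while callers that must not retry can opt out explicitly. The opCtx/executor/nss/readPref/remotes variables here are assumed to be in scope:

    // Existing callers are unaffected (kIdempotent is the default):
    auto cursors = establishCursors(opCtx, executor, nss, readPref, remotes,
                                    /*allowPartialResults=*/true);

    // New callers can pick a stricter policy:
    auto txnCursors = establishCursors(opCtx, executor, nss, readPref, remotes,
                                       /*allowPartialResults=*/false,
                                       Shard::RetryPolicy::kNoRetry);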
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index 549e84b32a6..f1f96b7d4fb 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -84,7 +84,7 @@ public:
/**
* Executes a callback on the baton.
*/
- virtual void schedule(stdx::function<void()> func) = 0;
+ virtual bool schedule(stdx::function<void()> func) = 0;
/**
* Wakes the Baton up if it is currently blocked, or ensures that the next time it tries to
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 0c64f4a8eb0..47f904b3030 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -56,6 +56,9 @@ namespace transport {
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
class TransportLayerASIO::BatonASIO : public Baton {
+ static auto makeDetachedError() {
+ return Status(ErrorCodes::ShutdownInProgress, "Baton detached");
+ }
/**
* RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the
@@ -110,23 +113,64 @@ public:
}
void detach() override {
+ OperationContext* opCtx;
+
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_sessions.empty());
- invariant(_scheduled.empty());
- invariant(_timers.empty());
+ if (!_opCtx) {
+ return;
+ }
+
+ opCtx = std::exchange(_opCtx, nullptr);
}
{
- stdx::lock_guard<Client> lk(*_opCtx->getClient());
- invariant(_opCtx->getBaton().get() == this);
- _opCtx->setBaton(nullptr);
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ invariant(opCtx->getBaton().get() == this);
+ opCtx->setBaton(nullptr);
+ }
+
+ decltype(_scheduled) scheduled;
+ decltype(_sessions) sessions;
+ decltype(_timers) timers;
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ using std::swap;
+ swap(_scheduled, scheduled);
+ swap(_sessions, sessions);
+ swap(_timers, timers);
+ _timersById.clear();
+ }
+
+ for (auto& job : scheduled) {
+ try {
+ job();
+ job = {};
+ } catch (const DBException& ex) {
+ LOG(3) << "Job threw during detach: " << ex;
+ }
+ }
+
+ for (auto& session : sessions) {
+ session.second.promise.setError(makeDetachedError());
}
- _opCtx = nullptr;
+ for (auto& timer : timers) {
+ auto promise = std::move(timer.promise);
+ promise.setError(makeDetachedError());
+ }
}
Future<void> addSession(Session& session, Type type) override {
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!_opCtx) {
+ return makeDetachedError();
+ }
+ }
+
auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
auto pf = makePromiseFuture<void>();
@@ -142,6 +186,13 @@ public:
}
Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!_opCtx) {
+ return makeDetachedError();
+ }
+ }
+
auto pf = makePromiseFuture<void>();
_safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] {
auto pair = _timers.insert({
@@ -191,14 +242,21 @@ public:
return true;
}
- void schedule(stdx::function<void()> func) override {
+ bool schedule(stdx::function<void()> func) override {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_opCtx) {
+ // If we're already detached, people have to find somewhere else to run.
+ return false;
+ }
+
_scheduled.push_back(std::move(func));
if (_inPoll) {
_efd.notify();
}
+
+ return true;
}
void notify() noexcept override {
@@ -383,7 +441,7 @@ private:
*/
template <typename Callback>
void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
- if (_inPoll) {
+ if (_inPoll && _opCtx) {
_scheduled.push_back([cb, this] {
stdx::lock_guard<stdx::mutex> lk(_mutex);
cb();
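
The reworked detach() above has a deliberate shape: clear the OperationContext binding, take ownership of all pending state under the lock, then complete that state outside the lock — run queued jobs, and fail sessions and timers with the shutdown error. A self-contained sketch of that shape, with simplified types (the real code exchanges _opCtx and swaps the containers in two separate critical sections):

    #include <functional>
    #include <mutex>
    #include <utility>
    #include <vector>

    // Simplified stand-in for a promise that completes a Future<void>.
    struct Promise {
        std::function<void(bool ok)> complete;
        void setError() { if (complete) complete(false); }
    };

    struct Baton {
        std::mutex mutex;
        bool attached = true;
        std::vector<std::function<void()>> scheduled;
        std::vector<Promise> sessions;
        std::vector<Promise> timers;

        void detach() {
            std::vector<std::function<void()>> jobs;
            std::vector<Promise> pendingSessions, pendingTimers;
            {
                std::lock_guard<std::mutex> lk(mutex);
                if (!attached)
                    return;  // idempotent, like the _opCtx check above
                attached = false;
                std::swap(scheduled, jobs);
                std::swap(sessions, pendingSessions);
                std::swap(timers, pendingTimers);
            }
            // Outside the lock: drain scheduled work so nothing is dropped...
            for (auto& job : jobs)
                job();
            // ...and fail everything still waiting on the network or a timer.
            for (auto& p : pendingSessions)
                p.setError();
            for (auto& p : pendingTimers)
                p.setError();
        }
    };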