summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/network_interface.h15
-rw-r--r--src/mongo/executor/network_interface_mock.cpp9
-rw-r--r--src/mongo/executor/network_interface_mock.h8
-rw-r--r--src/mongo/executor/network_interface_tl.cpp90
-rw-r--r--src/mongo/executor/network_interface_tl.h10
-rw-r--r--src/mongo/executor/task_executor.h2
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp54
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h11
8 files changed, 97 insertions, 102 deletions
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 62012162bf6..34685392f1e 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -144,12 +144,11 @@ public:
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton = nullptr) = 0;
+ const BatonHandle& baton = nullptr) = 0;
- Future<TaskExecutor::ResponseStatus> startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- const transport::BatonHandle& baton = nullptr) {
+ Future<TaskExecutor::ResponseStatus> startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ const BatonHandle& baton = nullptr) {
auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>();
auto status = startCommand(
@@ -171,7 +170,7 @@ public:
* completed.
*/
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton = nullptr) = 0;
+ const BatonHandle& baton = nullptr) = 0;
/**
* Sets an alarm, which schedules "action" to run no sooner than "when".
@@ -187,9 +186,7 @@ public:
* Any callbacks invoked from setAlarm must observe onNetworkThread to
* return true. See that method for why.
*/
- virtual Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton = nullptr) = 0;
+ virtual Status setAlarm(Date_t when, unique_function<void()> action) = 0;
/**
* Returns true if called from a thread dedicated to networking. I.e. not a
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index f700f6c211e..641f1095530 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -108,7 +108,7 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
@@ -140,8 +140,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
}
}
-void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) {
+void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) {
invariant(!inShutdown());
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -171,9 +170,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
}
}
-Status NetworkInterfaceMock::setAlarm(const Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton) {
+Status NetworkInterfaceMock::setAlarm(const Date_t when, unique_function<void()> action) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 937bed41525..c0f1693a660 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -114,7 +114,7 @@ public:
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton = nullptr) override;
+ const BatonHandle& baton = nullptr) override;
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
@@ -123,14 +123,12 @@ public:
* called after the task has already completed, but its callback has not yet been run.
*/
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton = nullptr) override;
+ const BatonHandle& baton = nullptr) override;
/**
* Not implemented.
*/
- Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton = nullptr) override;
+ Status setAlarm(Date_t when, unique_function<void()> action) override;
bool onNetworkThread() override;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 512b3fee58b..5ca24991e8c 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -173,7 +173,7 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
@@ -280,14 +280,19 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (baton) {
// If we have a baton, we want to get back to the baton thread immediately after we get a
// connection
- std::move(connFuture).getAsync([
- baton,
- rw = std::move(remainingWork)
- ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
- baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
- std::move(rw)(std::move(swConn));
+ std::move(connFuture)
+ .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ baton->schedule([ rw = std::move(rw),
+ swConn = std::move(swConn) ](OperationContext * opCtx) mutable {
+ if (opCtx) {
+ std::move(rw)(std::move(swConn));
+ } else {
+ std::move(rw)(Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, failing operation"));
+ }
+ });
});
- });
} else {
// otherwise we're happy to run inline
std::move(connFuture)
@@ -306,7 +311,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
CommandState::ConnHandle conn,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
conn->indicateSuccess();
return future;
@@ -416,7 +421,7 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
}
void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
@@ -445,19 +450,13 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
}
}
-Status NetworkInterfaceTL::setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton) {
+Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
if (when <= now()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- }
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
return Status::OK();
}
@@ -469,39 +468,34 @@ Status NetworkInterfaceTL::setAlarm(Date_t when,
_inProgressAlarms.insert(alarmTimer);
}
- alarmTimer->waitUntil(when, baton)
- .getAsync(
- [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when
- << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action), baton);
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
- }
+ alarmTimer->waitUntil(when, nullptr)
+ .getAsync([ this, weakTimer, action = std::move(action), when ](Status status) mutable {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
+ }
- return;
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action));
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
}
- if (status.isOK()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- }
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+ return;
+ }
+
+ if (status.isOK()) {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 0b0c99cd03e..5cd5dd6115b 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -68,13 +68,11 @@ public:
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton) override;
+ const BatonHandle& baton) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) override;
- Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton) override;
+ const BatonHandle& baton) override;
+ Status setAlarm(Date_t when, unique_function<void()> action) override;
bool onNetworkThread() override;
@@ -117,7 +115,7 @@ private:
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
CommandState::ConnHandle conn,
- const transport::BatonHandle& baton);
+ const BatonHandle& baton);
std::string _instanceName;
ServiceContext* _svcCtx;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 1e3051e17b4..37c2669f306 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -239,7 +239,7 @@ public:
virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) = 0;
+ const BatonHandle& baton = nullptr) = 0;
/**
* If the callback referenced by "cbHandle" hasn't already executed, marks it as
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 1af20aa8692..4800de47db1 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -67,14 +67,14 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
public:
static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
Date_t readyDate,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
}
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+ CallbackState(CallbackFn&& cb, Date_t theReadyDate, const BatonHandle& baton)
: callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
virtual ~CallbackState() = default;
@@ -102,7 +102,7 @@ public:
bool isNetworkOperation = false;
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
- transport::BatonHandle baton;
+ BatonHandle baton;
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -350,21 +350,23 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
return cbHandle;
}
lk.unlock();
- _net->setAlarm(when,
- [this, cbHandle] {
- auto cbState =
- checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
- if (cbState->canceled.load()) {
- return;
- }
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (cbState->canceled.load()) {
- return;
- }
- scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
- },
- nullptr)
- .transitional_ignore();
+
+ auto status = _net->setAlarm(when, [this, cbHandle] {
+ auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
+ if (cbState->canceled.load()) {
+ return;
+ }
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (cbState->canceled.load()) {
+ return;
+ }
+ scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
+ });
+
+ if (!status.isOK()) {
+ cancel(cbHandle.getValue());
+ return status;
+ }
return cbHandle;
}
@@ -406,7 +408,7 @@ const auto initialSyncPauseCmds =
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (MONGO_FAIL_POINT(initialSyncFuzzerSynchronizationPoint1)) {
// We are only going to pause on these failpoints if the command issued is for the
@@ -537,7 +539,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
}
ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
- CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
+ CallbackFn work, const BatonHandle& baton, Date_t when) {
WorkQueue result;
result.emplace_front(CallbackState::make(std::move(work), when, baton));
result.front()->iter = result.begin();
@@ -592,7 +594,17 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
for (const auto& cbState : todo) {
if (cbState->baton) {
- cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ cbState->baton->schedule([this, cbState](OperationContext* opCtx) {
+ if (opCtx) {
+ runCallback(std::move(cbState));
+ return;
+ }
+
+ cbState->canceled.store(1);
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
+ });
} else {
const auto status =
_pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 1558eaf7f01..1832a021d2c 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -74,7 +74,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void appendDiagnosticBSON(BSONObjBuilder* b) const;
+ void appendDiagnosticBSON(BSONObjBuilder* b) const override;
Date_t now() override;
StatusWith<EventHandle> makeEvent() override;
void signalEvent(const EventHandle& event) override;
@@ -85,10 +85,9 @@ public:
void waitForEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) override;
@@ -138,7 +137,7 @@ private:
* called outside of _mutex.
*/
static WorkQueue makeSingletonWorkQueue(CallbackFn work,
- const transport::BatonHandle& baton,
+ const BatonHandle& baton,
Date_t when = {});
/**