summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-11-23 13:50:36 -0500
committerJudah Schvimer <judah@mongodb.com>2016-12-22 13:54:16 -0500
commita89e9c71a7ff29a652afbc837cc3e26d80fc17a0 (patch)
tree182a83bd4fac5ddfc59504b6f5c3c6083876066d /src/mongo/executor
parent05c812f26b9652077bc0f7d5a12048cc5357dc3f (diff)
downloadmongo-a89e9c71a7ff29a652afbc837cc3e26d80fc17a0.tar.gz
SERVER-27052 added asynchronous operation support to DataReplicator
(cherry picked from commit 2068c42aa2179902d4a96941fcfc7cd577a4c2a9) (cherry picked from commit 5313489266e75c5d40d1b7aae382d3e3fc8997a0) (cherry picked from commit 06da357873b3500f39832dee914c06b1968d05ca) (cherry picked from commit 19a7178e9b457d75b0431f384b85d69d7309a563) (cherry picked from commit a73e65ba511d047431b85138b010818800b7de04) (cherry picked from commit 2113ca6c7a18f109aad1e11200e8ab034bcd78fa) (cherry picked from commit bed3e78f3c511752d894f9449b93f12992a9eb3c) (cherry picked from commit a8ce641a86cff734d3e3756dbfd1beb892f639ba) (cherry picked from commit b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05) (cherry picked from commit e30e39ce1a4a55c46db13ad85f6c1000297ea6ff) (cherry picked from commit 39d11435c22a8c6ce2c489221102605a9185e815) (cherry picked from commit 1f9513ef67551db6ea93b8b9e2f40604167f952b)
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/task_executor_test_fixture.cpp5
-rw-r--r--src/mongo/executor/task_executor_test_fixture.h7
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp11
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test.cpp46
4 files changed, 63 insertions, 6 deletions
diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp
index acbb21cbeed..bc99e1538fa 100644
--- a/src/mongo/executor/task_executor_test_fixture.cpp
+++ b/src/mongo/executor/task_executor_test_fixture.cpp
@@ -43,8 +43,8 @@ Status TaskExecutorTest::getDetectableErrorStatus() {
return Status(ErrorCodes::InternalError, "Not mutated");
}
-void TaskExecutorTest::assertRemoteCommandNameEquals(StringData cmdName,
- const RemoteCommandRequest& request) {
+RemoteCommandRequest TaskExecutorTest::assertRemoteCommandNameEquals(
+ StringData cmdName, const RemoteCommandRequest& request) {
auto&& cmdObj = request.cmdObj;
ASSERT_FALSE(cmdObj.isEmpty());
if (cmdName != cmdObj.firstElementFieldName()) {
@@ -53,6 +53,7 @@ void TaskExecutorTest::assertRemoteCommandNameEquals(StringData cmdName,
<< cmdObj.firstElementFieldName() << "\" instead: " << request.toString();
FAIL(msg);
}
+ return request;
}
TaskExecutorTest::~TaskExecutorTest() = default;
diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h
index 0ba3a6d6604..d89b6e0062d 100644
--- a/src/mongo/executor/task_executor_test_fixture.h
+++ b/src/mongo/executor/task_executor_test_fixture.h
@@ -53,10 +53,11 @@ public:
static Status getDetectableErrorStatus();
/**
- * Validates command name in remote command request.
+ * Validates command name in remote command request. Returns the remote command request from
+ * the network interface for further validation if the command name matches.
*/
- static void assertRemoteCommandNameEquals(StringData cmdName,
- const RemoteCommandRequest& request);
+ static RemoteCommandRequest assertRemoteCommandNameEquals(StringData cmdName,
+ const RemoteCommandRequest& request);
protected:
virtual ~TaskExecutorTest();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index efd321e1d7d..7916c316a82 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -34,6 +34,7 @@
#include <boost/optional.hpp>
#include <iterator>
+#include <utility>
#include "mongo/base/checked_cast.h"
#include "mongo/base/disallow_copying.h"
@@ -505,7 +506,15 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
: Status::OK());
invariant(!cbStateArg->isFinished.load());
- cbStateArg->callback(std::move(args));
+ {
+ // After running callback function, clear 'cbStateArg->callback' to release any resources
+ // that might be held by this function object.
+ // Swap 'cbStateArg->callback' with temporary copy before running callback for exception
+ // safety.
+ TaskExecutor::CallbackFn callback;
+ std::swap(cbStateArg->callback, callback);
+ callback(std::move(args));
+ }
cbStateArg->isFinished.store(true);
stdx::lock_guard<stdx::mutex> lk(_mutex);
_poolInProgressQueue.erase(cbStateArg->iter);
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
index 887cfa59d45..350c475ed01 100644
--- a/src/mongo/executor/thread_pool_task_executor_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -79,6 +79,52 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) {
joinExecutorThread();
}
+bool sharedCallbackStateDestroyed = false;
+class SharedCallbackState {
+ MONGO_DISALLOW_COPYING(SharedCallbackState);
+
+public:
+ SharedCallbackState() {}
+ ~SharedCallbackState() {
+ sharedCallbackStateDestroyed = true;
+ }
+};
+
+TEST_F(ThreadPoolExecutorTest,
+ ExecutorResetsCallbackFunctionInCallbackStateUponReturnFromCallbackFunction) {
+ auto net = getNet();
+ auto& executor = getExecutor();
+ launchExecutorThread();
+
+ auto sharedCallbackData = std::make_shared<SharedCallbackState>();
+ auto callbackInvoked = false;
+
+ const auto when = net->now() + Milliseconds(5000);
+ const auto cb1 = unittest::assertGet(executor.scheduleWorkAt(
+ when, [&callbackInvoked, sharedCallbackData](const executor::TaskExecutor::CallbackArgs&) {
+ callbackInvoked = true;
+ }));
+
+ sharedCallbackData.reset();
+ ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+ net->enterNetwork();
+ ASSERT_EQUALS(when, net->runUntil(when));
+ net->exitNetwork();
+
+ executor.wait(cb1);
+
+ // Task executor should reset CallbackState::callback after running callback function.
+ // This ensures that we release resources associated with 'CallbackState::callback' without
+ // having to destroy every outstanding callback handle (which contains a shared pointer
+ // to ThreadPoolTaskExecutor::CallbackState).
+ ASSERT_TRUE(callbackInvoked);
+ ASSERT_TRUE(sharedCallbackStateDestroyed);
+
+ executor.shutdown();
+ joinExecutorThread();
+}
+
TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
// This is a regression test for SERVER-23686. It works by scheduling a work item in the
// ThreadPoolTaskExecutor that blocks waiting to be signaled by this thread. Once that work item