summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2020-07-27 16:10:30 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-31 21:23:21 +0000
commitcf05a11f71768a4ae692d1ff83a5e016d29cec1c (patch)
treee14f5a1ef8faa43a7a8d0e88bddc99916dab208a
parent0e0614dfac5e0ba89e355a5822abfee9bd6766a7 (diff)
downloadmongo-cf05a11f71768a4ae692d1ff83a5e016d29cec1c.tar.gz
SERVER-49239 Move ownership of TaskExecutor up to PrimaryOnlyService, out of the Instances
-rw-r--r--src/mongo/db/repl/SConscript7
-rw-r--r--src/mongo/db/repl/primary_only_service.cpp78
-rw-r--r--src/mongo/db/repl/primary_only_service.h38
-rw-r--r--src/mongo/db/repl/primary_only_service_test.cpp43
4 files changed, 82 insertions, 84 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 6dcdb4801ee..e06386f1469 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1590,7 +1590,14 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/logical_time_metadata_hook',
+ '$BUILD_DIR/mongo/executor/connection_pool_executor',
+ '$BUILD_DIR/mongo/executor/network_interface',
+ '$BUILD_DIR/mongo/executor/network_interface_factory',
+ '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
+ '$BUILD_DIR/mongo/executor/network_interface_tl',
'$BUILD_DIR/mongo/executor/scoped_task_executor',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'$BUILD_DIR/mongo/s/write_ops/batch_write_types',
'wait_for_majority_service',
],
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index 9351ad28fdf..eaeb1b32e02 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -33,15 +33,23 @@
#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/service_context.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace repl {
@@ -55,6 +63,9 @@ const auto _registryRegisterer =
ReplicaSetAwareServiceRegistry::Registerer<PrimaryOnlyServiceRegistry>(
"PrimaryOnlyServiceRegistry");
+const Status kExecutorShutdownStatus(ErrorCodes::InterruptedDueToReplStateChange,
+ "PrimaryOnlyService executor shut down due to stepDown");
+
// Throws on error.
void insertDocument(OperationContext* opCtx,
const NamespaceString& collectionName,
@@ -96,6 +107,12 @@ PrimaryOnlyService* PrimaryOnlyServiceRegistry::lookupService(StringData service
return servicePtr;
}
+void PrimaryOnlyServiceRegistry::onStartup(OperationContext* opCtx) {
+ for (auto& service : _services) {
+ service.second->startup(opCtx);
+ }
+}
+
void PrimaryOnlyServiceRegistry::onStepUpComplete(OperationContext*, long long term) {
for (auto& service : _services) {
service.second->onStepUp(term);
@@ -117,8 +134,26 @@ void PrimaryOnlyServiceRegistry::shutdown() {
PrimaryOnlyService::PrimaryOnlyService(ServiceContext* serviceContext)
: _serviceContext(serviceContext) {}
+void PrimaryOnlyService::startup(OperationContext* opCtx) {
+ ThreadPool::Options threadPoolOptions;
+ threadPoolOptions.threadNamePrefix = getServiceName() + "-";
+ threadPoolOptions.poolName = getServiceName() + "ThreadPool";
+ threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
+ };
+
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ hookList->addHook(std::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext()));
+ _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(threadPoolOptions),
+ executor::makeNetworkInterface(getServiceName() + "Network", nullptr, std::move(hookList)));
+ _executor->startup();
+}
+
void PrimaryOnlyService::onStepUp(long long term) {
- auto executor2 = getTaskExecutor();
+ auto newThenOldScopedExecutor =
+ std::make_shared<executor::ScopedTaskExecutor>(_executor, kExecutorShutdownStatus);
{
stdx::lock_guard lk(_mutex);
@@ -128,46 +163,45 @@ void PrimaryOnlyService::onStepUp(long long term) {
_term = term;
_state = State::kRunning;
- // Install a new executor, while moving the old one into 'executor2' so it can be accessed
- // outside of _mutex.
- _executor.swap(executor2);
+ // Install a new executor, while moving the old one into 'newThenOldScopedExecutor' so it
+ // can be accessed outside of _mutex.
+ _scopedExecutor.swap(newThenOldScopedExecutor);
}
- // Ensure that all tasks from the previous term have completed.
- if (executor2) {
- (*executor2)->join();
+ // Ensure that all tasks from the previous term have completed before allowing tasks to be
+ // scheduled on the new executor.
+ if (newThenOldScopedExecutor) {
+ (*newThenOldScopedExecutor)->join();
}
}
void PrimaryOnlyService::onStepDown() {
stdx::lock_guard lk(_mutex);
- if (_executor) {
- (*_executor)->shutdown();
+ if (_scopedExecutor) {
+ (*_scopedExecutor)->shutdown();
}
_state = State::kPaused;
_instances.clear();
}
void PrimaryOnlyService::shutdown() {
- {
- std::unique_ptr<executor::ScopedTaskExecutor> savedExecutor;
- {
- stdx::lock_guard lk(_mutex);
+ std::shared_ptr<executor::TaskExecutor> savedExecutor;
- _executor.swap(savedExecutor);
- _state = State::kShutdown;
- _instances.clear();
- }
+ {
+ stdx::lock_guard lk(_mutex);
- if (savedExecutor) {
- (*savedExecutor)->shutdown();
- (*savedExecutor)->join();
- }
+ _executor.swap(savedExecutor);
+ _scopedExecutor.reset();
+ _state = State::kShutdown;
+ _instances.clear();
}
- shutdownImpl();
+ if (savedExecutor) {
+ savedExecutor->shutdown();
+ savedExecutor->join();
+ }
}
std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateInstance(
diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h
index 7c96c61a604..3cd16b8897a 100644
--- a/src/mongo/db/repl/primary_only_service.h
+++ b/src/mongo/db/repl/primary_only_service.h
@@ -175,14 +175,13 @@ public:
virtual NamespaceString getStateDocumentsNS() const = 0;
/**
- * Virtual shutdown method where derived services can put their specific shutdown logic.
+ * Constructs and starts up _executor.
*/
- virtual void shutdownImpl() = 0;
+ void startup(OperationContext* opCtx);
/**
- * Releases all running Instances, then shuts down and joins _executor, ensuring that there are
- * no remaining tasks running.
- * Ends by calling shutdownImpl so that derived services can clean up their local state.
+ * Releases all running Instances, then shuts down and joins _executor, ensuring that
+ * there are no remaining tasks running.
*/
void shutdown();
@@ -203,11 +202,6 @@ public:
protected:
/**
- * Returns a ScopedTaskExecutor used to run instances of this service.
- */
- virtual std::unique_ptr<executor::ScopedTaskExecutor> getTaskExecutor() = 0;
-
- /**
* Constructs a new Instance object with the given initial state.
*/
virtual std::shared_ptr<Instance> constructInstance(const BSONObj& initialState) const = 0;
@@ -233,16 +227,18 @@ private:
Mutex _mutex = MONGO_MAKE_LATCH("PrimaryOnlyService::_mutex");
- // A ScopedTaskExecutor that is used to schedule calls to runOnce against Instance objects.
- // PrimaryOnlyService implementations are responsible for creating a TaskExecutor configured
- // with the desired options. The size of the thread pool within the TaskExecutor limits the
- // number of Instances of this PrimaryOnlyService that can be actively running on a thread
- // simultaneously (though it does not limit the number of Instance objects that can
- // simultaneously exist).
- // This ScopedTaskExecutor wraps the TaskExecutor owned by the PrimaryOnlyService
- // implementation, and is created at stepUp and destroyed at stepDown so that all outstanding
- // tasks get interrupted.
- std::unique_ptr<executor::ScopedTaskExecutor> _executor;
+ // A ScopedTaskExecutor that is used to perform all work run on behalf of an Instance.
+ // This ScopedTaskExecutor wraps _executor and is created at stepUp and destroyed at
+ // stepDown so that all outstanding tasks get interrupted.
+ std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;
+
+ // The concrete TaskExecutor backing _scopedExecutor. While _scopedExecutor is created and
+ // destroyed with each stepUp/stepDown, _executor persists for the lifetime of the process. We
+ // want _executor to survive failover to prevent us from having to reallocate lots of
+ // thread and connection resources on every stepUp. Service instances should never have access
+ // to _executor directly, they should only ever use _scopedExecutor so that they get the
+ // guarantee that all outstanding tasks are interrupted at stepDown.
+ std::shared_ptr<executor::TaskExecutor> _executor;
enum class State {
kRunning,
@@ -291,7 +287,7 @@ public:
*/
void shutdown();
- void onStartup(OperationContext*) final {}
+ void onStartup(OperationContext*) final;
void onStepUpBegin(OperationContext*, long long term) final {}
void onBecomeArbiter() final {}
void onStepUpComplete(OperationContext*, long long term) final;
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp
index 2989f6e6980..4926cb56256 100644
--- a/src/mongo/db/repl/primary_only_service_test.cpp
+++ b/src/mongo/db/repl/primary_only_service_test.cpp
@@ -29,10 +29,9 @@
#include <memory>
-#include "mongo/db/auth/authorization_session.h"
+
#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/oplog.h"
@@ -40,14 +39,8 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/service_context_d_test_fixture.h"
-#include "mongo/executor/network_connection_hook.h"
-#include "mongo/executor/network_interface.h"
-#include "mongo/executor/network_interface_factory.h"
-#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/thread_pool.h"
using namespace mongo;
using namespace mongo::repl;
@@ -56,28 +49,9 @@ constexpr StringData kTestServiceName = "TestService"_sd;
class TestService final : public PrimaryOnlyService {
public:
- explicit TestService(ServiceContext* serviceContext)
- : PrimaryOnlyService(serviceContext), _executor(makeTaskExecutor(serviceContext)) {
- _executor->startup();
- }
+ explicit TestService(ServiceContext* serviceContext) : PrimaryOnlyService(serviceContext) {}
~TestService() = default;
- std::shared_ptr<executor::TaskExecutor> makeTaskExecutor(ServiceContext* serviceContext) {
- ThreadPool::Options threadPoolOptions;
- threadPoolOptions.threadNamePrefix = getServiceName() + "-";
- threadPoolOptions.poolName = getServiceName() + "ThreadPool";
- threadPoolOptions.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
- };
-
- auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
- hookList->addHook(std::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
- return std::make_shared<executor::ThreadPoolTaskExecutor>(
- std::make_unique<ThreadPool>(threadPoolOptions),
- executor::makeNetworkInterface("TestServiceNetwork", nullptr, std::move(hookList)));
- }
-
StringData getServiceName() const override {
return kTestServiceName;
}
@@ -91,16 +65,6 @@ public:
return std::make_shared<TestService::Instance>(initialState);
}
- std::unique_ptr<executor::ScopedTaskExecutor> getTaskExecutor() override {
- return std::make_unique<executor::ScopedTaskExecutor>(_executor);
- }
-
- void shutdownImpl() override {
- _executor->shutdown();
- _executor->join();
- _executor.reset();
- }
-
class Instance final : public PrimaryOnlyService::TypedInstance<Instance> {
public:
Instance(const BSONObj& state)
@@ -117,9 +81,6 @@ public:
private:
BSONObj _state;
};
-
-private:
- std::shared_ptr<executor::TaskExecutor> _executor;
};
class PrimaryOnlyServiceTest : public ServiceContextMongoDTest {