summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-01-15 17:51:49 -0500
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-01-16 09:10:43 -0500
commit75f8c26ace392f77aa35b87faeb2933f8ce6e6ad (patch)
treedf255ce69e09b80ceedb241a2ec89450d80acc6c
parent692be593357338e488623dbd2d4ecabff4ab323c (diff)
downloadmongo-75f8c26ace392f77aa35b87faeb2933f8ce6e6ad.tar.gz
SERVER-39022 Make the sharding fixed task executor use unlimited thread pool size
-rw-r--r--jstests/sharding/shard_aware_primary_failover.js5
-rw-r--r--src/mongo/executor/network_interface.h3
-rw-r--r--src/mongo/executor/network_interface_mock.cpp34
-rw-r--r--src/mongo/executor/network_interface_mock.h79
-rw-r--r--src/mongo/s/query/results_merger_test_fixture.h2
-rw-r--r--src/mongo/s/sharding_initialization.cpp18
6 files changed, 69 insertions, 72 deletions
diff --git a/jstests/sharding/shard_aware_primary_failover.js b/jstests/sharding/shard_aware_primary_failover.js
index 91b16d5686d..abbfb47c1cf 100644
--- a/jstests/sharding/shard_aware_primary_failover.js
+++ b/jstests/sharding/shard_aware_primary_failover.js
@@ -1,7 +1,6 @@
/**
* Test that a new primary that gets elected will properly perform shard initialization.
*/
-
(function() {
"use strict";
@@ -9,6 +8,7 @@
var replTest = new ReplSetTest({nodes: 3});
replTest.startSet({shardsvr: ''});
+
var nodes = replTest.nodeList();
replTest.initiate({
_id: replTest.name,
@@ -41,7 +41,7 @@
shardIdentityQuery, shardIdentityUpdate, {upsert: true, writeConcern: {w: 'majority'}}));
replTest.stopMaster();
- replTest.waitForMaster();
+ replTest.waitForMaster(30000);
primaryConn = replTest.getPrimary();
@@ -55,5 +55,4 @@
replTest.stopSet();
st.stop();
-
})();
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index c55a7925bdc..62012162bf6 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -42,9 +42,6 @@
#include "mongo/util/future.h"
namespace mongo {
-
-class BSONObjBuilder;
-
namespace executor {
MONGO_FAIL_POINT_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn);
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index c7c5178761a..ed9d67ea536 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -67,34 +67,17 @@ NetworkInterfaceMock::~NetworkInterfaceMock() {
void NetworkInterfaceMock::logQueues() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _logQueues_inlock();
-}
-
-std::string NetworkInterfaceMock::getDiagnosticString() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _getDiagnosticString_inlock();
-}
-
-std::string NetworkInterfaceMock::_getDiagnosticString_inlock() const {
- return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
- << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
- << ", inShutdown: " << _inShutdown.load()
- << ", processing: " << _processing.size()
- << ", scheduled: " << _scheduled.size()
- << ", blackHoled: " << _blackHoled.size()
- << ", unscheduled: " << _unscheduled.size();
-}
-
-void NetworkInterfaceMock::_logQueues_inlock() const {
- std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
+ const std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
{"unscheduled", &_unscheduled},
{"scheduled", &_scheduled},
{"processing", &_processing},
{"blackholes", &_blackHoled}};
+
for (auto&& queue : queues) {
if (queue.second->empty()) {
continue;
}
+
log() << "**** queue: " << queue.first << " ****";
for (auto&& item : *queue.second) {
log() << "\t\t " << item.getDiagnosticString();
@@ -102,7 +85,16 @@ void NetworkInterfaceMock::_logQueues_inlock() const {
}
}
-void NetworkInterfaceMock::appendConnectionStats(ConnectionPoolStats* stats) const {}
+std::string NetworkInterfaceMock::getDiagnosticString() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
+ << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
+ << ", inShutdown: " << _inShutdown.load()
+ << ", processing: " << _processing.size()
+ << ", scheduled: " << _scheduled.size()
+ << ", blackHoled: " << _blackHoled.size()
+ << ", unscheduled: " << _unscheduled.size();
+}
Date_t NetworkInterfaceMock::now() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 7023101fd59..621a44a02c5 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -52,7 +52,6 @@ class BSONObj;
namespace executor {
-using ResponseStatus = TaskExecutor::ResponseStatus;
class NetworkConnectionHook;
/**
@@ -84,16 +83,11 @@ public:
NetworkInterfaceMock();
virtual ~NetworkInterfaceMock();
- virtual void appendConnectionStats(ConnectionPoolStats* stats) const;
- virtual std::string getDiagnosticString();
- Counters getCounters() const override {
- return Counters();
- }
/**
* Logs the contents of the queues for diagnostics.
*/
- virtual void logQueues();
+ void logQueues();
////////////////////////////////////////////////////////////////////////////////
//
@@ -101,20 +95,26 @@ public:
//
////////////////////////////////////////////////////////////////////////////////
- virtual void startup();
- virtual void shutdown();
- virtual bool inShutdown() const;
- virtual void waitForWork();
- virtual void waitForWorkUntil(Date_t when);
- virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
- virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
- virtual void signalWorkAvailable();
- virtual Date_t now();
- virtual std::string getHostName();
- virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton = nullptr);
+ void appendConnectionStats(ConnectionPoolStats* stats) const override {}
+
+ std::string getDiagnosticString() override;
+
+ Counters getCounters() const override {
+ return Counters();
+ }
+
+ void startup() override;
+ void shutdown() override;
+ bool inShutdown() const override;
+ void waitForWork() override;
+ void waitForWorkUntil(Date_t when) override;
+ void signalWorkAvailable() override;
+ Date_t now() override;
+ std::string getHostName() override;
+ Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ RemoteCommandCompletionFn&& onFinish,
+ const transport::BatonHandle& baton = nullptr) override;
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
@@ -122,21 +122,20 @@ public:
* the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
* called after the task has already completed, but its callback has not yet been run.
*/
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton = nullptr);
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr) override;
/**
* Not implemented.
*/
- virtual Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton = nullptr);
+ Status setAlarm(Date_t when,
+ unique_function<void()> action,
+ const transport::BatonHandle& baton = nullptr) override;
- virtual bool onNetworkThread();
+ bool onNetworkThread() override;
void dropConnections(const HostAndPort&) override {}
-
////////////////////////////////////////////////////////////////////////////////
//
// Methods for simulating network operations and the passage of time.
@@ -151,6 +150,10 @@ public:
*/
class InNetworkGuard;
+ void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
+
+ void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+
/**
* Causes the currently running (non-executor) thread to assume the mantle of the network
* simulation thread.
@@ -199,7 +202,7 @@ public:
*/
void scheduleResponse(NetworkOperationIterator noi,
Date_t when,
- const ResponseStatus& response);
+ const TaskExecutor::ResponseStatus& response);
/**
* Schedules a successful "response" to "noi" at virtual time "when".
@@ -221,14 +224,13 @@ public:
* "when" defaults to now().
*/
RemoteCommandRequest scheduleErrorResponse(const Status& response);
- RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response);
+ RemoteCommandRequest scheduleErrorResponse(const TaskExecutor::ResponseStatus response);
RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
const Status& response);
RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
Date_t when,
const Status& response);
-
/**
* Swallows "noi", causing the network interface to not respond to it until
* shutdown() is called.
@@ -277,7 +279,7 @@ public:
*/
void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle,
const std::vector<NetworkOperationList*> queuesToCheck,
- const ResponseStatus& response);
+ const TaskExecutor::ResponseStatus& response);
private:
/**
@@ -308,15 +310,6 @@ private:
void _startup_inlock();
/**
- * Returns information about the state of this mock for diagnostic purposes.
- */
- std::string _getDiagnosticString_inlock() const;
-
- /**
- * Logs the contents of the queues for diagnostics.
- */
- void _logQueues_inlock() const;
- /**
* Returns the current virtualized time.
*/
Date_t _now_inlock() const {
@@ -446,7 +439,7 @@ public:
/**
* Sets the response and thet virtual time at which it will be delivered.
*/
- void setResponse(Date_t responseDate, const ResponseStatus& response);
+ void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response);
/**
* Predicate that returns true if cbHandle equals the executor's handle for this network
@@ -506,7 +499,7 @@ private:
Date_t _responseDate;
TaskExecutor::CallbackHandle _cbHandle;
RemoteCommandRequest _request;
- ResponseStatus _response;
+ TaskExecutor::ResponseStatus _response;
RemoteCommandCompletionFn _onFinish;
};
diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h
index 7a0f46c9058..d80b8cb3ecf 100644
--- a/src/mongo/s/query/results_merger_test_fixture.h
+++ b/src/mongo/s/query/results_merger_test_fixture.h
@@ -195,7 +195,7 @@ protected:
return guard->hasReadyRequests();
}
- void scheduleErrorResponse(executor::ResponseStatus rs) {
+ void scheduleErrorResponse(executor::TaskExecutor::ResponseStatus rs) {
invariant(!rs.isOK());
rs.elapsedMillis = Milliseconds(0);
executor::NetworkInterfaceMock* net = network();
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index a63374f501d..941a4e7d876 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -67,6 +67,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_task_executor.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -120,6 +121,21 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
+std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor(
+ std::unique_ptr<NetworkInterface> net) {
+ auto executor =
+ stdx::make_unique<ThreadPoolTaskExecutor>(stdx::make_unique<ThreadPool>([] {
+ ThreadPool::Options opts;
+ opts.poolName = "Sharding-Fixed";
+ opts.maxThreads =
+ ThreadPool::Options::kUnlimited;
+ return opts;
+ }()),
+ std::move(net));
+
+ return stdx::make_unique<executor::ShardingTaskExecutor>(std::move(executor));
+}
+
std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
std::unique_ptr<NetworkInterface> fixedNet,
rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder,
@@ -140,7 +156,7 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
}
// Add executor used to perform non-performance critical work.
- auto fixedExec = makeShardingTaskExecutor(std::move(fixedNet));
+ auto fixedExec = makeShardingFixedTaskExecutor(std::move(fixedNet));
auto executorPool = stdx::make_unique<TaskExecutorPool>();
executorPool->addExecutors(std::move(executors), std::move(fixedExec));