diff options
-rw-r--r-- | jstests/sharding/shard_aware_primary_failover.js | 5 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 34 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 79 | ||||
-rw-r--r-- | src/mongo/s/query/results_merger_test_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 18 |
6 files changed, 69 insertions, 72 deletions
diff --git a/jstests/sharding/shard_aware_primary_failover.js b/jstests/sharding/shard_aware_primary_failover.js index 91b16d5686d..abbfb47c1cf 100644 --- a/jstests/sharding/shard_aware_primary_failover.js +++ b/jstests/sharding/shard_aware_primary_failover.js @@ -1,7 +1,6 @@ /** * Test that a new primary that gets elected will properly perform shard initialization. */ - (function() { "use strict"; @@ -9,6 +8,7 @@ var replTest = new ReplSetTest({nodes: 3}); replTest.startSet({shardsvr: ''}); + var nodes = replTest.nodeList(); replTest.initiate({ _id: replTest.name, @@ -41,7 +41,7 @@ shardIdentityQuery, shardIdentityUpdate, {upsert: true, writeConcern: {w: 'majority'}})); replTest.stopMaster(); - replTest.waitForMaster(); + replTest.waitForMaster(30000); primaryConn = replTest.getPrimary(); @@ -55,5 +55,4 @@ replTest.stopSet(); st.stop(); - })(); diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index c55a7925bdc..62012162bf6 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -42,9 +42,6 @@ #include "mongo/util/future.h" namespace mongo { - -class BSONObjBuilder; - namespace executor { MONGO_FAIL_POINT_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn); diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index c7c5178761a..ed9d67ea536 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -67,34 +67,17 @@ NetworkInterfaceMock::~NetworkInterfaceMock() { void NetworkInterfaceMock::logQueues() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _logQueues_inlock(); -} - -std::string NetworkInterfaceMock::getDiagnosticString() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - return _getDiagnosticString_inlock(); -} - -std::string NetworkInterfaceMock::_getDiagnosticString_inlock() const { - return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask - << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted - << ", inShutdown: " << _inShutdown.load() - << ", processing: " << _processing.size() - << ", scheduled: " << _scheduled.size() - << ", blackHoled: " << _blackHoled.size() - << ", unscheduled: " << _unscheduled.size(); -} - -void NetworkInterfaceMock::_logQueues_inlock() const { - std::vector<std::pair<std::string, const NetworkOperationList*>> queues{ + const std::vector<std::pair<std::string, const NetworkOperationList*>> queues{ {"unscheduled", &_unscheduled}, {"scheduled", &_scheduled}, {"processing", &_processing}, {"blackholes", &_blackHoled}}; + for (auto&& queue : queues) { if (queue.second->empty()) { continue; } + log() << "**** queue: " << queue.first << " ****"; for (auto&& item : *queue.second) { log() << "\t\t " << item.getDiagnosticString(); @@ -102,7 +85,16 @@ void NetworkInterfaceMock::_logQueues_inlock() const { } } -void NetworkInterfaceMock::appendConnectionStats(ConnectionPoolStats* stats) const {} +std::string NetworkInterfaceMock::getDiagnosticString() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask + << ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted + << ", inShutdown: " << _inShutdown.load() + << ", processing: " << _processing.size() + << ", scheduled: " << _scheduled.size() + << ", blackHoled: " << _blackHoled.size() + << ", unscheduled: " << _unscheduled.size(); +} Date_t NetworkInterfaceMock::now() { stdx::lock_guard<stdx::mutex> lk(_mutex); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 7023101fd59..621a44a02c5 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -52,7 +52,6 @@ class BSONObj; namespace executor { -using ResponseStatus = TaskExecutor::ResponseStatus; class NetworkConnectionHook; /** @@ -84,16 +83,11 @@ public: NetworkInterfaceMock(); virtual ~NetworkInterfaceMock(); - virtual void appendConnectionStats(ConnectionPoolStats* stats) const; - virtual std::string getDiagnosticString(); - Counters getCounters() const override { - return Counters(); - } /** * Logs the contents of the queues for diagnostics. */ - virtual void logQueues(); + void logQueues(); //////////////////////////////////////////////////////////////////////////////// // @@ -101,20 +95,26 @@ public: // //////////////////////////////////////////////////////////////////////////////// - virtual void startup(); - virtual void shutdown(); - virtual bool inShutdown() const; - virtual void waitForWork(); - virtual void waitForWorkUntil(Date_t when); - virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook); - virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook); - virtual void signalWorkAvailable(); - virtual Date_t now(); - virtual std::string getHostName(); - virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr); + void appendConnectionStats(ConnectionPoolStats* stats) const override {} + + std::string getDiagnosticString() override; + + Counters getCounters() const override { + return Counters(); + } + + void startup() override; + void shutdown() override; + bool inShutdown() const override; + void waitForWork() override; + void waitForWorkUntil(Date_t when) override; + void signalWorkAvailable() override; + Date_t now() override; + std::string getHostName() override; + Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + RemoteCommandCompletionFn&& onFinish, + const transport::BatonHandle& baton = nullptr) override; /** * If the network operation is in the _unscheduled or _processing queues, moves the operation @@ -122,21 +122,20 @@ public: * the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is * called after the task has already completed, but its callback has not yet been run. */ - virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr); + void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, + const transport::BatonHandle& baton = nullptr) override; /** * Not implemented. */ - virtual Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr); + Status setAlarm(Date_t when, + unique_function<void()> action, + const transport::BatonHandle& baton = nullptr) override; - virtual bool onNetworkThread(); + bool onNetworkThread() override; void dropConnections(const HostAndPort&) override {} - //////////////////////////////////////////////////////////////////////////////// // // Methods for simulating network operations and the passage of time. @@ -151,6 +150,10 @@ public: */ class InNetworkGuard; + void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook); + + void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook); + /** * Causes the currently running (non-executor) thread to assume the mantle of the network * simulation thread. @@ -199,7 +202,7 @@ public: */ void scheduleResponse(NetworkOperationIterator noi, Date_t when, - const ResponseStatus& response); + const TaskExecutor::ResponseStatus& response); /** * Schedules a successful "response" to "noi" at virtual time "when". @@ -221,14 +224,13 @@ public: * "when" defaults to now(). */ RemoteCommandRequest scheduleErrorResponse(const Status& response); - RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response); + RemoteCommandRequest scheduleErrorResponse(const TaskExecutor::ResponseStatus response); RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi, const Status& response); RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi, Date_t when, const Status& response); - /** * Swallows "noi", causing the network interface to not respond to it until * shutdown() is called. @@ -277,7 +279,7 @@ public: */ void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle, const std::vector<NetworkOperationList*> queuesToCheck, - const ResponseStatus& response); + const TaskExecutor::ResponseStatus& response); private: /** @@ -308,15 +310,6 @@ private: void _startup_inlock(); /** - * Returns information about the state of this mock for diagnostic purposes. - */ - std::string _getDiagnosticString_inlock() const; - - /** - * Logs the contents of the queues for diagnostics. - */ - void _logQueues_inlock() const; - /** * Returns the current virtualized time. */ Date_t _now_inlock() const { @@ -446,7 +439,7 @@ public: /** * Sets the response and thet virtual time at which it will be delivered. */ - void setResponse(Date_t responseDate, const ResponseStatus& response); + void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response); /** * Predicate that returns true if cbHandle equals the executor's handle for this network @@ -506,7 +499,7 @@ private: Date_t _responseDate; TaskExecutor::CallbackHandle _cbHandle; RemoteCommandRequest _request; - ResponseStatus _response; + TaskExecutor::ResponseStatus _response; RemoteCommandCompletionFn _onFinish; }; diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h index 7a0f46c9058..d80b8cb3ecf 100644 --- a/src/mongo/s/query/results_merger_test_fixture.h +++ b/src/mongo/s/query/results_merger_test_fixture.h @@ -195,7 +195,7 @@ protected: return guard->hasReadyRequests(); } - void scheduleErrorResponse(executor::ResponseStatus rs) { + void scheduleErrorResponse(executor::TaskExecutor::ResponseStatus rs) { invariant(!rs.isOK()); rs.elapsedMillis = Milliseconds(0); executor::NetworkInterfaceMock* net = network(); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index a63374f501d..941a4e7d876 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -67,6 +67,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/sharding_task_executor.h" #include "mongo/stdx/memory.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -120,6 +121,21 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } +std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor( + std::unique_ptr<NetworkInterface> net) { + auto executor = + stdx::make_unique<ThreadPoolTaskExecutor>(stdx::make_unique<ThreadPool>([] { + ThreadPool::Options opts; + opts.poolName = "Sharding-Fixed"; + opts.maxThreads = + ThreadPool::Options::kUnlimited; + return opts; + }()), + std::move(net)); + + return stdx::make_unique<executor::ShardingTaskExecutor>(std::move(executor)); +} + std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool( std::unique_ptr<NetworkInterface> fixedNet, rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder, @@ -140,7 +156,7 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool( } // Add executor used to perform non-performance critical work. - auto fixedExec = makeShardingTaskExecutor(std::move(fixedNet)); + auto fixedExec = makeShardingFixedTaskExecutor(std::move(fixedNet)); auto executorPool = stdx::make_unique<TaskExecutorPool>(); executorPool->addExecutors(std::move(executors), std::move(fixedExec)); |