diff options
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/grid.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/grid.h | 11 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook.h | 27 | ||||
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook_for_mongod.cpp | 67 | ||||
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook_for_mongod.h | 48 | ||||
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook_for_mongos.h | 8 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 27 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.h | 11 | ||||
-rw-r--r-- | src/mongo/s/sharding_test_fixture.cpp | 8 |
14 files changed, 195 insertions, 47 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 62105849406..f3d165806a0 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -17,7 +17,6 @@ env.Library( target='sharding_initialization', source=[ 'sharding_initialization.cpp', - 'sharding_egress_metadata_hook_for_mongos.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/executor/network_interface_factory', @@ -226,6 +225,7 @@ env.Library( 'mongos_options.cpp', 's_only.cpp', 's_sharding_server_status.cpp', + 'sharding_egress_metadata_hook_for_mongos.cpp', 'version_mongos.cpp', ], LIBDEPS=[ @@ -260,7 +260,8 @@ env.Library( target='serveronly', source=[ 'd_sharding_server_status.cpp', - "d_state.cpp", + 'd_state.cpp', + 'sharding_egress_metadata_hook_for_mongod.cpp', ], LIBDEPS=[ "coreshard", diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index db556719c31..232b4f1a446 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -244,7 +244,6 @@ void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) { _addShard_inlock(shard); } - shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const { stdx::lock_guard<stdx::mutex> lk(_mutex); auto i = _rsLookup.find(name); diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 57538b1f6bb..e69e296e6cc 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -32,6 +32,7 @@ #include "mongo/s/grid.h" +#include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/balancer/balancer_configuration.h" @@ -89,11 +90,15 @@ void Grid::setAllowLocalHost(bool allow) { } repl::OpTime Grid::configOpTime() const { + invariant(serverGlobalParams.clusterRole != ClusterRole::ConfigServer); + stdx::lock_guard<stdx::mutex> lk(_mutex); return _configOpTime; } void Grid::advanceConfigOpTime(repl::OpTime opTime) { + invariant(serverGlobalParams.clusterRole != ClusterRole::ConfigServer); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_configOpTime < opTime) { _configOpTime = opTime; diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 4316bca0f0d..02d5798d80d 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -122,8 +122,18 @@ public: return _balancerConfig.get(); } + /** + * Returns the the last optime that a shard or config server has reported as the current + * committed optime on the config server. + * NOTE: This is not valid to call on a config server instance. + */ repl::OpTime configOpTime() const; + /** + * Called whenever a mongos or shard gets a response from a config server or shard and updates + * what we've seen as the last config server optime. + * NOTE: This is not valid to call on a config server instance. + */ void advanceConfigOpTime(repl::OpTime opTime); /** @@ -156,6 +166,7 @@ private: mutable stdx::mutex _mutex; // Last known highest opTime from the config server that should be used when doing reads. + // This value is updated any time a shard or mongos talks to a config server or a shard. repl::OpTime _configOpTime; // Deprecated. This is only used on mongos, and once addShard is solely handled by the configs, diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 809e9354116..a6755a82082 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -78,6 +78,7 @@ #include "mongo/s/grid.h" #include "mongo/s/mongos_options.h" #include "mongo/s/query/cluster_cursor_cleanup_job.h" +#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" #include "mongo/s/sharding_initialization.h" #include "mongo/s/version_mongos.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -298,10 +299,11 @@ static Status initializeSharding(OperationContext* txn) { auto shardFactory = stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); - Status status = initializeGlobalShardingState(mongosGlobalParams.configdbs, - mongosGlobalParams.maxChunkSizeBytes, - std::move(shardFactory), - true); + Status status = initializeGlobalShardingState( + mongosGlobalParams.configdbs, + mongosGlobalParams.maxChunkSizeBytes, + std::move(shardFactory), + []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); }); if (!status.isOK()) { return status; diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp index 2e1b6793346..2040955f605 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.cpp +++ b/src/mongo/s/sharding_egress_metadata_hook.cpp @@ -57,7 +57,7 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection, if (!shardedConnection) { return Status::OK(); } - rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob); + rpc::ConfigServerMetadata(_getConfigServerOpTime()).writeToMetadata(metadataBob); return Status::OK(); } catch (...) { return exceptionToStatus(); @@ -68,7 +68,7 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& targe BSONObjBuilder* metadataBob) { try { audit::writeImpersonatedUsersToMetadata(metadataBob); - rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob); + rpc::ConfigServerMetadata(_getConfigServerOpTime()).writeToMetadata(metadataBob); return Status::OK(); } catch (...) { return exceptionToStatus(); @@ -135,7 +135,5 @@ Status ShardingEgressMetadataHook::_advanceConfigOptimeFromShard(ShardId shardId } } -void ShardingEgressMetadataHook::_saveGLEStats(const BSONObj& metadata, StringData hostString) {} - } // namespace rpc } // namespace mongo diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h index 06fe41b1ebe..c510aefaab4 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.h +++ b/src/mongo/s/sharding_egress_metadata_hook.h @@ -31,6 +31,7 @@ #include <memory> #include "mongo/base/string_data.h" +#include "mongo/db/repl/optime.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/s/client/shard.h" @@ -42,6 +43,8 @@ namespace rpc { class ShardingEgressMetadataHook : public rpc::EgressMetadataHook { public: + virtual ~ShardingEgressMetadataHook() = default; + Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override; Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override; @@ -55,10 +58,28 @@ public: const StringData target, BSONObjBuilder* metadataBob); -private: - virtual void _saveGLEStats(const BSONObj& metadata, StringData hostString); +protected: + /** + * On mongod this is a no-op. + * On mongos it looks for $gleStats in a command's reply metadata, and fills in the + * ClusterLastErrorInfo for this thread's associated Client with the data, if found. + * This data will be used by subsequent GLE calls, to ensure we look for the correct write on + * the correct PRIMARY. + */ + virtual void _saveGLEStats(const BSONObj& metadata, StringData hostString) = 0; + + /** + * Called by writeRequestMetadata() to find the config server optime that should be sent as part + * of the ConfigServerMetadata. + */ + virtual repl::OpTime _getConfigServerOpTime() = 0; - Status _advanceConfigOptimeFromShard(ShardId shardId, const BSONObj& metadataObj); + /** + * On config servers this is a no-op. + * On shards and mongoses this advances the Grid's stored config server optime based on the + * metadata in the response object from running a command. + */ + virtual Status _advanceConfigOptimeFromShard(ShardId shardId, const BSONObj& metadataObj); }; } // namespace rpc diff --git a/src/mongo/s/sharding_egress_metadata_hook_for_mongod.cpp b/src/mongo/s/sharding_egress_metadata_hook_for_mongod.cpp new file mode 100644 index 00000000000..2c8dc215847 --- /dev/null +++ b/src/mongo/s/sharding_egress_metadata_hook_for_mongod.cpp @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/sharding_egress_metadata_hook_for_mongod.h" + +#include "mongo/base/status.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/server_options.h" +#include "mongo/s/grid.h" +#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" + +namespace mongo { + +namespace rpc { + +void ShardingEgressMetadataHookForMongod::_saveGLEStats(const BSONObj& metadata, + StringData hostString) {} + +repl::OpTime ShardingEgressMetadataHookForMongod::_getConfigServerOpTime() { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return repl::getGlobalReplicationCoordinator()->getCurrentCommittedSnapshotOpTime(); + } else { + // TODO uncomment as part of SERVER-22663 + // invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + return grid.configOpTime(); + } +} + +Status ShardingEgressMetadataHookForMongod::_advanceConfigOptimeFromShard( + ShardId shardId, const BSONObj& metadataObj) { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return Status::OK(); + } + return ShardingEgressMetadataHook::_advanceConfigOptimeFromShard(shardId, metadataObj); +} + +} // namespace rpc +} // namespace mongo diff --git a/src/mongo/s/sharding_egress_metadata_hook_for_mongod.h b/src/mongo/s/sharding_egress_metadata_hook_for_mongod.h new file mode 100644 index 00000000000..56272d97cd2 --- /dev/null +++ b/src/mongo/s/sharding_egress_metadata_hook_for_mongod.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/s/sharding_egress_metadata_hook.h" + +namespace mongo { + +namespace rpc { + +class ShardingEgressMetadataHookForMongod final : public ShardingEgressMetadataHook { +public: + ShardingEgressMetadataHookForMongod() = default; + +private: + void _saveGLEStats(const BSONObj& metadata, StringData hostString) override; + repl::OpTime _getConfigServerOpTime() override; + Status _advanceConfigOptimeFromShard(ShardId shardId, const BSONObj& metadataObj) override; +}; + +} // namespace rpc +} // namespace mongo diff --git a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp index ab74ecf35d7..466d9306de9 100644 --- a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp +++ b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp @@ -30,10 +30,12 @@ #include "mongo/platform/basic.h" +#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" + #include "mongo/db/client.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/s/cluster_last_error_info.h" -#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" +#include "mongo/s/grid.h" #include "mongo/util/log.h" namespace mongo { @@ -76,5 +78,9 @@ void ShardingEgressMetadataHookForMongos::_saveGLEStats(const BSONObj& metadata, HostOpTime(shardingMetadata.getLastOpTime(), shardingMetadata.getLastElectionId())); } +repl::OpTime ShardingEgressMetadataHookForMongos::_getConfigServerOpTime() { + return grid.configOpTime(); +} + } // namespace rpc } // namespace mongo diff --git a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.h b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.h index 11a753e7506..92fd9cf9eef 100644 --- a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.h +++ b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.h @@ -36,14 +36,8 @@ namespace rpc { class ShardingEgressMetadataHookForMongos final : public ShardingEgressMetadataHook { private: - /** - * Looks for $gleStats in a command's reply metadata, and fills in the ClusterLastErrorInfo for - * this thread's associated Client with the data, if found. This data will be used by subsequent - * GLE calls, to ensure we look for the correct write on the correct PRIMARY. - * - * Returns the result from calling runCommand - */ void _saveGLEStats(const BSONObj& metadata, StringData hostString) override; + repl::OpTime _getConfigServerOpTime() override; }; } // namespace rpc diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index d3019a5ae61..b66a7571a9e 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -51,13 +51,12 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" #include "mongo/s/grid.h" -#include "mongo/s/sharding_egress_metadata_hook.h" -#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" #include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" #include "mongo/s/catalog/replset/replset_dist_lock_manager.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/sharding_egress_metadata_hook.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -101,16 +100,11 @@ std::unique_ptr<CatalogManager> makeCatalogManager(ServiceContext* service, executor::makeNetworkInterface("NetworkInterfaceASIO-AddShard-TaskExecutor"))); } -std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkInterface> fixedNet, - bool isMongos) { +std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( + std::unique_ptr<NetworkInterface> fixedNet, + std::unique_ptr<rpc::EgressMetadataHook> metadataHook) { std::vector<std::unique_ptr<executor::TaskExecutor>> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { - std::unique_ptr<rpc::EgressMetadataHook> metadataHook; - if (isMongos) { - metadataHook = stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); - } else { - metadataHook = stdx::make_unique<rpc::ShardingEgressMetadataHook>(); - }; auto net = executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique<ShardingNetworkConnectionHook>(), @@ -137,24 +131,17 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn Status initializeGlobalShardingState(const ConnectionString& configCS, uint64_t maxChunkSizeBytes, std::unique_ptr<ShardFactory> shardFactory, - bool isMongos) { + rpc::ShardingEgressMetadataHookBuilder hookBuilder) { if (configCS.type() == ConnectionString::INVALID) { return {ErrorCodes::BadValue, "Unrecognized connection string."}; } - std::unique_ptr<rpc::EgressMetadataHook> metadataHook; - if (isMongos) { - metadataHook = stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); - } else { - metadataHook = stdx::make_unique<rpc::ShardingEgressMetadataHook>(); - } - auto network = executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry", stdx::make_unique<ShardingNetworkConnectionHook>(), - std::move(metadataHook)); + hookBuilder()); auto networkPtr = network.get(); - auto executorPool = makeTaskExecutorPool(std::move(network), isMongos); + auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder()); executorPool->startup(); auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index a4508870088..d31aabf261d 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -31,6 +31,8 @@ #include <cstdint> #include <memory> +#include "mongo/stdx/functional.h" + namespace mongo { class ConnectionString; @@ -38,6 +40,12 @@ class OperationContext; class ShardFactory; class Status; +namespace rpc { +class ShardingEgressMetadataHook; +using ShardingEgressMetadataHookBuilder = + stdx::function<std::unique_ptr<ShardingEgressMetadataHook>()>; +} // namespace rpc + /** * Takes in the connection string for reaching the config servers and initializes the global * CatalogManager, ShardingRegistry, and grid objects. @@ -45,7 +53,8 @@ class Status; Status initializeGlobalShardingState(const ConnectionString& configCS, uint64_t maxChunkSizeBytes, std::unique_ptr<ShardFactory> shardFactory, - bool isMongos); + rpc::ShardingEgressMetadataHookBuilder hookBuilder); + /** * Tries to contact the config server and reload the shard registry until it succeeds or * is interrupted. diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index d2287936993..86921c8c75b 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -60,7 +60,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/set_shard_version_request.h" -#include "mongo/s/sharding_egress_metadata_hook.h" +#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" @@ -72,7 +72,7 @@ using executor::NetworkInterfaceMock; using executor::NetworkTestEnv; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -using rpc::ShardingEgressMetadataHook; +using rpc::ShardingEgressMetadataHookForMongos; using unittest::assertGet; using std::string; @@ -95,14 +95,14 @@ void ShardingTestFixture::setUp() { // Set up executor pool used for most operations. auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>(); - fixedNet->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHook>()); + fixedNet->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHookForMongos>()); _mockNetwork = fixedNet.get(); auto fixedExec = makeThreadPoolTestExecutor(std::move(fixedNet)); _networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork); _executor = fixedExec.get(); auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>(); - netForPool->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHook>()); + netForPool->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHookForMongos>()); auto execForPool = makeThreadPoolTestExecutor(std::move(netForPool)); std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool; executorsForPool.emplace_back(std::move(execForPool)); |