diff options
author | Spencer T Brody <spencer@mongodb.com> | 2015-09-18 15:53:41 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2015-09-24 13:36:59 -0400 |
commit | 03c414e87e5c3bc34230421163cdd06c9451389d (patch) | |
tree | 1fc24482ada7905ee380a2c0fd1c58a7fb89ed78 /src/mongo | |
parent | 3a7b1a9800f75706f35a290a8dee198bb29e3366 (diff) | |
download | mongo-03c414e87e5c3bc34230421163cdd06c9451389d.tar.gz |
SERVER-20498 Send config server optime to shards automatically on all commands via the OP_COMMAND metadata
Diffstat (limited to 'src/mongo')
35 files changed, 292 insertions, 452 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index ffa32b1c68a..d720b9da239 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -214,7 +214,7 @@ bool checkShardVersion(OperationContext* txn, return false; } - ChunkVersion requestShardVersion = request.getShardVersion().getVersion(); + ChunkVersion requestShardVersion = request.getShardVersion(); if (ChunkVersion::isIgnoredVersion(requestShardVersion)) { return true; } @@ -310,8 +310,7 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, // TODO(spencer): Remove this after 3.2 ships. OperationShardVersion& operationShardVersion = OperationShardVersion::get(_txn); if (request.hasShardVersion() && !operationShardVersion.hasShardVersion()) { - operationShardVersion.setShardVersion(request.getTargetingNSS(), - request.getShardVersion().getVersion()); + operationShardVersion.setShardVersion(request.getTargetingNSS(), request.getShardVersion()); } // diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 452670ddf23..aa13e8095d5 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -90,8 +90,7 @@ #include "mongo/rpc/request_interface.h" #include "mongo/rpc/reply_builder_interface.h" #include "mongo/rpc/metadata.h" -#include "mongo/rpc/metadata/config_server_request_metadata.h" -#include "mongo/rpc/metadata/config_server_response_metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/s/client/shard_registry.h" @@ -1262,33 +1261,20 @@ void Command::execCommand(OperationContext* txn, auto commandNS = NamespaceString(command->parseNs(dbname, request.getCommandArgs())); operationShardVersion.initializeFromCommand(commandNS, request.getCommandArgs()); - auto 
requestMetadataStatus = - rpc::ConfigServerRequestMetadata::readFromCommand(request.getCommandArgs()); - auto optime = uassertStatusOK(requestMetadataStatus).getOpTime(); - if (optime.is_initialized()) { - if (ShardingState::get(txn)->enabled()) { - // TODO(spencer): Do this unconditionally once all nodes are sharding aware - // by default. - grid.shardRegistry()->advanceConfigOpTime(optime.get()); - } else { - massert( - 28807, - "Received a command with sharding chunk information but this node is not " - "sharding aware", - command->name == "setShardVersion"); - } + auto shardingState = ShardingState::get(txn); + if (shardingState->enabled()) { + // TODO(spencer): Do this unconditionally once all nodes are sharding aware + // by default. + shardingState->updateConfigServerOpTimeFromMetadata(txn); } else { - // If there was top-level shard version information then there must have been - // config optime information as well. a 3.0 mongos won't have shard version info - // at the top level (they have it in a nested "metadata" field) so it won't cause - // a problem here. 
- massert(28818, - str::stream() - << "Received command with chunk version information but no config " - "server optime: " << request.getCommandArgs().jsonString(), - !operationShardVersion.hasShardVersion() || - ChunkVersion::isIgnoredVersion( - operationShardVersion.getShardVersion(commandNS))); + massert( + 28807, + str::stream() + << "Received a command with sharding chunk version information but this " + "node is not sharding aware: " << request.getCommandArgs().jsonString(), + !operationShardVersion.hasShardVersion() || + ChunkVersion::isIgnoredVersion( + operationShardVersion.getShardVersion(commandNS))); } } @@ -1418,7 +1404,7 @@ bool Command::run(OperationContext* txn, if (isShardingAware) { auto opTime = grid.shardRegistry()->getConfigOpTime(); - rpc::ConfigServerResponseMetadata(opTime).writeToMetadata(&metadataBob); + rpc::ConfigServerMetadata(opTime).writeToMetadata(&metadataBob); } auto cmdResponse = replyBuilderBob.done(); diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp index 55ad4f7aa4c..90eed61ff95 100644 --- a/src/mongo/db/query/lite_parsed_query.cpp +++ b/src/mongo/db/query/lite_parsed_query.cpp @@ -96,7 +96,6 @@ const char kAwaitDataField[] = "awaitData"; const char kPartialResultsField[] = "allowPartialResults"; const char kTermField[] = "term"; const char kOptionsField[] = "options"; -const char kConfigOpTimeField[] = "configsvrOpTime"; } // namespace @@ -361,8 +360,6 @@ StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeFromFindCommand(Nam } } else if (str::equals(fieldName, kShardVersionField)) { // Shard version parsing is handled elsewhere. - } else if (str::equals(fieldName, kConfigOpTimeField)) { - // Config server optime parsing is handled along with shard versioning elsewhere. 
} else if (str::equals(fieldName, kTermField)) { Status status = checkFieldType(el, NumberLong); if (!status.isOK()) { diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index 4512570f1ec..1773bc9e842 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -153,11 +153,8 @@ public: } // step 2 - ChunkVersionAndOpTime verAndOpTime = - uassertStatusOK(ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj)); - const auto& version = verAndOpTime.getVersion(); - - grid.shardRegistry()->advanceConfigOpTime(verAndOpTime.getOpTime()); + ChunkVersion version = + uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj)); // step 3 const ChunkVersion oldVersion = info->getVersion(ns); diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 26de039eed0..24e6d5ca19c 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -43,6 +43,7 @@ #include "mongo/db/s/metadata_loader.h" #include "mongo/db/s/operation_shard_version.h" #include "mongo/db/s/sharded_connection_info.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" @@ -115,7 +116,9 @@ bool isMongos() { return false; } -ShardingState::ShardingState() : _configServerTickets(kMaxConfigServerRefreshThreads) {} +ShardingState::ShardingState() + : _initializationState(InitializationState::kUninitialized), + _configServerTickets(kMaxConfigServerRefreshThreads) {} ShardingState::~ShardingState() = default; @@ -129,19 +132,19 @@ ShardingState* ShardingState::get(OperationContext* operationContext) { bool ShardingState::enabled() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _enabled; + return _initializationState == InitializationState::kInitialized; } ConnectionString 
ShardingState::getConfigServer(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_enabled); + invariant(_initializationState == InitializationState::kInitialized); return grid.shardRegistry()->getConfigServerConnectionString(); } string ShardingState::getShardName() { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_enabled); + invariant(_initializationState == InitializationState::kInitialized); return _shardName; } @@ -151,22 +154,62 @@ void ShardingState::initialize(OperationContext* txn, const string& server) { "Unable to obtain host name during sharding initialization.", !getHostName().empty()); - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_enabled) { + if (_initializationState == InitializationState::kInitialized) { // TODO: Do we need to throw exception if the config servers have changed from what we // already have in place? How do we test for that? return; } + if (_initializationState == InitializationState::kInitializing) { + while (_initializationState == InitializationState::kInitializing) { + _initializationFinishedCondition.wait(lk); + } + invariant(_initializationState == InitializationState::kInitialized); + return; + } + + invariant(_initializationState == InitializationState::kUninitialized); + _initializationState = InitializationState::kInitializing; + ShardedConnectionInfo::addHook(); ReplicaSetMonitor::setSynchronousConfigChangeHook( &ConfigServer::replicaSetChangeShardRegistryUpdateHook); + lk.unlock(); + + // Initialize sharding state outside the lock to prevent doing network traffic traffic to the + // config servers in the lock (and deadlocking dbtest). 
ConnectionString configServerCS = uassertStatusOK(ConnectionString::parse(server)); uassertStatusOK(initializeGlobalShardingState(txn, configServerCS, false)); - _enabled = true; + lk.lock(); + + invariant(_initializationState == InitializationState::kInitializing); + _updateConfigServerOpTimeFromMetadata_inlock(txn); + _initializationState = InitializationState::kInitialized; + _initializationFinishedCondition.notify_all(); +} + +void ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_initializationState == InitializationState::kInitialized); + + _updateConfigServerOpTimeFromMetadata_inlock(txn); +} + +void ShardingState::_updateConfigServerOpTimeFromMetadata_inlock(OperationContext* txn) { + if (serverGlobalParams.configsvrMode != CatalogManager::ConfigServerMode::NONE) { + // Nothing to do if we're a config server ourselves. + return; + } + + boost::optional<repl::OpTime> opTime = rpc::ConfigServerMetadata::get(txn).getOpTime(); + + if (opTime) { + grid.shardRegistry()->advanceConfigOpTime(*opTime); + } } void ShardingState::setShardName(const string& name) { @@ -500,7 +543,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, stdx::lock_guard<stdx::mutex> lk(_mutex); // We can't reload if sharding is not enabled - i.e. 
without a config server location - if (!_enabled) { + if (_initializationState != InitializationState::kInitialized) { string errMsg = str::stream() << "cannot refresh metadata for " << ns << " before sharding has been enabled"; @@ -622,7 +665,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, stdx::lock_guard<stdx::mutex> lk(_mutex); // Don't reload if our config server has changed or sharding is no longer enabled - if (!_enabled) { + if (_initializationState != InitializationState::kInitialized) { string errMsg = str::stream() << "could not refresh metadata for " << ns << ", sharding is no longer enabled"; @@ -762,8 +805,9 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { stdx::lock_guard<stdx::mutex> lk(_mutex); - builder.appendBool("enabled", _enabled); - if (!_enabled) { + bool enabled = _initializationState == InitializationState::kInitialized; + builder.appendBool("enabled", enabled); + if (!enabled) { return; } @@ -782,8 +826,10 @@ void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { versionB.done(); } -bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) const { - if (!_enabled) +bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_initializationState != InitializationState::kInitialized) return false; Client* client = txn->getClient(); diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index dbee5597ae9..59ad264f3ab 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -99,12 +99,17 @@ public: void initialize(OperationContext* txn, const std::string& server); /** + * Updates the ShardRegistry's stored notion of the config server optime based on the + * ConfigServerMetadata decoration attached to the OperationContext. 
+ */ + void updateConfigServerOpTimeFromMetadata(OperationContext* txn); + + /** * Assigns a shard name to this MongoD instance. - * * TODO: The only reason we need this method and cannot merge it together with the initialize - * call is the setShardVersion request being sent by the config coordinator to the config - * server instances. This is the only command, which does not include shard name and once we - * get rid of the legacy style config servers, we can merge these methods. + * call is the setShardVersion request being sent by the config coordinator to the config server + * instances. This is the only command, which does not include shard name and once we get rid of + * the legacy style config servers, we can merge these methods. * * Throws an error if shard name has always been set and the newly specified value does not * match what was previously installed. @@ -171,7 +176,7 @@ public: void appendInfo(OperationContext* txn, BSONObjBuilder& b); - bool needCollectionMetadata(OperationContext* txn, const std::string& ns) const; + bool needCollectionMetadata(OperationContext* txn, const std::string& ns); std::shared_ptr<CollectionMetadata> getCollectionMetadata(const std::string& ns); @@ -319,6 +324,8 @@ private: bool useRequestedVersion, ChunkVersion* latestShardVersion); + void _updateConfigServerOpTimeFromMetadata_inlock(OperationContext* txn); + // Manages the state of the migration donor shard MigrationSourceManager _migrationSourceManager; @@ -328,8 +335,16 @@ private: // Protects state below stdx::mutex _mutex; - // Whether ::initialize has been called - bool _enabled{false}; + enum class InitializationState { + kUninitialized, + kInitializing, + kInitialized, + }; + + InitializationState _initializationState; + + // Signaled when ::initialize finishes. 
+ stdx::condition_variable _initializationFinishedCondition; // Sets the shard name for this host (comes through setShardVersion) std::string _shardName; diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp index 6a084185c9b..83f583c6790 100644 --- a/src/mongo/dbtests/framework.cpp +++ b/src/mongo/dbtests/framework.cpp @@ -39,6 +39,7 @@ #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/service_context.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/dbtests/config_server_fixture.h" #include "mongo/dbtests/dbtests.h" #include "mongo/dbtests/framework_options.h" #include "mongo/s/catalog/catalog_manager.h" @@ -48,6 +49,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/version.h" namespace mongo { @@ -69,6 +71,9 @@ int runDbTests(int argc, char** argv) { auto txn = cc().makeOperationContext(); // Initialize the sharding state so we can run sharding tests in isolation + auto connectHook = stdx::make_unique<CustomConnectHook>(txn.get()); + ConnectionString::setConnectionHook(connectHook.get()); + ON_BLOCK_EXIT([] { ConnectionString::setConnectionHook(nullptr); }); ShardingState::get(txn.get())->initialize(txn.get(), "$dummy:10000"); } diff --git a/src/mongo/rpc/SConscript b/src/mongo/rpc/SConscript index c15c4c69a68..2b171fe0ef9 100644 --- a/src/mongo/rpc/SConscript +++ b/src/mongo/rpc/SConscript @@ -139,8 +139,7 @@ env.Library( source=[ 'metadata.cpp', 'metadata/audit_metadata.cpp', - 'metadata/config_server_request_metadata.cpp', - 'metadata/config_server_response_metadata.cpp', + 'metadata/config_server_metadata.cpp', 'metadata/server_selection_metadata.cpp', 'metadata/sharding_metadata.cpp', 'metadata/repl_set_metadata.cpp', @@ -195,9 +194,9 @@ env.CppUnitTest( ) env.CppUnitTest( - target='config_server_response_metadata_test', + target='config_server_metadata_test', source=[ - 
'metadata/config_server_response_metadata_test.cpp', + 'metadata/config_server_metadata_test.cpp', ], LIBDEPS=['metadata'] ) diff --git a/src/mongo/rpc/metadata.cpp b/src/mongo/rpc/metadata.cpp index 30b435e2872..f934e8afa2f 100644 --- a/src/mongo/rpc/metadata.cpp +++ b/src/mongo/rpc/metadata.cpp @@ -33,6 +33,7 @@ #include "mongo/client/dbclientinterface.h" #include "mongo/db/jsobj.h" #include "mongo/rpc/metadata/audit_metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" @@ -56,6 +57,12 @@ Status readRequestMetadata(OperationContext* txn, const BSONObj& metadataObj) { } AuditMetadata::get(txn) = std::move(swAuditMetadata.getValue()); + auto configServerMetadata = ConfigServerMetadata::readFromMetadata(metadataObj); + if (!configServerMetadata.isOK()) { + return configServerMetadata.getStatus(); + } + ConfigServerMetadata::get(txn) = std::move(configServerMetadata.getValue()); + return Status::OK(); } diff --git a/src/mongo/rpc/metadata/config_server_response_metadata.cpp b/src/mongo/rpc/metadata/config_server_metadata.cpp index 14608f8d915..5119ed05c20 100644 --- a/src/mongo/rpc/metadata/config_server_response_metadata.cpp +++ b/src/mongo/rpc/metadata/config_server_metadata.cpp @@ -26,7 +26,7 @@ * it in the license file. 
*/ -#include "mongo/rpc/metadata/config_server_response_metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/bson/util/bson_check.h" #include "mongo/bson/util/bson_extract.h" @@ -45,17 +45,19 @@ const char kOpTimeFieldName[] = "opTime"; } // unnamed namespace -ConfigServerResponseMetadata::ConfigServerResponseMetadata(OpTime opTime) - : _opTime(std::move(opTime)) {} +const OperationContext::Decoration<ConfigServerMetadata> ConfigServerMetadata::get = + OperationContext::declareDecoration<ConfigServerMetadata>(); -StatusWith<ConfigServerResponseMetadata> ConfigServerResponseMetadata::readFromMetadata( +ConfigServerMetadata::ConfigServerMetadata(OpTime opTime) : _opTime(std::move(opTime)) {} + +StatusWith<ConfigServerMetadata> ConfigServerMetadata::readFromMetadata( const BSONObj& metadataObj) { BSONElement configMetadataElement; Status status = bsonExtractTypedField(metadataObj, kRootFieldName, Object, &configMetadataElement); if (status == ErrorCodes::NoSuchKey) { - return ConfigServerResponseMetadata{}; + return ConfigServerMetadata{}; } else if (!status.isOK()) { return status; } @@ -68,13 +70,13 @@ StatusWith<ConfigServerResponseMetadata> ConfigServerResponseMetadata::readFromM return status; } - return ConfigServerResponseMetadata(std::move(opTime)); + return ConfigServerMetadata(std::move(opTime)); } -void ConfigServerResponseMetadata::writeToMetadata(BSONObjBuilder* builder) const { - invariant(_opTime.is_initialized()); +void ConfigServerMetadata::writeToMetadata(BSONObjBuilder* builder) const { + invariant(_opTime); BSONObjBuilder configMetadataBuilder(builder->subobjStart(kRootFieldName)); - _opTime.get().append(&configMetadataBuilder, kOpTimeFieldName); + _opTime->append(&configMetadataBuilder, kOpTimeFieldName); } } // namespace rpc diff --git a/src/mongo/rpc/metadata/config_server_response_metadata.h b/src/mongo/rpc/metadata/config_server_metadata.h index 7d96fdc663d..86eabfddf9b 100644 --- 
a/src/mongo/rpc/metadata/config_server_response_metadata.h +++ b/src/mongo/rpc/metadata/config_server_metadata.h @@ -28,6 +28,7 @@ #pragma once +#include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" namespace mongo { @@ -38,28 +39,31 @@ class BSONObjBuilder; namespace rpc { /** - * This class encapsulates the response that mongod will return to mongos on every - * command, containing metadata information about the config servers. + * This class encapsulates the metadata sent between shard mongods and mongos on every command + * request and response, containing metadata information about the config servers. * * format: * configsvr: { * opTime: {ts: Timestamp(0, 0), t: 0} * } */ -class ConfigServerResponseMetadata { +class ConfigServerMetadata { public: - ConfigServerResponseMetadata() = default; - explicit ConfigServerResponseMetadata(repl::OpTime opTime); + static const OperationContext::Decoration<ConfigServerMetadata> get; + + ConfigServerMetadata() = default; + explicit ConfigServerMetadata(repl::OpTime opTime); /** - * Parses the response metadata from the given metadata object. + * Parses the metadata from the given metadata object. * Returns a non-ok status on parse error. - * If no metadata is found, returns a default-constructed ConfigServerResponseMetadata. + * If no metadata is found, returns a default-constructed ConfigServerMetadata. */ - static StatusWith<ConfigServerResponseMetadata> readFromMetadata(const BSONObj& doc); + static StatusWith<ConfigServerMetadata> readFromMetadata(const BSONObj& doc); /** - * Writes the request metadata to the given BSONObjBuilder for building a command request. + * Writes the metadata to the given BSONObjBuilder for building a command request or response + * metadata. * Only valid to call if _opTime is initialized. 
*/ void writeToMetadata(BSONObjBuilder* builder) const; diff --git a/src/mongo/rpc/metadata/config_server_response_metadata_test.cpp b/src/mongo/rpc/metadata/config_server_metadata_test.cpp index 7e70749dc25..7f0a66a12d7 100644 --- a/src/mongo/rpc/metadata/config_server_response_metadata_test.cpp +++ b/src/mongo/rpc/metadata/config_server_metadata_test.cpp @@ -27,7 +27,7 @@ */ #include "mongo/db/jsobj.h" -#include "mongo/rpc/metadata/config_server_response_metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -38,7 +38,7 @@ using repl::OpTime; TEST(ConfigSvrMetadataTest, Roundtrip) { OpTime opTime(Timestamp(1234, 100), 5); - ConfigServerResponseMetadata metadata(opTime); + ConfigServerMetadata metadata(opTime); ASSERT_EQ(opTime, metadata.getOpTime().get()); @@ -52,7 +52,7 @@ TEST(ConfigSvrMetadataTest, Roundtrip) { BSONObj serializedObj = builder.obj(); ASSERT_EQ(expectedObj, serializedObj); - auto cloneStatus = ConfigServerResponseMetadata::readFromMetadata(serializedObj); + auto cloneStatus = ConfigServerMetadata::readFromMetadata(serializedObj); ASSERT_OK(cloneStatus.getStatus()); const auto& clonedMetadata = cloneStatus.getValue(); diff --git a/src/mongo/rpc/metadata/config_server_request_metadata.cpp b/src/mongo/rpc/metadata/config_server_request_metadata.cpp deleted file mode 100644 index 0bdad35037f..00000000000 --- a/src/mongo/rpc/metadata/config_server_request_metadata.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/rpc/metadata/config_server_request_metadata.h" - -#include "mongo/bson/util/bson_extract.h" -#include "mongo/db/jsobj.h" -#include "mongo/rpc/metadata.h" - -namespace mongo { -namespace rpc { - -using repl::OpTime; - -namespace { -const char kConfigsvrOpTimeFieldName[] = "configsvrOpTime"; -} // namespace - -ConfigServerRequestMetadata::ConfigServerRequestMetadata(OpTime opTime) - : _opTime(std::move(opTime)) {} - -StatusWith<ConfigServerRequestMetadata> ConfigServerRequestMetadata::readFromCommand( - const BSONObj& cmdObj) { - repl::OpTime opTime; - Status status = bsonExtractOpTimeField(cmdObj, kConfigsvrOpTimeFieldName, &opTime); - if (status == ErrorCodes::NoSuchKey) { - return ConfigServerRequestMetadata{}; - } else if (!status.isOK()) { - return status; - } - return ConfigServerRequestMetadata(opTime); -} - -void ConfigServerRequestMetadata::writeToCommand(BSONObjBuilder* builder) const { - invariant(_opTime.is_initialized()); - _opTime->append(builder, kConfigsvrOpTimeFieldName); -} - -} // namespace rpc -} // namespace mongo diff --git a/src/mongo/rpc/metadata/config_server_request_metadata.h b/src/mongo/rpc/metadata/config_server_request_metadata.h deleted file mode 100644 index 33d74d7f64d..00000000000 --- a/src/mongo/rpc/metadata/config_server_request_metadata.h +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. 
If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/optional.hpp> - -#include "mongo/db/repl/optime.h" - -namespace mongo { - -class BSONObj; -template <typename T> -class StatusWith; - -namespace rpc { - -/** - * This class encapsulates the extra information that mongos may attach to commands it sends to - * mongods, containing metadata information about the config servers. - * - * format: - * configsvrOpTime: {ts: Timestamp(0, 0), t: 0} - * - * TODO(SERVER-20442): Currently this extracts the config server information from the main command - * description rather than the actual OP_COMMAND metadata section. Ideally this information - * should be in the metadata, but we currently have no good way to add metadata to all commands - * being *sent* to another server. - */ -class ConfigServerRequestMetadata { -public: - ConfigServerRequestMetadata() = default; - explicit ConfigServerRequestMetadata(repl::OpTime opTime); - - /** - * Parses the request metadata from the given command object. - * Returns a non-ok status on parse error. - * If no metadata is found, returns a default-constructed ConfigServerRequestMetadata. 
- */ - static StatusWith<ConfigServerRequestMetadata> readFromCommand(const BSONObj& doc); - - /** - * Writes the request metadata to the given BSONObjBuilder for building a command request. - * Only valid to call if _opTime is initialized. - */ - void writeToCommand(BSONObjBuilder* builder) const; - - /** - * Returns the OpTime of the most recent operation on the config servers that this - * shard has seen. - */ - boost::optional<repl::OpTime> getOpTime() const { - return _opTime; - } - -private: - const boost::optional<repl::OpTime> _opTime = boost::none; -}; - -} // namespace rpc -} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 26e39e9306b..1d9596f4f8e 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -628,7 +628,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa shardEntry.getName(), fassertStatusOK(28753, ConnectionString::parse(shardEntry.getHost())), ns, - ChunkVersionAndOpTime(ChunkVersion::DROPPED()), + ChunkVersion::DROPPED(), true); auto ssvResult = shardRegistry->runCommandWithNotMasterRetries( diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index c20b7075b14..1c334e80b98 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -216,7 +216,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, dbPrimaryShardId, primaryShard->getConnString(), NamespaceString(ns), - ChunkVersionAndOpTime(manager->getVersion(), manager->getConfigOpTime()), + manager->getVersion(), true); auto ssvStatus = grid.shardRegistry()->runCommandWithNotMasterRetries( @@ -549,19 +549,13 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam LOG(1) << 
"dropCollection " << ns << " collection marked as dropped"; - // We just called updateCollection above and this would have advanced the config op time, so use - // the latest value. On the MongoD side, we need to load the latest config metadata, which - // indicates that the collection was dropped. - const ChunkVersionAndOpTime droppedVersion(ChunkVersion::DROPPED(), - grid.shardRegistry()->getConfigOpTime()); - for (const auto& shardEntry : allShards) { SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( grid.shardRegistry()->getConfigServerConnectionString(), shardEntry.getName(), fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), ns, - droppedVersion, + ChunkVersion::DROPPED(), true); auto ssvResult = shardRegistry->runCommandWithNotMasterRetries( diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 320ba5a4fb8..5993b4e68af 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -320,8 +320,7 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* txn, << "; no metadata found"); } - *endpoint = - new ShardEndpoint(_primary->getId(), ChunkVersionAndOpTime(ChunkVersion::UNSHARDED())); + *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED()); return Status::OK(); } } @@ -497,10 +496,7 @@ Status ChunkManagerTargeter::targetQuery(const BSONObj& query, for (const ShardId& shardId : shardIds) { endpoints->push_back(new ShardEndpoint( - shardId, - _manager - ? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime()) - : ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()))); + shardId, _manager ? 
_manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -520,9 +516,7 @@ Status ChunkManagerTargeter::targetShardKey(OperationContext* txn, _stats.chunkSizeDelta[chunk->getMin()] += estDataSize; } - *endpoint = new ShardEndpoint(chunk->getShardId(), - ChunkVersionAndOpTime(_manager->getVersion(chunk->getShardId()), - _manager->getConfigOpTime())); + *endpoint = new ShardEndpoint(chunk->getShardId(), _manager->getVersion(chunk->getShardId())); return Status::OK(); } @@ -543,10 +537,7 @@ Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints) for (const ShardId& shardId : shardIds) { endpoints->push_back(new ShardEndpoint( - shardId, - _manager - ? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime()) - : ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()))); + shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); } return Status::OK(); @@ -564,10 +555,7 @@ Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints) for (const ShardId& shardId : shardIds) { endpoints->push_back(new ShardEndpoint( - shardId, - _manager - ? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime()) - : ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()))); + shardId, _manager ? 
_manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); } return Status::OK(); diff --git a/src/mongo/s/chunk_version.cpp b/src/mongo/s/chunk_version.cpp index 36f85b2e1b5..2006071760c 100644 --- a/src/mongo/s/chunk_version.cpp +++ b/src/mongo/s/chunk_version.cpp @@ -34,7 +34,6 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/base/status_with.h" #include "mongo/bson/util/bson_extract.h" -#include "mongo/rpc/metadata/config_server_request_metadata.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -97,61 +96,12 @@ StatusWith<ChunkVersion> ChunkVersion::parseFromBSONForSetShardVersion(const BSO return chunkVersion; } - -ChunkVersionAndOpTime::ChunkVersionAndOpTime(ChunkVersion chunkVersion) - : _verAndOpT(chunkVersion) {} - -ChunkVersionAndOpTime::ChunkVersionAndOpTime(ChunkVersion chunkVersion, repl::OpTime ts) - : _verAndOpT(chunkVersion, ts) {} - -StatusWith<ChunkVersionAndOpTime> ChunkVersionAndOpTime::parseFromBSONForCommands( - const BSONObj& obj) { - const auto chunkVersionStatus = ChunkVersion::parseFromBSONForCommands(obj); - if (!chunkVersionStatus.isOK()) - return chunkVersionStatus.getStatus(); - - const ChunkVersion& chunkVersion = chunkVersionStatus.getValue(); - - const auto requestMetadataStatus = rpc::ConfigServerRequestMetadata::readFromCommand(obj); - if (!requestMetadataStatus.isOK()) { - return requestMetadataStatus.getStatus(); - } - auto opTime = requestMetadataStatus.getValue().getOpTime(); - if (opTime.is_initialized()) { - return ChunkVersionAndOpTime(chunkVersion, opTime.get()); - } else { - return ChunkVersionAndOpTime(chunkVersion); - } -} - -StatusWith<ChunkVersionAndOpTime> ChunkVersionAndOpTime::parseFromBSONForSetShardVersion( - const BSONObj& obj) { - const auto chunkVersionStatus = ChunkVersion::parseFromBSONForSetShardVersion(obj); - if (!chunkVersionStatus.isOK()) - return chunkVersionStatus.getStatus(); - - const ChunkVersion& chunkVersion = chunkVersionStatus.getValue(); - - const auto 
requestMetadataStatus = rpc::ConfigServerRequestMetadata::readFromCommand(obj); - if (!requestMetadataStatus.isOK()) { - return requestMetadataStatus.getStatus(); - } - auto opTime = requestMetadataStatus.getValue().getOpTime(); - if (opTime.is_initialized()) { - return ChunkVersionAndOpTime(chunkVersion, opTime.get()); - } else { - return ChunkVersionAndOpTime(chunkVersion); - } -} - -void ChunkVersionAndOpTime::appendForSetShardVersion(BSONObjBuilder* builder) const { - _verAndOpT.value.addToBSON(*builder, kVersion); - rpc::ConfigServerRequestMetadata(_verAndOpT.opTime).writeToCommand(builder); +void ChunkVersion::appendForSetShardVersion(BSONObjBuilder* builder) const { + addToBSON(*builder, kVersion); } -void ChunkVersionAndOpTime::appendForCommands(BSONObjBuilder* builder) const { - builder->appendArray(kShardVersion, _verAndOpT.value.toBSON()); - rpc::ConfigServerRequestMetadata(_verAndOpT.opTime).writeToCommand(builder); +void ChunkVersion::appendForCommands(BSONObjBuilder* builder) const { + builder->appendArray(kShardVersion, toBSON()); } } // namespace mongo diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h index 25034d4c5f4..fc2f637be10 100644 --- a/src/mongo/s/chunk_version.h +++ b/src/mongo/s/chunk_version.h @@ -372,6 +372,17 @@ public: b.appendElements(toBSONWithPrefix(prefix)); } + /** + * Appends the contents to the specified builder in the format expected by the setShardVersion + * command. + */ + void appendForSetShardVersion(BSONObjBuilder* builder) const; + + /** + * Appends the contents to the specified builder in the format expected by the write commands. + */ + void appendForCommands(BSONObjBuilder* builder) const; + BSONObj toBSON() const { // ChunkVersion wants to be an array. BSONArrayBuilder b; @@ -404,51 +415,4 @@ inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) { return s; } - -/** - * Represents a chunk version along with the optime from when it was retrieved. 
Provides logic to - * serialize and deserialize the combo to BSON. - */ -class ChunkVersionAndOpTime { -public: - ChunkVersionAndOpTime(ChunkVersion chunkVersion); - ChunkVersionAndOpTime(ChunkVersion chunkVersion, repl::OpTime ts); - - const ChunkVersion& getVersion() const { - return _verAndOpT.value; - } - - const repl::OpTime& getOpTime() const { - return _verAndOpT.opTime; - } - - /** - * Interprets the contents of the BSON documents as having been constructed in the format for - * write commands. The optime component is optional for backwards compatibility and if not - * present, the optime will be default initialized. - */ - static StatusWith<ChunkVersionAndOpTime> parseFromBSONForCommands(const BSONObj& obj); - - /** - * Interprets the contents of the BSON document as having been constructed in the format for the - * setShardVersion command. The optime component is optional for backwards compatibility and if - * not present, the optime will be default initialized. - */ - static StatusWith<ChunkVersionAndOpTime> parseFromBSONForSetShardVersion(const BSONObj& obj); - - /** - * Appends the contents to the specified builder in the format expected by the setShardVersion - * command. - */ - void appendForSetShardVersion(BSONObjBuilder* builder) const; - - /** - * Appends the contents to the specified builder in the format expected by the write commands. 
- */ - void appendForCommands(BSONObjBuilder* builder) const; - -private: - OpTimePair<ChunkVersion> _verAndOpT; -}; - } // namespace mongo diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 2fd476bc7de..91a6aaa09d2 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -42,7 +42,7 @@ #include "mongo/db/client.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/rpc/metadata/config_server_response_metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_shard.h" @@ -287,6 +287,11 @@ void ShardRegistry::_updateLookupMapsForShard_inlock(shared_ptr<Shard> shard, if (newConnString.type() == ConnectionString::SET) { _rsLookup[newConnString.getSetName()] = shard; + } else if (newConnString.type() == ConnectionString::CUSTOM) { + // CUSTOM connection strings (i.e. "$dummy:10000") become DBDirectClient connections which + // always return "localhost" as their response to getServerAddress(). This is just for + // making dbtest work. 
+ _lookup["localhost"] = shard; } // TODO: The only reason to have the shard host names in the lookup table is for the @@ -425,7 +430,7 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::exhaustiveFindOnConfigNo } Status ShardRegistry::_advanceConfigOpTimeFromMetadata(const BSONObj& metadata) { - auto configMetadata = rpc::ConfigServerResponseMetadata::readFromMetadata(metadata); + auto configMetadata = rpc::ConfigServerMetadata::readFromMetadata(metadata); if (!configMetadata.isOK()) { return configMetadata.getStatus(); } diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp index 7a178055353..24cd459db27 100644 --- a/src/mongo/s/client/sharding_connection_hook.cpp +++ b/src/mongo/s/client/sharding_connection_hook.cpp @@ -42,7 +42,7 @@ #include "mongo/db/client.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/audit_metadata.h" -#include "mongo/rpc/metadata/config_server_response_metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/s/client/scc_fast_query_handler.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" @@ -55,6 +55,12 @@ namespace mongo { using std::string; namespace { + +// A hook that parses the reply metadata from every response to a command sent from a DBClient +// created by mongos or a sharding aware mongod and being used for sharded operations. +// Used by mongos to capture the GLE stats so that we can target the correct node when subsequent +// getLastError calls are made, as well as by both mongod and mongos to update the stored config +// server optime. 
Status _shardingReplyMetadataReader(const BSONObj& metadataObj, StringData hostString) { saveGLEStats(metadataObj, hostString); @@ -64,7 +70,7 @@ Status _shardingReplyMetadataReader(const BSONObj& metadataObj, StringData hostS } // If this host is a known shard of ours, look for a config server optime in the response // metadata to use to update our notion of the current config server optime. - auto responseStatus = rpc::ConfigServerResponseMetadata::readFromMetadata(metadataObj); + auto responseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj); if (!responseStatus.isOK()) { return responseStatus.getStatus(); } @@ -74,6 +80,32 @@ Status _shardingReplyMetadataReader(const BSONObj& metadataObj, StringData hostS } return Status::OK(); } + +// A hook that will append impersonated users to the metadata of every runCommand run by a DBClient +// created by mongos or a sharding aware mongod. mongos uses this information to send information +// to mongod so that the mongod can produce auditing records attributed to the proper authenticated +// user(s). +// Additionally, if the connection is sharding-aware, also appends the stored config server optime. +Status _shardingRequestMetadataWriter(bool shardedConn, + BSONObjBuilder* metadataBob, + StringData hostStringData) { + audit::writeImpersonatedUsersToMetadata(metadataBob); + if (!shardedConn) { + return Status::OK(); + } + + // Add config server optime to metadata sent to shards. 
+ std::string hostString = hostStringData.toString(); + auto shard = grid.shardRegistry()->getShardNoReload(hostString); + invariant(shard); + if (shard->isConfig()) { + return Status::OK(); + } + rpc::ConfigServerMetadata(grid.shardRegistry()->getConfigOpTime()).writeToMetadata(metadataBob); + + return Status::OK(); +} + } // namespace ShardingConnectionHook::ShardingConnectionHook(bool shardedConnections) @@ -93,19 +125,13 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { } if (_shardedConnections) { - // For every DBClient created by mongos, add a hook that will capture the response from - // commands we pass along from the client, so that we can target the correct node when - // subsequent getLastError calls are made by mongos. conn->setReplyMetadataReader(_shardingReplyMetadataReader); } - // For every DBClient created by mongos, add a hook that will append impersonated users - // to the end of every runCommand. mongod uses this information to produce auditing - // records attributed to the proper authenticated user(s). - conn->setRequestMetadataWriter([](BSONObjBuilder* metadataBob, StringData) -> Status { - audit::writeImpersonatedUsersToMetadata(metadataBob); - return Status::OK(); - }); + conn->setRequestMetadataWriter( + [this](BSONObjBuilder* metadataBob, StringData hostStringData) -> Status { + return _shardingRequestMetadataWriter(_shardedConnections, metadataBob, hostStringData); + }); // For every SCC created, add a hook that will allow fastest-config-first config reads if // the appropriate server options are set. 
diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index 79c6d517445..44bc5be18c2 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -211,11 +211,8 @@ private: inline void assertEndpointsEqual(const ShardEndpoint& endpointA, const ShardEndpoint& endpointB) { ASSERT_EQUALS(endpointA.shardName, endpointB.shardName); - ASSERT_EQUALS(endpointA.shardVersion.getVersion().toLong(), - endpointB.shardVersion.getVersion().toLong()); - ASSERT_EQUALS(endpointA.shardVersion.getVersion().epoch(), - endpointB.shardVersion.getVersion().epoch()); - ASSERT_EQUALS(endpointA.shardVersion.getOpTime(), endpointB.shardVersion.getOpTime()); + ASSERT_EQUALS(endpointA.shardVersion.toLong(), endpointB.shardVersion.toLong()); + ASSERT_EQUALS(endpointA.shardVersion.epoch(), endpointB.shardVersion.epoch()); } } // namespace mongo diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 1e6182b67b7..56dfefc290b 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -157,11 +157,11 @@ struct ShardEndpoint { ShardEndpoint(const ShardEndpoint& other) : shardName(other.shardName), shardVersion(other.shardVersion) {} - ShardEndpoint(const std::string& shardName, const ChunkVersionAndOpTime& shardVersion) + ShardEndpoint(const std::string& shardName, const ChunkVersion& shardVersion) : shardName(shardName), shardVersion(shardVersion) {} const std::string shardName; - const ChunkVersionAndOpTime shardVersion; + const ChunkVersion shardVersion; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index ec59ddd9aad..a30b9a55655 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -259,12 +259,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, lpqToForward->asFindCommand(&cmdBuilder); if (chunkManager) { - ChunkVersionAndOpTime versionAndOpTime(chunkManager->getVersion(shard->getId()), - 
chunkManager->getConfigOpTime()); - versionAndOpTime.appendForCommands(&cmdBuilder); + ChunkVersion version(chunkManager->getVersion(shard->getId())); + version.appendForCommands(&cmdBuilder); } else if (!query.nss().isOnInternalDb()) { - ChunkVersionAndOpTime versionAndOpTime(ChunkVersion::UNSHARDED()); - versionAndOpTime.appendForCommands(&cmdBuilder); + ChunkVersion version(ChunkVersion::UNSHARDED()); + version.appendForCommands(&cmdBuilder); } params.remotes.emplace_back( diff --git a/src/mongo/s/set_shard_version_request.cpp b/src/mongo/s/set_shard_version_request.cpp index 90bb3f9fb00..b99fcc06089 100644 --- a/src/mongo/s/set_shard_version_request.cpp +++ b/src/mongo/s/set_shard_version_request.cpp @@ -63,7 +63,7 @@ SetShardVersionRequest::SetShardVersionRequest(ConnectionString configServer, std::string shardName, ConnectionString shardConnectionString, NamespaceString nss, - ChunkVersionAndOpTime version, + ChunkVersion version, bool isAuthoritative) : _init(false), _isAuthoritative(isAuthoritative), @@ -96,7 +96,7 @@ SetShardVersionRequest SetShardVersionRequest::makeForVersioning( const std::string& shardName, const ConnectionString& shardConnectionString, const NamespaceString& nss, - const ChunkVersionAndOpTime& nssVersion, + const ChunkVersion& nssVersion, bool isAuthoritative) { return SetShardVersionRequest( configServer, shardName, shardConnectionString, nss, nssVersion, isAuthoritative); @@ -107,7 +107,7 @@ SetShardVersionRequest SetShardVersionRequest::makeForVersioningNoPersist( const std::string& shardName, const ConnectionString& shard, const NamespaceString& nss, - const ChunkVersionAndOpTime& nssVersion, + const ChunkVersion& nssVersion, bool isAuthoritative) { auto ssv = makeForVersioning(configServer, shardName, shard, nss, nssVersion, isAuthoritative); ssv._noConnectionVersioning = true; @@ -193,7 +193,7 @@ StatusWith<SetShardVersionRequest> SetShardVersionRequest::parseFromBSON(const B } { - auto versionStatus = 
ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj); + auto versionStatus = ChunkVersion::parseFromBSONForSetShardVersion(cmdObj); if (!versionStatus.isOK()) return versionStatus.getStatus(); @@ -231,7 +231,7 @@ const NamespaceString& SetShardVersionRequest::getNS() const { const ChunkVersion SetShardVersionRequest::getNSVersion() const { invariant(!_init); - return _version.get().getVersion(); + return _version.get(); } } // namespace mongo diff --git a/src/mongo/s/set_shard_version_request.h b/src/mongo/s/set_shard_version_request.h index aeb6e6ef267..af86db021e9 100644 --- a/src/mongo/s/set_shard_version_request.h +++ b/src/mongo/s/set_shard_version_request.h @@ -84,7 +84,7 @@ public: const std::string& shardName, const ConnectionString& shard, const NamespaceString& nss, - const ChunkVersionAndOpTime& nssVersion, + const ChunkVersion& nssVersion, bool isAuthoritative); /** @@ -95,13 +95,12 @@ public: * with operations that do per-operation versioning, and do not depend on the connection being * marked as sharded. */ - static SetShardVersionRequest makeForVersioningNoPersist( - const ConnectionString& configServer, - const std::string& shardName, - const ConnectionString& shard, - const NamespaceString& nss, - const ChunkVersionAndOpTime& nssVersion, - bool isAuthoritative); + static SetShardVersionRequest makeForVersioningNoPersist(const ConnectionString& configServer, + const std::string& shardName, + const ConnectionString& shard, + const NamespaceString& nss, + const ChunkVersion& nssVersion, + bool isAuthoritative); /** * Parses an SSV request from a set shard version command. 
@@ -172,7 +171,7 @@ private: std::string shardName, ConnectionString shardConnectionString, NamespaceString nss, - ChunkVersionAndOpTime version, + ChunkVersion version, bool isAuthoritative); SetShardVersionRequest(); @@ -188,7 +187,7 @@ private: // These values are only set if _init is false boost::optional<NamespaceString> _nss; - boost::optional<ChunkVersionAndOpTime> _version; + boost::optional<ChunkVersion> _version; }; } // namespace mongo diff --git a/src/mongo/s/set_shard_version_request_test.cpp b/src/mongo/s/set_shard_version_request_test.cpp index c29f749aa64..0a001583d89 100644 --- a/src/mongo/s/set_shard_version_request_test.cpp +++ b/src/mongo/s/set_shard_version_request_test.cpp @@ -253,8 +253,7 @@ TEST(SetShardVersionRequest, ToSSVCommandInitNoConnectionVersioning) { } TEST(SetShardVersionRequest, ToSSVCommandFull) { - const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()), - repl::OpTime(Timestamp(10), 20LL)); + const ChunkVersion chunkVersion(1, 2, OID::gen()); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning( configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, false); @@ -267,7 +266,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFull) { ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); ASSERT_EQ(ssv.getNS().ns(), "db.coll"); ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"), - chunkVersion.getVersion().toBSONWithPrefix("version")); + chunkVersion.toBSONWithPrefix("version")); ASSERT_EQ(ssv.toBSON(), BSON("setShardVersion" @@ -276,14 +275,11 @@ TEST(SetShardVersionRequest, ToSSVCommandFull) { << configCS.toString() << "shard" << "TestShard" << "shardHost" << shardCS.toString() << "version" - << Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch" - << chunkVersion.getVersion().epoch() << "configsvrOpTime" - << BSON("ts" << Timestamp(10) << "t" << 20LL))); + << Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch())); } 
TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) { - const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()), - repl::OpTime(Timestamp(10), 20LL)); + const ChunkVersion chunkVersion(1, 2, OID::gen()); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning( configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, true); @@ -296,7 +292,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) { ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); ASSERT_EQ(ssv.getNS().ns(), "db.coll"); ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"), - chunkVersion.getVersion().toBSONWithPrefix("version")); + chunkVersion.toBSONWithPrefix("version")); ASSERT_EQ(ssv.toBSON(), BSON("setShardVersion" @@ -305,14 +301,11 @@ TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) { << configCS.toString() << "shard" << "TestShard" << "shardHost" << shardCS.toString() << "version" - << Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch" - << chunkVersion.getVersion().epoch() << "configsvrOpTime" - << BSON("ts" << Timestamp(10) << "t" << 20LL))); + << Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch())); } TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) { - const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()), - repl::OpTime(Timestamp(10), 20LL)); + const ChunkVersion chunkVersion(1, 2, OID::gen()); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, true); @@ -325,7 +318,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) { ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); ASSERT_EQ(ssv.getNS().ns(), "db.coll"); ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"), - chunkVersion.getVersion().toBSONWithPrefix("version")); + chunkVersion.toBSONWithPrefix("version")); 
ASSERT_EQ(ssv.toBSON(), BSON("setShardVersion" @@ -334,10 +327,8 @@ TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) { << configCS.toString() << "shard" << "TestShard" << "shardHost" << shardCS.toString() << "version" - << Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch" - << chunkVersion.getVersion().epoch() << "configsvrOpTime" - << BSON("ts" << Timestamp(10) << "t" << 20LL) << "noConnectionVersioning" - << true)); + << Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch() + << "noConnectionVersioning" << true)); } } // namespace diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index dc298243a42..1cdfa7373ce 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -43,7 +43,9 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/metadata_hook.h" +#include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/s/catalog/forwarding_catalog_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" @@ -71,9 +73,20 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeTaskExecutor(std::unique_ptr<Network // Same logic as sharding_connection_hook.cpp. class ShardingEgressMetadataHook final : public rpc::EgressMetadataHook { public: - Status writeRequestMetadata(const HostAndPort&, BSONObjBuilder* metadataBob) override { + Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override { try { audit::writeImpersonatedUsersToMetadata(metadataBob); + + // Add config server optime to metadata sent to shards. 
+ std::string targetStr = target.toString(); + auto shard = grid.shardRegistry()->getShardNoReload(targetStr); + invariant(shard); + if (shard->isConfig()) { + return Status::OK(); + } + rpc::ConfigServerMetadata(grid.shardRegistry()->getConfigOpTime()) + .writeToMetadata(metadataBob); + return Status::OK(); } catch (...) { return exceptionToStatus(); @@ -83,6 +96,21 @@ public: Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override { try { saveGLEStats(metadataObj, replySource.toString()); + + auto shard = grid.shardRegistry()->getShardNoReload(replySource.toString()); + if (!shard) { + return Status::OK(); + } + // If this host is a known shard of ours, look for a config server optime in the + // response metadata to use to update our notion of the current config server optime. + auto responseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj); + if (!responseStatus.isOK()) { + return responseStatus.getStatus(); + } + auto opTime = responseStatus.getValue().getOpTime(); + if (opTime.is_initialized()) { + grid.shardRegistry()->advanceConfigOpTime(opTime.get()); + } return Status::OK(); } catch (...) { return exceptionToStatus(); @@ -131,6 +159,10 @@ Status initializeGlobalShardingState(OperationContext* txn, return status; } + if (serverGlobalParams.configsvrMode == CatalogManager::ConfigServerMode::NONE) { + grid.shardRegistry()->reload(txn); + } + return Status::OK(); } diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index 8aa785b6d20..99b2d175d08 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -131,12 +131,8 @@ bool setShardVersion(OperationContext* txn, SetShardVersionRequest::makeForInit(configServer, shardId, shardCS); cmd = ssv.toBSON(); } else { - const ChunkVersionAndOpTime verAndOpT = manager - ? 
ChunkVersionAndOpTime(version, manager->getConfigOpTime()) - : ChunkVersionAndOpTime(version); - SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning( - configServer, shardId, shardCS, NamespaceString(ns), verAndOpT, authoritative); + configServer, shardId, shardCS, NamespaceString(ns), version, authoritative); cmd = ssv.toBSON(); } diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index d67537714b0..3acdabb664c 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -87,13 +87,11 @@ static int compareEndpoints(const ShardEndpoint* endpointA, const ShardEndpoint* if (shardNameDiff != 0) return shardNameDiff; - long shardVersionDiff = endpointA->shardVersion.getVersion().toLong() - - endpointB->shardVersion.getVersion().toLong(); + long shardVersionDiff = endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong(); if (shardVersionDiff != 0) return shardVersionDiff; - int shardEpochDiff = endpointA->shardVersion.getVersion().epoch().compare( - endpointB->shardVersion.getVersion().epoch()); + int shardEpochDiff = endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()); return shardEpochDiff; } diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp index 914d01b444f..803b0ae6739 100644 --- a/src/mongo/s/write_ops/batched_command_request.cpp +++ b/src/mongo/s/write_ops/batched_command_request.cpp @@ -256,15 +256,15 @@ bool BatchedCommandRequest::parseBSON(StringData dbName, return false; } - auto verAndOpT = ChunkVersionAndOpTime::parseFromBSONForCommands(metadataObj); - if (verAndOpT.isOK()) { - _shardVersion = verAndOpT.getValue(); + auto chunkVersion = ChunkVersion::parseFromBSONForCommands(metadataObj); + if (chunkVersion.isOK()) { + _shardVersion = chunkVersion.getValue(); return true; - } else if ((verAndOpT == ErrorCodes::NoSuchKey) && !required) { + } else if 
((chunkVersion == ErrorCodes::NoSuchKey) && !required) { return true; } - *errMsg = causedBy(verAndOpT.getStatus()); + *errMsg = causedBy(chunkVersion.getStatus()); return false; } diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index 1fc3cc6d273..1d4d51d8405 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -131,7 +131,7 @@ public: bool isOrderedSet() const; bool getOrdered() const; - void setShardVersion(ChunkVersionAndOpTime shardVersion) { + void setShardVersion(ChunkVersion shardVersion) { _shardVersion = std::move(shardVersion); } @@ -139,7 +139,7 @@ public: return _shardVersion.is_initialized(); } - const ChunkVersionAndOpTime& getShardVersion() const { + const ChunkVersion& getShardVersion() const { return _shardVersion.get(); } @@ -184,7 +184,7 @@ public: private: BatchType _batchType; - boost::optional<ChunkVersionAndOpTime> _shardVersion; + boost::optional<ChunkVersion> _shardVersion; std::unique_ptr<BatchedInsertRequest> _insertReq; std::unique_ptr<BatchedUpdateRequest> _updateReq; diff --git a/src/mongo/s/write_ops/batched_command_request_test.cpp b/src/mongo/s/write_ops/batched_command_request_test.cpp index d0ce524d3bc..192d066d449 100644 --- a/src/mongo/s/write_ops/batched_command_request_test.cpp +++ b/src/mongo/s/write_ops/batched_command_request_test.cpp @@ -70,9 +70,7 @@ TEST(BatchedCommandRequest, InsertWithShardVersion) { ASSERT_EQ("TestDB.test", insertRequest.getInsertRequest()->getNS().toString()); ASSERT(insertRequest.hasShardVersion()); - ASSERT_EQ(ChunkVersion(1, 2, epoch).toString(), - insertRequest.getShardVersion().getVersion().toString()); - ASSERT_EQ(repl::OpTime(Timestamp(3, 4), 5), insertRequest.getShardVersion().getOpTime()); + ASSERT_EQ(ChunkVersion(1, 2, epoch).toString(), insertRequest.getShardVersion().toString()); } TEST(BatchedCommandRequest, InsertWithShardVersionInLegacyMetadata) { @@ -96,9 
+94,7 @@ TEST(BatchedCommandRequest, InsertWithShardVersionInLegacyMetadata) { ASSERT_EQ("TestDB.test", insertRequest.getInsertRequest()->getNS().toString()); ASSERT(insertRequest.hasShardVersion()); - ASSERT_EQ(ChunkVersion(1, 2, epoch).toString(), - insertRequest.getShardVersion().getVersion().toString()); - ASSERT_EQ(repl::OpTime(Timestamp(3, 4), 5), insertRequest.getShardVersion().getOpTime()); + ASSERT_EQ(ChunkVersion(1, 2, epoch).toString(), insertRequest.getShardVersion().toString()); } TEST(BatchedCommandRequest, InsertClone) { diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 8cd20fa4865..25f9f13b3aa 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -127,8 +127,7 @@ Status WriteOp::targetWrites(OperationContext* txn, if (endpoints.size() == 1u) { targetedWrites->push_back(new TargetedWrite(*endpoint, ref)); } else { - ShardEndpoint broadcastEndpoint(endpoint->shardName, - ChunkVersionAndOpTime(ChunkVersion::IGNORED())); + ShardEndpoint broadcastEndpoint(endpoint->shardName, ChunkVersion::IGNORED()); targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref)); } diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index 233aa824337..e488c068e47 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -218,11 +218,11 @@ TEST(WriteOpTests, TargetMultiAllShards) { ASSERT_EQUALS(targeted.size(), 3u); sortByEndpoint(&targeted); ASSERT_EQUALS(targeted[0]->endpoint.shardName, endpointA.shardName); - ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion.getVersion())); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion)); ASSERT_EQUALS(targeted[1]->endpoint.shardName, endpointB.shardName); - ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion.getVersion())); + 
ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion)); ASSERT_EQUALS(targeted[2]->endpoint.shardName, endpointC.shardName); - ASSERT(ChunkVersion::isIgnoredVersion(targeted[2]->endpoint.shardVersion.getVersion())); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[2]->endpoint.shardVersion)); writeOp.noteWriteComplete(*targeted[0]); writeOp.noteWriteComplete(*targeted[1]); |