diff options
author | Randolph Tan <randolph@10gen.com> | 2016-04-11 13:03:02 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2016-04-20 16:45:23 -0400 |
commit | 879d562c1fee80e84d6705efba7e47e714614505 (patch) | |
tree | d5f2e4d5ad7eab1fa1b2b0a64b7c880742379278 /src | |
parent | 0e45dbdbfda0ff381308b37d75235cad1da3db54 (diff) | |
download | mongo-879d562c1fee80e84d6705efba7e47e714614505.tar.gz |
SERVER-22647 opObserver on shard mongod to initialize sharding state on insert to admin.system.version
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 85 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 22 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state_test.cpp | 154 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity.h | 11 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity_test.cpp | 28 | ||||
-rw-r--r-- | src/mongo/s/catalog/catalog_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/catalog_manager_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/catalog_manager_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.h | 2 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 56 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.h | 12 | ||||
-rw-r--r-- | src/mongo/s/sharding_test_fixture.cpp | 2 | ||||
-rw-r--r-- | src/mongo/util/time_support.h | 4 |
17 files changed, 315 insertions, 179 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index eaed991ad25..2a7299b3b25 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -42,11 +42,37 @@ #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/type_shard_identity.h" #include "mongo/s/chunk_version.h" #include "mongo/s/stale_exception.h" namespace mongo { +namespace { + +/** + * Used to perform shard identity initialization once it is certain that the document is committed. + */ +class ShardIdentityLopOpHandler final : public RecoveryUnit::Change { +public: + ShardIdentityLopOpHandler(OperationContext* txn, ShardIdentityType shardIdentity) + : _txn(txn), _shardIdentity(std::move(shardIdentity)) {} + + void commit() override { + fassertNoTrace( + 40071, + ShardingState::get(_txn)->initializeFromShardIdentity(_shardIdentity, Date_t::max())); + } + + void rollback() override {} + +private: + OperationContext* _txn; + const ShardIdentityType _shardIdentity; +}; + +} // unnamed namespace + using std::string; CollectionShardingState::CollectionShardingState( @@ -133,6 +159,17 @@ bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && + _nss == NamespaceString::kConfigCollectionNamespace) { + if (auto idElem = insertedDoc["_id"]) { + if (idElem.str() == ShardIdentityType::IdName) { + auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); + txn->recoveryUnit()->registerChange( + new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc))); + } + } + } + checkShardVersionOrThrow(txn); if (_sourceMgr) { @@ -143,6 +180,15 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + if (txn->writesAreReplicated() && serverGlobalParams.clusterRole == ClusterRole::ShardServer && + _nss == NamespaceString::kConfigCollectionNamespace) { + if (auto idElem = updatedDoc["_id"]) { + uassert(40069, + "cannot update shardIdentity document while in --shardsvr mode", + idElem.str() != ShardIdentityType::IdName); + } + } + checkShardVersionOrThrow(txn); if (_sourceMgr) { @@ -153,6 +199,15 @@ void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& u void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + if (txn->writesAreReplicated() && serverGlobalParams.clusterRole == ClusterRole::ShardServer && + _nss == NamespaceString::kConfigCollectionNamespace) { + if (auto idElem = deletedDocId["_id"]) { + uassert(40070, + "cannot delete shardIdentity document while in --shardsvr mode", + idElem.str() != ShardIdentityType::IdName); + } + } + checkShardVersionOrThrow(txn); if (_sourceMgr) { diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index bd79cf69884..e7a75d4122b 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -57,6 +57,12 @@ #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" + +#include <iostream> +#include <iomanip> +#include <ctime> +#include <chrono> + namespace mongo { using std::shared_ptr; @@ -112,6 +118,21 @@ VersionChoice chooseNewestVersion(ChunkVersion prevLocalVersion, return VersionChoice::Remote; } +Date_t getDeadlineFromMaxTimeMS(OperationContext* txn) { + auto remainingTime = txn->getRemainingMaxTimeMicros(); + log() << "REN: remaining: " << remainingTime; + if (remainingTime == 0) { + return Date_t::max(); + } + + if (remainingTime == 1) { + // 1 means maxTimeMS has exceeded. + return Date_t::now(); + } + + return Date_t::now() + Microseconds(remainingTime); +} + } // namespace // @@ -351,8 +372,8 @@ void ShardingState::initializeFromConfigConnString(OperationContext* txn, const } } - uassertStatusOK(_waitForInitialization(txn)); - + uassertStatusOK(_waitForInitialization(getDeadlineFromMaxTimeMS(txn))); + uassertStatusOK(reloadShardRegistryUntilSuccess(txn)); updateConfigServerOpTimeFromMetadata(txn); } @@ -382,31 +403,25 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn) { return parseStatus.getStatus(); } - return initializeFromShardIdentity(txn, parseStatus.getValue()); + auto status = + initializeFromShardIdentity(parseStatus.getValue(), getDeadlineFromMaxTimeMS(txn)); + if (!status.isOK()) { + return status; + } + + return reloadShardRegistryUntilSuccess(txn); } -Status ShardingState::initializeFromShardIdentity(OperationContext* txn, - const ShardIdentityType& shardIdentity) { - invariant(!txn->lockState()->isLocked()); +// NOTE: This method can be called inside a database lock so it should never take any database +// locks, perform I/O, or any long running operations. +Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shardIdentity, + Date_t deadline) { if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { return Status::OK(); } log() << "initializing sharding state with: " << shardIdentity; - auto parsedConfigConnStrStatus = - ConnectionString::parse(shardIdentity.getConfigsvrConnString()); - if (!parsedConfigConnStrStatus.isOK()) { - return parsedConfigConnStrStatus.getStatus(); - } - - auto configSvrConnStr = parsedConfigConnStrStatus.getValue(); - if (configSvrConnStr.type() != ConnectionString::SET) { - return {ErrorCodes::UnsupportedFormat, - str::stream() << "config server connection string can only be replica sets: " - << configSvrConnStr.toString()}; - } - stdx::unique_lock<stdx::mutex> lk(_mutex); // TODO: remove after v3.4. @@ -414,7 +429,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn, // commands/setShardVersion. As well as all assignments to _initializationStatus and // _setInitializationState_inlock in this method. if (_getInitializationState() == InitializationState::kInitializing) { - auto waitStatus = _waitForInitialization_inlock(txn, lk); + auto waitStatus = _waitForInitialization_inlock(deadline, lk); if (!waitStatus.isOK()) { return waitStatus; } @@ -427,6 +442,8 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn, << causedBy(_initializationStatus)}; } + auto configSvrConnStr = shardIdentity.getConfigsvrConnString(); + if (_getInitializationState() == InitializationState::kInitialized) { if (_shardName != shardIdentity.getShardName()) { return {ErrorCodes::InconsistentShardIdentity, @@ -468,7 +485,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn, &ConfigServer::replicaSetChangeShardRegistryUpdateHook); try { - Status status = _globalInit(txn, configSvrConnStr); + Status status = _globalInit(configSvrConnStr); // For backwards compatibility with old style inits from metadata commands. if (status.isOK()) { @@ -504,37 +521,33 @@ void ShardingState::_initializeImpl(ConnectionString configSvr) { &ConfigServer::replicaSetChangeShardRegistryUpdateHook); try { - Status status = _globalInit(txn.get(), configSvr); + Status status = _globalInit(configSvr); _signalInitializationComplete(status); } catch (const DBException& ex) { _signalInitializationComplete(ex.toStatus()); } } -Status ShardingState::_waitForInitialization(OperationContext* txn) { +Status ShardingState::_waitForInitialization(Date_t deadline) { if (enabled()) return Status::OK(); stdx::unique_lock<stdx::mutex> lk(_mutex); - return _waitForInitialization_inlock(txn, lk); + return _waitForInitialization_inlock(deadline, lk); } -Status ShardingState::_waitForInitialization_inlock(OperationContext* txn, +Status ShardingState::_waitForInitialization_inlock(Date_t deadline, stdx::unique_lock<stdx::mutex>& lk) { { - const Microseconds timeRemaining(txn->getRemainingMaxTimeMicros()); while (_getInitializationState() == InitializationState::kInitializing || _getInitializationState() == InitializationState::kNew) { - if (timeRemaining.count()) { - const auto deadline = stdx::chrono::system_clock::now() + timeRemaining; - - if (stdx::cv_status::timeout == - _initializationFinishedCondition.wait_until(lk, deadline)) { - return Status(ErrorCodes::ExceededTimeLimit, - "Initializing sharding state exceeded time limit"); - } - } else { + if (deadline == Date_t::max()) { _initializationFinishedCondition.wait(lk); + } else if (stdx::cv_status::timeout == + _initializationFinishedCondition.wait_until(lk, + deadline.toSystemTimePoint())) { + return Status(ErrorCodes::ExceededTimeLimit, + "Initializing sharding state exceeded time limit"); } } } @@ -586,7 +599,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, ChunkVersion* latestShardVersion) { invariant(!txn->lockState()->isLocked()); - Status status = _waitForInitialization(txn); + Status status = _waitForInitialization(getDeadlineFromMaxTimeMS(txn)); if (!status.isOK()) return status; diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 79c2906284e..672642eedde 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -40,6 +40,7 @@ #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -86,7 +87,7 @@ public: OperationContext* const _txn; }; - using GlobalInitFunc = stdx::function<Status(OperationContext*, const ConnectionString&)>; + using GlobalInitFunc = stdx::function<Status(const ConnectionString&)>; ShardingState(); ~ShardingState(); @@ -120,23 +121,30 @@ public: * * Throws if initialization fails for any reason and the sharding state object becomes unusable * afterwards. Any sharding state operations afterwards will fail. + * + * Note that this will also try to connect to the config servers and will block until it + * succeeds. */ void initializeFromConfigConnString(OperationContext* txn, const std::string& configSvr); /** * Initializes the sharding state of this server from the shard identity document from local * storage. + * + * Note that this will also try to connect to the config servers and will block until it + * succeeds. */ Status initializeFromShardIdentity(OperationContext* txn); /** * Initializes the sharding state of this server from the shard identity document argument. * This is the more genaralized form of the initializeFromShardIdentity(OperationContext*) - * method that can accept the shard identity from any source. - * This method currently blocks for network and should not be called with database locks held. + * method that can accept the shard identity from any source. Note that shardIdentity must + * be valid. + * + * Returns ErrorCodes::ExceededTimeLimit if deadline has passed. */ - Status initializeFromShardIdentity(OperationContext* txn, - const ShardIdentityType& shardIdentity); + Status initializeFromShardIdentity(const ShardIdentityType& shardIdentity, Date_t deadline); /** * Shuts down sharding machinery on the shard. @@ -283,8 +291,8 @@ private: * Blocking method, which waits for the initialization state to become kInitialized or kError * and returns the initialization status. */ - Status _waitForInitialization(OperationContext* txn); - Status _waitForInitialization_inlock(OperationContext* txn, stdx::unique_lock<stdx::mutex>& lk); + Status _waitForInitialization(Date_t deadline); + Status _waitForInitialization_inlock(Date_t deadline, stdx::unique_lock<stdx::mutex>& lk); /** * Simple wrapper to cast the initialization state atomic uint64 to InitializationState value diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index 664f3961211..5bcee931b75 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -97,11 +97,10 @@ public: _client = _service.makeClient("ShardingStateTest"); _opCtx = _client->makeOperationContext(); - _shardingState.setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - initGrid(txn, connStr); - return Status::OK(); - }); + _shardingState.setGlobalInitMethodForTest([this](const ConnectionString& connStr) { + initGrid(_opCtx.get(), connStr); + return Status::OK(); + }); } void tearDown() override { @@ -133,61 +132,40 @@ private: TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } -TEST_F(ShardingStateTest, InvalidConfigServerConnStringDoesNotParse) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("invalid:x"); - shardIdentity.setShardName("a"); - shardIdentity.setClusterId(OID::gen()); - - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity); - ASSERT_EQ(ErrorCodes::FailedToParse, status); - ASSERT_FALSE(shardingState()->enabled()); -} - -TEST_F(ShardingStateTest, CannotHaveNonReplConfigServerConnString) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("a:1"); - shardIdentity.setShardName("a"); - shardIdentity.setClusterId(OID::gen()); - - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity); - ASSERT_EQ(ErrorCodes::UnsupportedFormat, status); - ASSERT_FALSE(shardingState()->enabled()); -} - TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::ShutdownInProgress, "shutting down"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::ShutdownInProgress, "shutting down"}; + }); { - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity); + auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()); ASSERT_EQ(ErrorCodes::ShutdownInProgress, status); } // ShardingState is now in error state, attempting to call it again will still result in error. shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { return Status::OK(); }); + [](const ConnectionString& connStr) { return Status::OK(); }); { - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity); + auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()); ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status); } @@ -197,23 +175,24 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity2.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::InternalError, "should not reach here"}; + }); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); @@ -223,23 +202,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString("config/b:2,c:3"); + shardIdentity2.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "b:2,c:3", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::InternalError, "should not reach here"}; + }); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); @@ -249,23 +229,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString("configRS/a:1,b:2"); + shardIdentity2.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::InternalError, "should not reach here"}; + }); - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity2); + auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()); ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); @@ -276,23 +257,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity2.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("b"); shardIdentity2.setClusterId(clusterID); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::InternalError, "should not reach here"}; + }); - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity2); + auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()); ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); @@ -302,23 +284,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) { ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID()); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity2.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(OID::gen()); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::InternalError, "should not reach here"}; + }); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); @@ -327,23 +310,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) { TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity)); + ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString("config/a:1,b:2"); + shardIdentity2.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(OID::gen()); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { + return Status{ErrorCodes::InternalError, "should not reach here"}; + }); - auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity2); + auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()); ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp index 5c7f54441a0..7ebee3528b5 100644 --- a/src/mongo/db/s/type_shard_identity.cpp +++ b/src/mongo/db/s/type_shard_identity.cpp @@ -26,14 +26,16 @@ * then also delete it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/s/type_shard_identity.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/util/assert_util.h" namespace mongo { -const std::string ShardIdentityType::ConfigNS = "admin.system.version"; -const std::string id("shardIdentity"); +const std::string ShardIdentityType::IdName("shardIdentity"); const BSONField<std::string> configsvrConnString("configsvrConnectionString"); const BSONField<std::string> shardName("shardName"); const BSONField<OID> clusterId("clusterId"); @@ -52,9 +54,9 @@ StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) return status; } - if (docId != id) { + if (docId != IdName) { return {ErrorCodes::FailedToParse, - str::stream() << "got _id: " << docId << " instead of " << id}; + str::stream() << "got _id: " << docId << " instead of " << IdName}; } } @@ -65,7 +67,25 @@ StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) return status; } - shardIdentity.setConfigsvrConnString(connString); + try { + // Note: ConnectionString::parse can uassert from HostAndPort constructor. + auto parsedConfigConnStrStatus = ConnectionString::parse(connString); + if (!parsedConfigConnStrStatus.isOK()) { + return parsedConfigConnStrStatus.getStatus(); + } + + auto configSvrConnStr = parsedConfigConnStrStatus.getValue(); + if (configSvrConnStr.type() != ConnectionString::SET) { + return Status(ErrorCodes::UnsupportedFormat, + str::stream() + << "config server connection string can only be replica sets: " + << configSvrConnStr.toString()); + } + + shardIdentity.setConfigsvrConnString(std::move(configSvrConnStr)); + } catch (const UserException& parseException) { + return parseException.toStatus(); + } } { @@ -92,11 +112,17 @@ StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) } Status ShardIdentityType::validate() const { - if (!_configsvrConnString || _configsvrConnString->empty()) { + if (!_configsvrConnString) { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << configsvrConnString() << " field"}; } + if (_configsvrConnString->type() != ConnectionString::SET) { + return {ErrorCodes::UnsupportedFormat, + str::stream() << "config connection string can only be replica sets, got " + << ConnectionString::typeToString(_configsvrConnString->type())}; + } + if (!_shardName || _shardName->empty()) { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << shardName() << " field"}; } @@ -111,10 +137,10 @@ Status ShardIdentityType::validate() const { BSONObj ShardIdentityType::toBSON() const { BSONObjBuilder builder; - builder.append("_id", id); + builder.append("_id", IdName); if (_configsvrConnString) { - builder << configsvrConnString(_configsvrConnString.get()); + builder << configsvrConnString(_configsvrConnString->toString()); } if (_shardName) { @@ -136,12 +162,12 @@ bool ShardIdentityType::isConfigsvrConnStringSet() const { return _configsvrConnString.is_initialized(); } -const std::string& ShardIdentityType::getConfigsvrConnString() const { +const ConnectionString& ShardIdentityType::getConfigsvrConnString() const { invariant(_configsvrConnString); return _configsvrConnString.get(); } -void ShardIdentityType::setConfigsvrConnString(std::string connString) { +void ShardIdentityType::setConfigsvrConnString(ConnectionString connString) { _configsvrConnString = std::move(connString); } diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h index 12f9fd3996c..52d31c0c39f 100644 --- a/src/mongo/db/s/type_shard_identity.h +++ b/src/mongo/db/s/type_shard_identity.h @@ -30,6 +30,7 @@ #include <string> +#include "mongo/client/connection_string.h" #include "mongo/db/jsobj.h" namespace mongo { @@ -39,8 +40,8 @@ namespace mongo { */ class ShardIdentityType { public: - // Name of the collection where the ShardIdentityType document is stored in the shard. - static const std::string ConfigNS; + // The _id value for this document type. + static const std::string IdName; /** * Constructs a new ShardIdentityType object from BSON. @@ -65,8 +66,8 @@ public: std::string toString() const; bool isConfigsvrConnStringSet() const; - const std::string& getConfigsvrConnString() const; - void setConfigsvrConnString(std::string connString); + const ConnectionString& getConfigsvrConnString() const; + void setConfigsvrConnString(ConnectionString connString); bool isShardNameSet() const; const std::string& getShardName() const; @@ -80,7 +81,7 @@ private: // Convention: (M)andatory, (O)ptional, (S)pecial rule. // (M) connection string to the config server. - boost::optional<std::string> _configsvrConnString; + boost::optional<ConnectionString> _configsvrConnString; // (M) contains the name of the shard. boost::optional<std::string> _shardName; // (M) contains the (unique) identifier of the cluster. diff --git a/src/mongo/db/s/type_shard_identity_test.cpp b/src/mongo/db/s/type_shard_identity_test.cpp index 9f97aa4c1b0..0b31e4388d3 100644 --- a/src/mongo/db/s/type_shard_identity_test.cpp +++ b/src/mongo/db/s/type_shard_identity_test.cpp @@ -54,7 +54,7 @@ TEST(ShardIdentityType, RoundTrip) { auto shardIdentity = result.getValue(); ASSERT_TRUE(shardIdentity.isConfigsvrConnStringSet()); - ASSERT_EQ("test/a:123", shardIdentity.getConfigsvrConnString()); + ASSERT_EQ("test/a:123", shardIdentity.getConfigsvrConnString().toString()); ASSERT_TRUE(shardIdentity.isShardNameSet()); ASSERT_EQ("s1", shardIdentity.getShardName()); ASSERT_TRUE(shardIdentity.isClusterIdSet()); @@ -108,5 +108,31 @@ TEST(ShardIdentityType, ParseMissingClusterId) { ASSERT_NOT_OK(result.getStatus()); } +TEST(ShardIdentityType, InvalidConnectionString) { + auto clusterId(OID::gen()); + auto doc = BSON("_id" + << "shardIdentity" + << "configsvrConnectionString" + << "test/,,," + << "shardName" + << "s1" + << "clusterId" << clusterId); + + ASSERT_EQ(ErrorCodes::FailedToParse, ShardIdentityType::fromBSON(doc).getStatus()); +} + +TEST(ShardIdentityType, NonReplSetConnectionString) { + auto clusterId(OID::gen()); + auto doc = BSON("_id" + << "shardIdentity" + << "configsvrConnectionString" + << "local:123" + << "shardName" + << "s1" + << "clusterId" << clusterId); + + ASSERT_EQ(ErrorCodes::UnsupportedFormat, ShardIdentityType::fromBSON(doc).getStatus()); +} + } // namespace mongo } // unnamed namespace diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 26769abb780..f4b3c9fcb5f 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -100,7 +100,7 @@ public: * has been installed into the global 'grid' object. Implementation do not need to guarantee * thread safety so callers should employ proper synchronization when calling this method. */ - virtual Status startup(OperationContext* txn) = 0; + virtual Status startup() = 0; /** * Performs necessary cleanup when shutting down cleanly. diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index a3b9603c565..49f5e3213cb 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -48,7 +48,7 @@ CatalogManagerMock::CatalogManagerMock() { CatalogManagerMock::~CatalogManagerMock() = default; -Status CatalogManagerMock::startup(OperationContext* txn) { +Status CatalogManagerMock::startup() { return {ErrorCodes::InternalError, "Method not implemented"}; } diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 45cdd36a4b4..48a5ba066d8 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -41,7 +41,7 @@ public: CatalogManagerMock(); ~CatalogManagerMock(); - Status startup(OperationContext* txn) override; + Status startup() override; void shutDown(OperationContext* txn) override; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 903fed5048f..128bb294fa9 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -324,7 +324,7 @@ CatalogManagerReplicaSet::CatalogManagerReplicaSet( CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default; -Status CatalogManagerReplicaSet::startup(OperationContext* txn) { +Status CatalogManagerReplicaSet::startup() { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_started) { return Status::OK(); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 651200d467b..ccabbb958c9 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -53,7 +53,7 @@ public: * Safe to call multiple times as long as the calls are externally synchronized to be * non-overlapping. */ - Status startup(OperationContext* txn) override; + Status startup() override; void shutDown(OperationContext* txn) override; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 6bb4a34b759..67d08f91033 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -270,8 +270,13 @@ static void reloadSettings(OperationContext* txn) { } static Status initializeSharding(OperationContext* txn) { - Status status = initializeGlobalShardingStateForMongos( - txn, mongosGlobalParams.configdbs, mongosGlobalParams.maxChunkSizeBytes); + Status status = initializeGlobalShardingStateForMongos(mongosGlobalParams.configdbs, + mongosGlobalParams.maxChunkSizeBytes); + if (!status.isOK()) { + return status; + } + + status = reloadShardRegistryUntilSuccess(txn); if (!status.isOK()) { return status; } diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 1be0d0ae45d..71640889e5f 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -133,8 +133,7 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn return executorPool; } -Status initializeGlobalShardingState(OperationContext* txn, - const ConnectionString& configCS, +Status initializeGlobalShardingState(const ConnectionString& configCS, uint64_t maxChunkSizeBytes, bool isMongos) { if (configCS.type() == ConnectionString::INVALID) { @@ -164,6 +163,7 @@ Status initializeGlobalShardingState(OperationContext* txn, shardRegistry.get(), HostAndPort(getHostName(), serverGlobalParams.port)); + auto rawCatalogManager = catalogManager.get(); grid.init( std::move(catalogManager), stdx::make_unique<CatalogCache>(), @@ -173,14 +173,38 @@ Status initializeGlobalShardingState(OperationContext* txn, std::move(executorPool), networkPtr); + Status status = rawCatalogManager->startup(); + if (!status.isOK()) { + return status; + } + + return Status::OK(); +} + +} // namespace + +Status initializeGlobalShardingStateForMongos(const ConnectionString& configCS, + uint64_t maxChunkSizeBytes) { + return initializeGlobalShardingState(configCS, maxChunkSizeBytes, true); +} + +Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS) { + return initializeGlobalShardingState(configCS, ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes, false); +} + +Status reloadShardRegistryUntilSuccess(OperationContext* txn) { + if (serverGlobalParams.configsvrMode != CatalogManager::ConfigServerMode::NONE) { + return Status::OK(); + } + while (!inShutdown()) { - try { - Status status = grid.catalogManager(txn)->startup(txn); - uassertStatusOK(status); + auto stopStatus = txn->checkForInterruptNoAssert(); + if (!stopStatus.isOK()) { + return stopStatus; + } - if (serverGlobalParams.configsvrMode == CatalogManager::ConfigServerMode::NONE) { - grid.shardRegistry()->reload(txn); - } + try { + grid.shardRegistry()->reload(txn); return Status::OK(); } catch (const DBException& ex) { Status status = ex.toStatus(); @@ -198,21 +222,7 @@ Status initializeGlobalShardingState(OperationContext* txn, } } - return Status::OK(); -} - -} // namespace - -Status initializeGlobalShardingStateForMongos(OperationContext* txn, - const ConnectionString& configCS, - uint64_t maxChunkSizeBytes) { - return initializeGlobalShardingState(txn, configCS, maxChunkSizeBytes, true); -} - -Status initializeGlobalShardingStateForMongod(OperationContext* txn, - const ConnectionString& configCS) { - return initializeGlobalShardingState( - txn, configCS, ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes, false); + return {ErrorCodes::ShutdownInProgress, "aborting shard loading attempt"}; } } // namespace mongo diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index eeb1763df70..d7105768785 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -40,11 +40,15 @@ class Status; * Takes in the connection string for reaching the config servers and initializes the global * CatalogManager, ShardingRegistry, and grid objects. */ -Status initializeGlobalShardingStateForMongos(OperationContext* txn, - const ConnectionString& configCS, +Status initializeGlobalShardingStateForMongos(const ConnectionString& configCS, uint64_t maxChunkSizeBytes); -Status initializeGlobalShardingStateForMongod(OperationContext* txn, - const ConnectionString& configCS); +Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS); + +/** + * Tries to contact the config server and reload the shard registry until it succeeds or + * is interrupted. + */ +Status reloadShardRegistryUntilSuccess(OperationContext* txn); } // namespace mongo diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 737a996e113..329bcb18c78 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -119,7 +119,7 @@ void ShardingTestFixture::setUp() { std::unique_ptr<CatalogManagerReplicaSet> cm(stdx::make_unique<CatalogManagerReplicaSet>( std::move(uniqueDistLockManager), std::move(specialExec))); _catalogManagerRS = cm.get(); - cm->startup(_opCtx.get()); + cm->startup(); ConnectionString configCS = ConnectionString::forReplicaSet( "CatalogManagerReplSetTest", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}}); diff --git a/src/mongo/util/time_support.h b/src/mongo/util/time_support.h index ca9fa2162f5..6f20025d832 100644 --- a/src/mongo/util/time_support.h +++ b/src/mongo/util/time_support.h @@ -181,6 +181,8 @@ public: /* * Returns a system clock time_point representing the same point in time as this Date_t. + * Warning: careful when using with Date_t::max() as it can have a value that is bigger than + * time_point can store. */ stdx::chrono::system_clock::time_point toSystemTimePoint() const; @@ -195,6 +197,8 @@ public: /** * Implicit conversion operator to system clock time point. Enables use of Date_t with * condition_variable::wait_until. + * Warning: careful when using with Date_t::max() as it can have a value that is bigger than + * time_point can store. */ operator stdx::chrono::system_clock::time_point() const { return toSystemTimePoint(); |