summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2016-04-11 13:03:02 -0400
committerRandolph Tan <randolph@10gen.com>2016-04-20 16:45:23 -0400
commit879d562c1fee80e84d6705efba7e47e714614505 (patch)
treed5f2e4d5ad7eab1fa1b2b0a64b7c880742379278 /src
parent0e45dbdbfda0ff381308b37d75235cad1da3db54 (diff)
downloadmongo-879d562c1fee80e84d6705efba7e47e714614505.tar.gz
SERVER-22647 opObserver on shard mongod to initialize sharding state on insert to admin.system.version
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp55
-rw-r--r--src/mongo/db/s/sharding_state.cpp85
-rw-r--r--src/mongo/db/s/sharding_state.h22
-rw-r--r--src/mongo/db/s/sharding_state_test.cpp154
-rw-r--r--src/mongo/db/s/type_shard_identity.cpp46
-rw-r--r--src/mongo/db/s/type_shard_identity.h11
-rw-r--r--src/mongo/db/s/type_shard_identity_test.cpp28
-rw-r--r--src/mongo/s/catalog/catalog_manager.h2
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.cpp2
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.h2
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp2
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h2
-rw-r--r--src/mongo/s/server.cpp9
-rw-r--r--src/mongo/s/sharding_initialization.cpp56
-rw-r--r--src/mongo/s/sharding_initialization.h12
-rw-r--r--src/mongo/s/sharding_test_fixture.cpp2
-rw-r--r--src/mongo/util/time_support.h4
17 files changed, 315 insertions, 179 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index eaed991ad25..2a7299b3b25 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -42,11 +42,37 @@
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/type_shard_identity.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
+namespace {
+
+/**
+ * Used to perform shard identity initialization once it is certain that the document is committed.
+ */
+class ShardIdentityLopOpHandler final : public RecoveryUnit::Change {
+public:
+ ShardIdentityLopOpHandler(OperationContext* txn, ShardIdentityType shardIdentity)
+ : _txn(txn), _shardIdentity(std::move(shardIdentity)) {}
+
+ void commit() override {
+ fassertNoTrace(
+ 40071,
+ ShardingState::get(_txn)->initializeFromShardIdentity(_shardIdentity, Date_t::max()));
+ }
+
+ void rollback() override {}
+
+private:
+ OperationContext* _txn;
+ const ShardIdentityType _shardIdentity;
+};
+
+} // unnamed namespace
+
using std::string;
CollectionShardingState::CollectionShardingState(
@@ -133,6 +159,17 @@ bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn,
void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ if (serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
+ _nss == NamespaceString::kConfigCollectionNamespace) {
+ if (auto idElem = insertedDoc["_id"]) {
+ if (idElem.str() == ShardIdentityType::IdName) {
+ auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc));
+ txn->recoveryUnit()->registerChange(
+ new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc)));
+ }
+ }
+ }
+
checkShardVersionOrThrow(txn);
if (_sourceMgr) {
@@ -143,6 +180,15 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i
void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ if (txn->writesAreReplicated() && serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
+ _nss == NamespaceString::kConfigCollectionNamespace) {
+ if (auto idElem = updatedDoc["_id"]) {
+ uassert(40069,
+ "cannot update shardIdentity document while in --shardsvr mode",
+ idElem.str() != ShardIdentityType::IdName);
+ }
+ }
+
checkShardVersionOrThrow(txn);
if (_sourceMgr) {
@@ -153,6 +199,15 @@ void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& u
void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ if (txn->writesAreReplicated() && serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
+ _nss == NamespaceString::kConfigCollectionNamespace) {
+ if (auto idElem = deletedDocId["_id"]) {
+ uassert(40070,
+ "cannot delete shardIdentity document while in --shardsvr mode",
+ idElem.str() != ShardIdentityType::IdName);
+ }
+ }
+
checkShardVersionOrThrow(txn);
if (_sourceMgr) {
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index bd79cf69884..e7a75d4122b 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -57,6 +57,12 @@
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+
+#include <iostream>
+#include <iomanip>
+#include <ctime>
+#include <chrono>
+
namespace mongo {
using std::shared_ptr;
@@ -112,6 +118,21 @@ VersionChoice chooseNewestVersion(ChunkVersion prevLocalVersion,
return VersionChoice::Remote;
}
+Date_t getDeadlineFromMaxTimeMS(OperationContext* txn) {
+ auto remainingTime = txn->getRemainingMaxTimeMicros();
+ log() << "REN: remaining: " << remainingTime;
+ if (remainingTime == 0) {
+ return Date_t::max();
+ }
+
+ if (remainingTime == 1) {
+ // 1 means maxTimeMS has exceeded.
+ return Date_t::now();
+ }
+
+ return Date_t::now() + Microseconds(remainingTime);
+}
+
} // namespace
//
@@ -351,8 +372,8 @@ void ShardingState::initializeFromConfigConnString(OperationContext* txn, const
}
}
- uassertStatusOK(_waitForInitialization(txn));
-
+ uassertStatusOK(_waitForInitialization(getDeadlineFromMaxTimeMS(txn)));
+ uassertStatusOK(reloadShardRegistryUntilSuccess(txn));
updateConfigServerOpTimeFromMetadata(txn);
}
@@ -382,31 +403,25 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn) {
return parseStatus.getStatus();
}
- return initializeFromShardIdentity(txn, parseStatus.getValue());
+ auto status =
+ initializeFromShardIdentity(parseStatus.getValue(), getDeadlineFromMaxTimeMS(txn));
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return reloadShardRegistryUntilSuccess(txn);
}
-Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
- const ShardIdentityType& shardIdentity) {
- invariant(!txn->lockState()->isLocked());
+// NOTE: This method can be called inside a database lock so it should never take any database
+// locks, perform I/O, or any long running operations.
+Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shardIdentity,
+ Date_t deadline) {
if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
return Status::OK();
}
log() << "initializing sharding state with: " << shardIdentity;
- auto parsedConfigConnStrStatus =
- ConnectionString::parse(shardIdentity.getConfigsvrConnString());
- if (!parsedConfigConnStrStatus.isOK()) {
- return parsedConfigConnStrStatus.getStatus();
- }
-
- auto configSvrConnStr = parsedConfigConnStrStatus.getValue();
- if (configSvrConnStr.type() != ConnectionString::SET) {
- return {ErrorCodes::UnsupportedFormat,
- str::stream() << "config server connection string can only be replica sets: "
- << configSvrConnStr.toString()};
- }
-
stdx::unique_lock<stdx::mutex> lk(_mutex);
// TODO: remove after v3.4.
@@ -414,7 +429,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
// commands/setShardVersion. As well as all assignments to _initializationStatus and
// _setInitializationState_inlock in this method.
if (_getInitializationState() == InitializationState::kInitializing) {
- auto waitStatus = _waitForInitialization_inlock(txn, lk);
+ auto waitStatus = _waitForInitialization_inlock(deadline, lk);
if (!waitStatus.isOK()) {
return waitStatus;
}
@@ -427,6 +442,8 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
<< causedBy(_initializationStatus)};
}
+ auto configSvrConnStr = shardIdentity.getConfigsvrConnString();
+
if (_getInitializationState() == InitializationState::kInitialized) {
if (_shardName != shardIdentity.getShardName()) {
return {ErrorCodes::InconsistentShardIdentity,
@@ -468,7 +485,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
&ConfigServer::replicaSetChangeShardRegistryUpdateHook);
try {
- Status status = _globalInit(txn, configSvrConnStr);
+ Status status = _globalInit(configSvrConnStr);
// For backwards compatibility with old style inits from metadata commands.
if (status.isOK()) {
@@ -504,37 +521,33 @@ void ShardingState::_initializeImpl(ConnectionString configSvr) {
&ConfigServer::replicaSetChangeShardRegistryUpdateHook);
try {
- Status status = _globalInit(txn.get(), configSvr);
+ Status status = _globalInit(configSvr);
_signalInitializationComplete(status);
} catch (const DBException& ex) {
_signalInitializationComplete(ex.toStatus());
}
}
-Status ShardingState::_waitForInitialization(OperationContext* txn) {
+Status ShardingState::_waitForInitialization(Date_t deadline) {
if (enabled())
return Status::OK();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _waitForInitialization_inlock(txn, lk);
+ return _waitForInitialization_inlock(deadline, lk);
}
-Status ShardingState::_waitForInitialization_inlock(OperationContext* txn,
+Status ShardingState::_waitForInitialization_inlock(Date_t deadline,
stdx::unique_lock<stdx::mutex>& lk) {
{
- const Microseconds timeRemaining(txn->getRemainingMaxTimeMicros());
while (_getInitializationState() == InitializationState::kInitializing ||
_getInitializationState() == InitializationState::kNew) {
- if (timeRemaining.count()) {
- const auto deadline = stdx::chrono::system_clock::now() + timeRemaining;
-
- if (stdx::cv_status::timeout ==
- _initializationFinishedCondition.wait_until(lk, deadline)) {
- return Status(ErrorCodes::ExceededTimeLimit,
- "Initializing sharding state exceeded time limit");
- }
- } else {
+ if (deadline == Date_t::max()) {
_initializationFinishedCondition.wait(lk);
+ } else if (stdx::cv_status::timeout ==
+ _initializationFinishedCondition.wait_until(lk,
+ deadline.toSystemTimePoint())) {
+ return Status(ErrorCodes::ExceededTimeLimit,
+ "Initializing sharding state exceeded time limit");
}
}
}
@@ -586,7 +599,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
ChunkVersion* latestShardVersion) {
invariant(!txn->lockState()->isLocked());
- Status status = _waitForInitialization(txn);
+ Status status = _waitForInitialization(getDeadlineFromMaxTimeMS(txn));
if (!status.isOK())
return status;
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 79c2906284e..672642eedde 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -40,6 +40,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -86,7 +87,7 @@ public:
OperationContext* const _txn;
};
- using GlobalInitFunc = stdx::function<Status(OperationContext*, const ConnectionString&)>;
+ using GlobalInitFunc = stdx::function<Status(const ConnectionString&)>;
ShardingState();
~ShardingState();
@@ -120,23 +121,30 @@ public:
*
* Throws if initialization fails for any reason and the sharding state object becomes unusable
* afterwards. Any sharding state operations afterwards will fail.
+ *
+ * Note that this will also try to connect to the config servers and will block until it
+ * succeeds.
*/
void initializeFromConfigConnString(OperationContext* txn, const std::string& configSvr);
/**
* Initializes the sharding state of this server from the shard identity document from local
* storage.
+ *
+ * Note that this will also try to connect to the config servers and will block until it
+ * succeeds.
*/
Status initializeFromShardIdentity(OperationContext* txn);
/**
* Initializes the sharding state of this server from the shard identity document argument.
* This is the more genaralized form of the initializeFromShardIdentity(OperationContext*)
- * method that can accept the shard identity from any source.
- * This method currently blocks for network and should not be called with database locks held.
+ * method that can accept the shard identity from any source. Note that shardIdentity must
+ * be valid.
+ *
+ * Returns ErrorCodes::ExceededTimeLimit if deadline has passed.
*/
- Status initializeFromShardIdentity(OperationContext* txn,
- const ShardIdentityType& shardIdentity);
+ Status initializeFromShardIdentity(const ShardIdentityType& shardIdentity, Date_t deadline);
/**
* Shuts down sharding machinery on the shard.
@@ -283,8 +291,8 @@ private:
* Blocking method, which waits for the initialization state to become kInitialized or kError
* and returns the initialization status.
*/
- Status _waitForInitialization(OperationContext* txn);
- Status _waitForInitialization_inlock(OperationContext* txn, stdx::unique_lock<stdx::mutex>& lk);
+ Status _waitForInitialization(Date_t deadline);
+ Status _waitForInitialization_inlock(Date_t deadline, stdx::unique_lock<stdx::mutex>& lk);
/**
* Simple wrapper to cast the initialization state atomic uint64 to InitializationState value
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index 664f3961211..5bcee931b75 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -97,11 +97,10 @@ public:
_client = _service.makeClient("ShardingStateTest");
_opCtx = _client->makeOperationContext();
- _shardingState.setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- initGrid(txn, connStr);
- return Status::OK();
- });
+ _shardingState.setGlobalInitMethodForTest([this](const ConnectionString& connStr) {
+ initGrid(_opCtx.get(), connStr);
+ return Status::OK();
+ });
}
void tearDown() override {
@@ -133,61 +132,40 @@ private:
TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString());
}
-TEST_F(ShardingStateTest, InvalidConfigServerConnStringDoesNotParse) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("invalid:x");
- shardIdentity.setShardName("a");
- shardIdentity.setClusterId(OID::gen());
-
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity);
- ASSERT_EQ(ErrorCodes::FailedToParse, status);
- ASSERT_FALSE(shardingState()->enabled());
-}
-
-TEST_F(ShardingStateTest, CannotHaveNonReplConfigServerConnString) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("a:1");
- shardIdentity.setShardName("a");
- shardIdentity.setClusterId(OID::gen());
-
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity);
- ASSERT_EQ(ErrorCodes::UnsupportedFormat, status);
- ASSERT_FALSE(shardingState()->enabled());
-}
-
TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
+ });
{
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity);
+ auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max());
ASSERT_EQ(ErrorCodes::ShutdownInProgress, status);
}
// ShardingState is now in error state, attempting to call it again will still result in error.
shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) { return Status::OK(); });
+ [](const ConnectionString& connStr) { return Status::OK(); });
{
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity);
+ auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max());
ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status);
}
@@ -197,23 +175,24 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity2.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
@@ -223,23 +202,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString("config/b:2,c:3");
+ shardIdentity2.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "b:2,c:3", "config"));
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
@@ -249,23 +229,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString("configRS/a:1,b:2");
+ shardIdentity2.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS"));
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity2);
+ auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
@@ -276,23 +257,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity2.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity2.setShardName("b");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity2);
+ auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
@@ -302,23 +284,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID());
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity2.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(OID::gen());
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
@@ -327,23 +310,24 @@ TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString("config/a:1,b:2");
+ shardIdentity2.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(OID::gen());
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- auto status = shardingState()->initializeFromShardIdentity(txn(), shardIdentity2);
+ auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp
index 5c7f54441a0..7ebee3528b5 100644
--- a/src/mongo/db/s/type_shard_identity.cpp
+++ b/src/mongo/db/s/type_shard_identity.cpp
@@ -26,14 +26,16 @@
* then also delete it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
-const std::string ShardIdentityType::ConfigNS = "admin.system.version";
-const std::string id("shardIdentity");
+const std::string ShardIdentityType::IdName("shardIdentity");
const BSONField<std::string> configsvrConnString("configsvrConnectionString");
const BSONField<std::string> shardName("shardName");
const BSONField<OID> clusterId("clusterId");
@@ -52,9 +54,9 @@ StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source)
return status;
}
- if (docId != id) {
+ if (docId != IdName) {
return {ErrorCodes::FailedToParse,
- str::stream() << "got _id: " << docId << " instead of " << id};
+ str::stream() << "got _id: " << docId << " instead of " << IdName};
}
}
@@ -65,7 +67,25 @@ StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source)
return status;
}
- shardIdentity.setConfigsvrConnString(connString);
+ try {
+ // Note: ConnectionString::parse can uassert from HostAndPort constructor.
+ auto parsedConfigConnStrStatus = ConnectionString::parse(connString);
+ if (!parsedConfigConnStrStatus.isOK()) {
+ return parsedConfigConnStrStatus.getStatus();
+ }
+
+ auto configSvrConnStr = parsedConfigConnStrStatus.getValue();
+ if (configSvrConnStr.type() != ConnectionString::SET) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ str::stream()
+ << "config server connection string can only be replica sets: "
+ << configSvrConnStr.toString());
+ }
+
+ shardIdentity.setConfigsvrConnString(std::move(configSvrConnStr));
+ } catch (const UserException& parseException) {
+ return parseException.toStatus();
+ }
}
{
@@ -92,11 +112,17 @@ StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source)
}
Status ShardIdentityType::validate() const {
- if (!_configsvrConnString || _configsvrConnString->empty()) {
+ if (!_configsvrConnString) {
return {ErrorCodes::NoSuchKey,
str::stream() << "missing " << configsvrConnString() << " field"};
}
+ if (_configsvrConnString->type() != ConnectionString::SET) {
+ return {ErrorCodes::UnsupportedFormat,
+ str::stream() << "config connection string can only be replica sets, got "
+ << ConnectionString::typeToString(_configsvrConnString->type())};
+ }
+
if (!_shardName || _shardName->empty()) {
return {ErrorCodes::NoSuchKey, str::stream() << "missing " << shardName() << " field"};
}
@@ -111,10 +137,10 @@ Status ShardIdentityType::validate() const {
BSONObj ShardIdentityType::toBSON() const {
BSONObjBuilder builder;
- builder.append("_id", id);
+ builder.append("_id", IdName);
if (_configsvrConnString) {
- builder << configsvrConnString(_configsvrConnString.get());
+ builder << configsvrConnString(_configsvrConnString->toString());
}
if (_shardName) {
@@ -136,12 +162,12 @@ bool ShardIdentityType::isConfigsvrConnStringSet() const {
return _configsvrConnString.is_initialized();
}
-const std::string& ShardIdentityType::getConfigsvrConnString() const {
+const ConnectionString& ShardIdentityType::getConfigsvrConnString() const {
invariant(_configsvrConnString);
return _configsvrConnString.get();
}
-void ShardIdentityType::setConfigsvrConnString(std::string connString) {
+void ShardIdentityType::setConfigsvrConnString(ConnectionString connString) {
_configsvrConnString = std::move(connString);
}
diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h
index 12f9fd3996c..52d31c0c39f 100644
--- a/src/mongo/db/s/type_shard_identity.h
+++ b/src/mongo/db/s/type_shard_identity.h
@@ -30,6 +30,7 @@
#include <string>
+#include "mongo/client/connection_string.h"
#include "mongo/db/jsobj.h"
namespace mongo {
@@ -39,8 +40,8 @@ namespace mongo {
*/
class ShardIdentityType {
public:
- // Name of the collection where the ShardIdentityType document is stored in the shard.
- static const std::string ConfigNS;
+ // The _id value for this document type.
+ static const std::string IdName;
/**
* Constructs a new ShardIdentityType object from BSON.
@@ -65,8 +66,8 @@ public:
std::string toString() const;
bool isConfigsvrConnStringSet() const;
- const std::string& getConfigsvrConnString() const;
- void setConfigsvrConnString(std::string connString);
+ const ConnectionString& getConfigsvrConnString() const;
+ void setConfigsvrConnString(ConnectionString connString);
bool isShardNameSet() const;
const std::string& getShardName() const;
@@ -80,7 +81,7 @@ private:
// Convention: (M)andatory, (O)ptional, (S)pecial rule.
// (M) connection string to the config server.
- boost::optional<std::string> _configsvrConnString;
+ boost::optional<ConnectionString> _configsvrConnString;
// (M) contains the name of the shard.
boost::optional<std::string> _shardName;
// (M) contains the (unique) identifier of the cluster.
diff --git a/src/mongo/db/s/type_shard_identity_test.cpp b/src/mongo/db/s/type_shard_identity_test.cpp
index 9f97aa4c1b0..0b31e4388d3 100644
--- a/src/mongo/db/s/type_shard_identity_test.cpp
+++ b/src/mongo/db/s/type_shard_identity_test.cpp
@@ -54,7 +54,7 @@ TEST(ShardIdentityType, RoundTrip) {
auto shardIdentity = result.getValue();
ASSERT_TRUE(shardIdentity.isConfigsvrConnStringSet());
- ASSERT_EQ("test/a:123", shardIdentity.getConfigsvrConnString());
+ ASSERT_EQ("test/a:123", shardIdentity.getConfigsvrConnString().toString());
ASSERT_TRUE(shardIdentity.isShardNameSet());
ASSERT_EQ("s1", shardIdentity.getShardName());
ASSERT_TRUE(shardIdentity.isClusterIdSet());
@@ -108,5 +108,31 @@ TEST(ShardIdentityType, ParseMissingClusterId) {
ASSERT_NOT_OK(result.getStatus());
}
+TEST(ShardIdentityType, InvalidConnectionString) {
+ auto clusterId(OID::gen());
+ auto doc = BSON("_id"
+ << "shardIdentity"
+ << "configsvrConnectionString"
+ << "test/,,,"
+ << "shardName"
+ << "s1"
+ << "clusterId" << clusterId);
+
+ ASSERT_EQ(ErrorCodes::FailedToParse, ShardIdentityType::fromBSON(doc).getStatus());
+}
+
+TEST(ShardIdentityType, NonReplSetConnectionString) {
+ auto clusterId(OID::gen());
+ auto doc = BSON("_id"
+ << "shardIdentity"
+ << "configsvrConnectionString"
+ << "local:123"
+ << "shardName"
+ << "s1"
+ << "clusterId" << clusterId);
+
+ ASSERT_EQ(ErrorCodes::UnsupportedFormat, ShardIdentityType::fromBSON(doc).getStatus());
+}
+
} // namespace mongo
} // unnamed namespace
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 26769abb780..f4b3c9fcb5f 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -100,7 +100,7 @@ public:
* has been installed into the global 'grid' object. Implementation do not need to guarantee
* thread safety so callers should employ proper synchronization when calling this method.
*/
- virtual Status startup(OperationContext* txn) = 0;
+ virtual Status startup() = 0;
/**
* Performs necessary cleanup when shutting down cleanly.
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index a3b9603c565..49f5e3213cb 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -48,7 +48,7 @@ CatalogManagerMock::CatalogManagerMock() {
CatalogManagerMock::~CatalogManagerMock() = default;
-Status CatalogManagerMock::startup(OperationContext* txn) {
+Status CatalogManagerMock::startup() {
return {ErrorCodes::InternalError, "Method not implemented"};
}
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 45cdd36a4b4..48a5ba066d8 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -41,7 +41,7 @@ public:
CatalogManagerMock();
~CatalogManagerMock();
- Status startup(OperationContext* txn) override;
+ Status startup() override;
void shutDown(OperationContext* txn) override;
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 903fed5048f..128bb294fa9 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -324,7 +324,7 @@ CatalogManagerReplicaSet::CatalogManagerReplicaSet(
CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default;
-Status CatalogManagerReplicaSet::startup(OperationContext* txn) {
+Status CatalogManagerReplicaSet::startup() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_started) {
return Status::OK();
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index 651200d467b..ccabbb958c9 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -53,7 +53,7 @@ public:
* Safe to call multiple times as long as the calls are externally synchronized to be
* non-overlapping.
*/
- Status startup(OperationContext* txn) override;
+ Status startup() override;
void shutDown(OperationContext* txn) override;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 6bb4a34b759..67d08f91033 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -270,8 +270,13 @@ static void reloadSettings(OperationContext* txn) {
}
static Status initializeSharding(OperationContext* txn) {
- Status status = initializeGlobalShardingStateForMongos(
- txn, mongosGlobalParams.configdbs, mongosGlobalParams.maxChunkSizeBytes);
+ Status status = initializeGlobalShardingStateForMongos(mongosGlobalParams.configdbs,
+ mongosGlobalParams.maxChunkSizeBytes);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ status = reloadShardRegistryUntilSuccess(txn);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 1be0d0ae45d..71640889e5f 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -133,8 +133,7 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn
return executorPool;
}
-Status initializeGlobalShardingState(OperationContext* txn,
- const ConnectionString& configCS,
+Status initializeGlobalShardingState(const ConnectionString& configCS,
uint64_t maxChunkSizeBytes,
bool isMongos) {
if (configCS.type() == ConnectionString::INVALID) {
@@ -164,6 +163,7 @@ Status initializeGlobalShardingState(OperationContext* txn,
shardRegistry.get(),
HostAndPort(getHostName(), serverGlobalParams.port));
+ auto rawCatalogManager = catalogManager.get();
grid.init(
std::move(catalogManager),
stdx::make_unique<CatalogCache>(),
@@ -173,14 +173,38 @@ Status initializeGlobalShardingState(OperationContext* txn,
std::move(executorPool),
networkPtr);
+ Status status = rawCatalogManager->startup();
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status::OK();
+}
+
+} // namespace
+
+Status initializeGlobalShardingStateForMongos(const ConnectionString& configCS,
+ uint64_t maxChunkSizeBytes) {
+ return initializeGlobalShardingState(configCS, maxChunkSizeBytes, true);
+}
+
+Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS) {
+ return initializeGlobalShardingState(configCS, ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes, false);
+}
+
+Status reloadShardRegistryUntilSuccess(OperationContext* txn) {
+ if (serverGlobalParams.configsvrMode != CatalogManager::ConfigServerMode::NONE) {
+ return Status::OK();
+ }
+
while (!inShutdown()) {
- try {
- Status status = grid.catalogManager(txn)->startup(txn);
- uassertStatusOK(status);
+ auto stopStatus = txn->checkForInterruptNoAssert();
+ if (!stopStatus.isOK()) {
+ return stopStatus;
+ }
- if (serverGlobalParams.configsvrMode == CatalogManager::ConfigServerMode::NONE) {
- grid.shardRegistry()->reload(txn);
- }
+ try {
+ grid.shardRegistry()->reload(txn);
return Status::OK();
} catch (const DBException& ex) {
Status status = ex.toStatus();
@@ -198,21 +222,7 @@ Status initializeGlobalShardingState(OperationContext* txn,
}
}
- return Status::OK();
-}
-
-} // namespace
-
-Status initializeGlobalShardingStateForMongos(OperationContext* txn,
- const ConnectionString& configCS,
- uint64_t maxChunkSizeBytes) {
- return initializeGlobalShardingState(txn, configCS, maxChunkSizeBytes, true);
-}
-
-Status initializeGlobalShardingStateForMongod(OperationContext* txn,
- const ConnectionString& configCS) {
- return initializeGlobalShardingState(
- txn, configCS, ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes, false);
+ return {ErrorCodes::ShutdownInProgress, "aborting shard loading attempt"};
}
} // namespace mongo
diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h
index eeb1763df70..d7105768785 100644
--- a/src/mongo/s/sharding_initialization.h
+++ b/src/mongo/s/sharding_initialization.h
@@ -40,11 +40,15 @@ class Status;
* Takes in the connection string for reaching the config servers and initializes the global
* CatalogManager, ShardingRegistry, and grid objects.
*/
-Status initializeGlobalShardingStateForMongos(OperationContext* txn,
- const ConnectionString& configCS,
+Status initializeGlobalShardingStateForMongos(const ConnectionString& configCS,
uint64_t maxChunkSizeBytes);
-Status initializeGlobalShardingStateForMongod(OperationContext* txn,
- const ConnectionString& configCS);
+Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS);
+
+/**
+ * Tries to contact the config server and reload the shard registry until it succeeds or
+ * is interrupted.
+ */
+Status reloadShardRegistryUntilSuccess(OperationContext* txn);
} // namespace mongo
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index 737a996e113..329bcb18c78 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -119,7 +119,7 @@ void ShardingTestFixture::setUp() {
std::unique_ptr<CatalogManagerReplicaSet> cm(stdx::make_unique<CatalogManagerReplicaSet>(
std::move(uniqueDistLockManager), std::move(specialExec)));
_catalogManagerRS = cm.get();
- cm->startup(_opCtx.get());
+ cm->startup();
ConnectionString configCS = ConnectionString::forReplicaSet(
"CatalogManagerReplSetTest", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}});
diff --git a/src/mongo/util/time_support.h b/src/mongo/util/time_support.h
index ca9fa2162f5..6f20025d832 100644
--- a/src/mongo/util/time_support.h
+++ b/src/mongo/util/time_support.h
@@ -181,6 +181,8 @@ public:
/*
* Returns a system clock time_point representing the same point in time as this Date_t.
+ * Warning: careful when using with Date_t::max() as it can have a value that is bigger than
+ * time_point can store.
*/
stdx::chrono::system_clock::time_point toSystemTimePoint() const;
@@ -195,6 +197,8 @@ public:
/**
* Implicit conversion operator to system clock time point. Enables use of Date_t with
* condition_variable::wait_until.
+ * Warning: careful when using with Date_t::max() as it can have a value that is bigger than
+ * time_point can store.
*/
operator stdx::chrono::system_clock::time_point() const {
return toSystemTimePoint();