summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2016-05-25 17:01:10 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2016-06-10 16:19:42 -0400
commit1e26998e7fda52c226385fae4069ebbc384c294a (patch)
treefb26938216376e21a295c9163ac53a80b43d8d8f /src/mongo/s/catalog
parentf3f756132c74c1b44696d30faf6ef806fc7de860 (diff)
downloadmongo-1e26998e7fda52c226385fae4069ebbc384c294a.tar.gz
SERVER-24126 Add step to _cfgsvrAddShard command where it inserts the shardIdentity document to the new shard
Diffstat (limited to 'src/mongo/s/catalog')
-rw-r--r--src/mongo/s/catalog/replset/SConscript1
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp95
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h8
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp120
4 files changed, 189 insertions, 35 deletions
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 582b9a26aea..3d482b50403 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -66,6 +66,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/read_concern_args',
+ '$BUILD_DIR/mongo/db/s/type_shard_identity',
'$BUILD_DIR/mongo/executor/network_interface',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager',
'$BUILD_DIR/mongo/s/client/sharding_client',
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 3c9b7ce9809..8237359140e 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -197,9 +198,10 @@ StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard(
// Check for mongos and older version mongod connections, and whether the hosts
// can be found for the user specified replset.
- auto cmdStatus = _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
- if (!cmdStatus.isOK()) {
- if (cmdStatus == ErrorCodes::RPCProtocolNegotiationFailed) {
+ auto swCommandResponse =
+ _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
+ if (!swCommandResponse.isOK()) {
+ if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) {
// Mongos to mongos commands are no longer supported in the wire protocol
// (because mongos does not support OP_COMMAND), similarly for a new mongos
// and an old mongod. So the call will fail in such cases.
@@ -211,19 +213,20 @@ StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard(
<< " likely because it contains a node that is a mongos or an old"
<< " version of mongod."};
} else {
- return cmdStatus.getStatus();
+ return swCommandResponse.getStatus();
}
}
// Check for a command response error
- BSONObj resIsMaster = cmdStatus.getValue();
- Status resIsMasterStatus = getStatusFromCommandResult(resIsMaster);
+ auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus);
if (!resIsMasterStatus.isOK()) {
return {resIsMasterStatus.code(),
str::stream() << "Error running isMaster against " << shardConn->toString() << ": "
<< causedBy(resIsMasterStatus)};
}
+ auto resIsMaster = std::move(swCommandResponse.getValue().response);
+
// Check whether there is a master. If there isn't, the replica set may not have been
// initiated. If the connection is a standalone, it will return true for isMaster.
bool isMaster;
@@ -350,19 +353,19 @@ StatusWith<std::vector<std::string>> CatalogManagerReplicaSet::_getDBNamesListFr
shardRegistry->createConnection(connectionString).release()};
invariant(shardConn);
- auto cmdStatus = _runCommandForAddShard(
+ auto swCommandResponse = _runCommandForAddShard(
txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1));
- if (!cmdStatus.isOK()) {
- return cmdStatus.getStatus();
+ if (!swCommandResponse.isOK()) {
+ return swCommandResponse.getStatus();
}
- const BSONObj& cmdResult = cmdStatus.getValue();
-
- Status cmdResultStatus = getStatusFromCommandResult(cmdResult);
- if (!cmdResultStatus.isOK()) {
- return cmdResultStatus;
+ auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus);
+ if (!cmdStatus.isOK()) {
+ return cmdStatus;
}
+ auto cmdResult = std::move(swCommandResponse.getValue().response);
+
vector<string> dbNames;
for (const auto& dbEntry : cmdResult["databases"].Obj()) {
@@ -408,7 +411,7 @@ void CatalogManagerReplicaSet::shutDown(OperationContext* txn) {
_executorForAddShard->join();
}
-StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
+StatusWith<Shard::CommandResponse> CatalogManagerReplicaSet::_runCommandForAddShard(
OperationContext* txn,
RemoteCommandTargeter* targeter,
const std::string& dbName,
@@ -421,12 +424,12 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
executor::RemoteCommandRequest request(
host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
- StatusWith<executor::RemoteCommandResponse> responseStatus =
+ StatusWith<executor::RemoteCommandResponse> swResponse =
Status(ErrorCodes::InternalError, "Internal error running command");
auto callStatus = _executorForAddShard->scheduleRemoteCommand(
- request, [&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- responseStatus = args.response;
+ request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ swResponse = args.response;
});
if (!callStatus.isOK()) {
return callStatus.getStatus();
@@ -435,11 +438,22 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
// Block until the command is carried out
_executorForAddShard->wait(callStatus.getValue());
- if (!responseStatus.isOK()) {
- return responseStatus.getStatus();
+ if (!swResponse.isOK()) {
+ if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
+ LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus();
+ }
+ return swResponse.getStatus();
}
- return responseStatus.getValue().data.getOwned();
+ BSONObj responseObj = swResponse.getValue().data.getOwned();
+ BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ Status commandStatus = getStatusFromCommandResult(responseObj);
+ Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
+
+ return Shard::CommandResponse(std::move(responseObj),
+ std::move(responseMetadata),
+ std::move(commandStatus),
+ std::move(writeConcernStatus));
}
StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
@@ -497,7 +511,44 @@ StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
shardType.setMaxSizeMB(maxSize);
}
- log() << "going to add shard: " << shardType.toString();
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
+ shardIdentity.setShardName(shardType.getName());
+ shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId());
+ auto validateStatus = shardIdentity.validate();
+ if (!validateStatus.isOK()) {
+ return validateStatus;
+ }
+
+ log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString();
+
+ auto updateRequest = shardIdentity.createUpsertForAddShard();
+ BatchedCommandRequest commandRequest(updateRequest.release());
+ commandRequest.setNS(NamespaceString::kConfigCollectionNamespace);
+ commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON());
+
+ const std::shared_ptr<Shard> shardConn{
+ Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)};
+ invariant(shardConn);
+ auto targeter = shardConn->getTargeter();
+
+ auto swCommandResponse =
+ _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON());
+
+ if (!swCommandResponse.isOK()) {
+ return swCommandResponse.getStatus();
+ }
+
+ auto commandResponse = std::move(swCommandResponse.getValue());
+
+ BatchedCommandResponse batchResponse;
+ auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse);
+ if (!batchResponseStatus.isOK()) {
+ return batchResponseStatus;
+ }
+
+ log() << "going to insert new entry for shard into config.shards: " << shardType.toString();
Status result = insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON());
if (!result.isOK()) {
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index b7251dc2a37..a26bf941ab2 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -276,10 +276,10 @@ private:
* Runs a command against a "shard" that is not yet in the cluster and thus not present in the
* ShardRegistry.
*/
- StatusWith<BSONObj> _runCommandForAddShard(OperationContext* txn,
- RemoteCommandTargeter* targeter,
- const std::string& dbName,
- const BSONObj& cmdObj);
+ StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn,
+ RemoteCommandTargeter* targeter,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
OperationContext* txn,
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
index 54f414c79cb..5c3e1ccfb06 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
@@ -32,10 +32,12 @@
#include <vector>
+#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands.h"
#include "mongo/db/query/query_request.h"
+#include "mongo/db/s/type_shard_identity.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/catalog/replset/catalog_manager_replica_set.h"
@@ -75,7 +77,16 @@ protected:
CatalogManagerReplSetTestFixture::setUp();
getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
- configTargeter()->setFindHostReturnValue(configHost);
+
+ configTargeter()->setConnectionStringReturnValue(_configConnStr);
+
+ _configHost = _configConnStr.getServers().front();
+ configTargeter()->setFindHostReturnValue(_configHost);
+
+ // TODO SERVER-23096: Change this to OID::gen() once clusterId is loaded from the config
+ // servers into the ShardRegistry instead of created by the ShardRegistry within each
+ // process.
+ _clusterId = OID();
}
/**
@@ -127,12 +138,77 @@ protected:
}
/**
+ * Waits for a request for the shardIdentity document to be upserted into a shard from the
+ * config server on addShard.
+ */
+ void expectShardIdentityUpsert(const HostAndPort& expectedHost,
+ const std::string& expectedShardName) {
+
+ ShardIdentityType expectedShardIdentity;
+ expectedShardIdentity.setShardName(expectedShardName);
+ expectedShardIdentity.setClusterId(_clusterId);
+ expectedShardIdentity.setConfigsvrConnString(_configConnStr);
+ invariant(expectedShardIdentity.validate().isOK());
+
+ auto updateRequest = expectedShardIdentity.createUpsertForAddShard();
+ expectUpdates(expectedHost,
+ NamespaceString(NamespaceString::kConfigCollectionNamespace),
+ updateRequest.get());
+ }
+
+ /**
+ * Waits for a set of batched updates and ensures that the host, namespace, and updates exactly
+ * match what's expected. Responds with a success status.
+ */
+ void expectUpdates(const HostAndPort& expectedHost,
+ const NamespaceString& expectedNss,
+ BatchedUpdateRequest* expectedBatchedUpdates) {
+ onCommandForAddShard([&](const RemoteCommandRequest& request) {
+
+ ASSERT_EQUALS(expectedHost, request.target);
+
+ // Check that the db name in the request matches the expected db name.
+ ASSERT_EQUALS(expectedNss.db(), request.dbname);
+
+ BatchedUpdateRequest actualBatchedUpdates;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedUpdates.parseBSON(request.dbname, request.cmdObj, &errmsg));
+
+ // Check that the db and collection names in the BatchedUpdateRequest match the
+ // expected.
+ ASSERT_EQUALS(expectedNss, actualBatchedUpdates.getNS());
+
+ auto expectedUpdates = expectedBatchedUpdates->getUpdates();
+ auto actualUpdates = actualBatchedUpdates.getUpdates();
+
+ ASSERT_EQUALS(expectedUpdates.size(), actualUpdates.size());
+
+ auto itExpected = expectedUpdates.begin();
+ auto itActual = actualUpdates.begin();
+
+ for (; itActual != actualUpdates.end(); itActual++, itExpected++) {
+ ASSERT_EQ((*itExpected)->getUpsert(), (*itActual)->getUpsert());
+ ASSERT_EQ((*itExpected)->getMulti(), (*itActual)->getMulti());
+ ASSERT_EQ((*itExpected)->getQuery(), (*itActual)->getQuery());
+ ASSERT_EQ((*itExpected)->getUpdateExpr(), (*itActual)->getUpdateExpr());
+ }
+
+ BatchedCommandResponse response;
+ response.setOk(true);
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+ }
+
+
+ /**
* Wait for a single update request and ensure that the items being updated exactly match the
* expected items. Responds with a success status.
*/
void expectDatabaseUpdate(const DatabaseType& dbtExpected) {
onCommand([this, &dbtExpected](const RemoteCommandRequest& request) {
- ASSERT_EQ(request.target, configHost);
+ ASSERT_EQ(request.target, _configHost);
ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
BatchedUpdateRequest actualBatchedUpdate;
@@ -162,10 +238,10 @@ protected:
*/
void expectAddShardChangeLog(const std::string& shardName, const std::string& shardHost) {
// Expect the change log collection to be created
- expectChangeLogCreate(configHost, BSON("ok" << 1));
+ expectChangeLogCreate(_configHost, BSON("ok" << 1));
// Expect the change log operation
- expectChangeLogInsert(configHost,
+ expectChangeLogInsert(_configHost,
network()->now(),
"addShard",
"",
@@ -177,7 +253,7 @@ protected:
// Do it twice when there is no response set because getDatabase retries if it can't
// find a database
onFindCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQ(request.target, configHost);
+ ASSERT_EQ(request.target, _configHost);
if (i == 0) {
ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
} else if (i == 1) {
@@ -205,7 +281,11 @@ protected:
}
}
- const HostAndPort configHost{HostAndPort("ConfigHost:23456")};
+ const ConnectionString _configConnStr{ConnectionString::forReplicaSet(
+ "configRS",
+ {HostAndPort("host1:23456"), HostAndPort("host2:23456"), HostAndPort("host3:23456")})};
+ HostAndPort _configHost;
+ OID _clusterId;
};
TEST_F(AddShardTest, Standalone) {
@@ -250,7 +330,10 @@ TEST_F(AddShardTest, Standalone) {
expectGetDatabase("TestDB1", boost::none);
expectGetDatabase("TestDB2", boost::none);
- // The new shard is being inserted
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType expectedShard;
expectedShard.setName(expectedShardName);
expectedShard.setHost("StandaloneHost:12345");
@@ -347,7 +430,10 @@ TEST_F(AddShardTest, StandaloneGenerateName) {
return vector<BSONObj>{existingShard.toBSON()};
});
- // The new shard is being inserted
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType expectedShard;
expectedShard.setName(expectedShardName);
expectedShard.setHost(shardTarget.toString());
@@ -784,6 +870,10 @@ TEST_F(AddShardTest, ReAddExistingShard) {
expectGetDatabase("shardDB", boost::none);
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
@@ -849,6 +939,10 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) {
expectGetDatabase("shardDB", boost::none);
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
@@ -906,6 +1000,10 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
expectGetDatabase("shardDB", boost::none);
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
@@ -922,7 +1020,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
// Ensure that even if upserting the database discovered on the shard fails, the addShard
// operation succeeds.
onCommand([this, &shardDB](const RemoteCommandRequest& request) {
- ASSERT_EQ(request.target, configHost);
+ ASSERT_EQ(request.target, _configHost);
ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
BatchedUpdateRequest actualBatchedUpdate;
@@ -983,6 +1081,10 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
expectListDatabases(shardTarget, {});
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);