summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Esha Maharishi <esha.maharishi@mongodb.com> 2016-06-16 01:04:29 -0400
committer: Esha Maharishi <esha.maharishi@mongodb.com> 2016-07-14 16:32:10 -0400
commit: 2ea2c7700d9ccca1150e49c181c97b948889df5e (patch)
tree: 1e8d89f6506ea1a7fee2a172d7138e866dd1c111
parent: af5daa51506541d9526ce576f2432809003d2432 (diff)
download: mongo-2ea2c7700d9ccca1150e49c181c97b948889df5e.tar.gz
SERVER-22660 OpObserver on config server for inserts to config.shards from old mongos
-rw-r--r-- jstests/sharding/shard_aware_on_add_shard.js 34
-rw-r--r-- src/mongo/db/s/collection_sharding_state.cpp 50
-rw-r--r-- src/mongo/db/s/type_shard_identity.cpp 35
-rw-r--r-- src/mongo/db/s/type_shard_identity.h 16
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp 50
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp 286
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h 80
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_client.h 6
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_manager.h 22
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_manager_mock.cpp 10
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_manager_mock.h 8
-rw-r--r-- src/mongo/s/catalog/type_shard.cpp 30
-rw-r--r-- src/mongo/s/catalog/type_shard.h 15
13 files changed, 561 insertions, 81 deletions
diff --git a/jstests/sharding/shard_aware_on_add_shard.js b/jstests/sharding/shard_aware_on_add_shard.js
index 9b7045c5dca..4442aed7041 100644
--- a/jstests/sharding/shard_aware_on_add_shard.js
+++ b/jstests/sharding/shard_aware_on_add_shard.js
@@ -14,12 +14,32 @@
};
var checkShardingStateInitialized = function(conn, configConnStr, shardName, clusterId) {
- var res = conn.getDB('admin').runCommand({shardingState: 1});
- assert.commandWorked(res);
- assert(res.enabled);
- assert.eq(configConnStr, res.configServer);
- assert.eq(shardName, res.shardName);
- assert.eq(clusterId, res.clusterId);
+ // TODO: SERVER-22665 a mixed-version test should be written specifically testing receiving
+ // addShard from a legacy mongos, and this assert.soon() should be changed back to assert
+ // synchronously.
+ assert.soon(function() {
+ var res = conn.getDB('admin').runCommand({shardingState: 1});
+ assert.commandWorked(res);
+ if (res.enabled && (configConnStr === res.configServer) &&
+ (shardName === res.shardName) && (clusterId.equals(res.clusterId))) {
+ return true;
+ }
+ return false;
+ });
+ };
+
+ var checkShardMarkedAsShardAware = function(mongosConn, shardName) {
+ // TODO: SERVER-22665 a mixed-version test should be written specifically testing receiving
+ // addShard from a legacy mongos, and this assert.soon() should be changed back to assert
+ // synchronously.
+ assert.soon(function() {
+ var res = mongosConn.getDB('config').getCollection('shards').findOne({_id: shardName});
+ assert.neq(null, res, "Could not find new shard " + shardName + " in config.shards");
+ if (res.state && res.state === 1) {
+ return true;
+ }
+ return false;
+ });
};
// Create the cluster to test adding shards to.
@@ -35,6 +55,7 @@
var newShardName = "newShard";
assert.commandWorked(st.s.adminCommand({addShard: standaloneConn.name, name: newShardName}));
checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName, clusterId);
+ checkShardMarkedAsShardAware(st.s, newShardName);
MongoRunner.stopMongod(standaloneConn.port);
@@ -49,6 +70,7 @@
assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: replTest.getURL()}));
checkShardingStateInitialized(
replTest.getPrimary(), st.configRS.getURL(), replTest.getURL(), clusterId);
+ checkShardMarkedAsShardAware(st.s, newShardName);
replTest.stopSet();
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 226a5d65af1..9ccf10791e9 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -44,19 +44,24 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_options.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
namespace {
+using std::string;
+
/**
* Used to perform shard identity initialization once it is certain that the document is committed.
*/
-class ShardIdentityLopOpHandler final : public RecoveryUnit::Change {
+class ShardIdentityLogOpHandler final : public RecoveryUnit::Change {
public:
- ShardIdentityLopOpHandler(OperationContext* txn, ShardIdentityType shardIdentity)
+ ShardIdentityLogOpHandler(OperationContext* txn, ShardIdentityType shardIdentity)
: _txn(txn), _shardIdentity(std::move(shardIdentity)) {}
void commit() override {
@@ -72,9 +77,32 @@ private:
const ShardIdentityType _shardIdentity;
};
-} // unnamed namespace
+/**
+ * Used by the config server for backwards compatibility with 3.2 mongos to upsert a shardIdentity
+ * document (and thereby perform shard aware initialization) on a newly added shard.
+ */
+class LegacyAddShardLogOpHandler final : public RecoveryUnit::Change {
+public:
+ LegacyAddShardLogOpHandler(OperationContext* txn, ShardType shardType)
+ : _txn(txn), _shardType(std::move(shardType)) {}
-using std::string;
+ void commit() override {
+ // Only the primary should complete the addShard process by upserting the shardIdentity on
+ // the new shard.
+ if (repl::getGlobalReplicationCoordinator()->getMemberState().primary()) {
+ uassertStatusOK(
+ Grid::get(_txn)->catalogManager()->upsertShardIdentityOnShard(_txn, _shardType));
+ }
+ }
+
+ void rollback() override {}
+
+private:
+ OperationContext* _txn;
+ const ShardType _shardType;
+};
+
+} // unnamed namespace
CollectionShardingState::CollectionShardingState(
NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata)
@@ -166,11 +194,23 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i
auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc));
uassertStatusOK(shardIdentityDoc.validate());
txn->recoveryUnit()->registerChange(
- new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc)));
+ new ShardIdentityLogOpHandler(txn, std::move(shardIdentityDoc)));
}
}
}
+ // For backwards compatibility with 3.2 mongos, perform shard aware initialization on a newly
+ // added shard on inserts to config.shards missing the "state" field. (On addShard, a 3.2
+ // mongos performs the insert into config.shards without a "state" field.)
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
+ _nss == ShardType::ConfigNS) {
+ if (insertedDoc[ShardType::state.name()].eoo()) {
+ const auto shardType = uassertStatusOK(ShardType::fromBSON(insertedDoc));
+ txn->recoveryUnit()->registerChange(
+ new LegacyAddShardLogOpHandler(txn, std::move(shardType)));
+ }
+ }
+
checkShardVersionOrThrow(txn);
if (_sourceMgr) {
diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp
index 6188f08a109..3eef36c9a22 100644
--- a/src/mongo/db/s/type_shard_identity.cpp
+++ b/src/mongo/db/s/type_shard_identity.cpp
@@ -40,13 +40,9 @@ namespace mongo {
const std::string ShardIdentityType::IdName("shardIdentity");
-namespace {
-
-const BSONField<std::string> configsvrConnString("configsvrConnectionString");
-const BSONField<std::string> shardName("shardName");
-const BSONField<OID> clusterId("clusterId");
-
-} // unnamed namespace
+const BSONField<std::string> ShardIdentityType::configsvrConnString("configsvrConnectionString");
+const BSONField<std::string> ShardIdentityType::shardName("shardName");
+const BSONField<OID> ShardIdentityType::clusterId("clusterId");
StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) {
if (!source.hasField("_id")) {
@@ -206,31 +202,6 @@ void ShardIdentityType::setClusterId(OID clusterId) {
_clusterId = std::move(clusterId);
}
-std::unique_ptr<BatchedUpdateRequest> ShardIdentityType::createUpsertForAddShard() const {
- invariant(validate().isOK());
-
- std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument());
-
- BSONObjBuilder query;
- query.append("_id", "shardIdentity");
- query.append("shardName", getShardName());
- query.append("clusterId", getClusterId());
- updateDoc->setQuery(query.obj());
-
- BSONObjBuilder update;
- BSONObjBuilder setConfigBuilder(update.subobjStart("$set"));
- setConfigBuilder.append(configsvrConnString(), getConfigsvrConnString().toString());
- setConfigBuilder.doneFast();
- updateDoc->setUpdateExpr(update.obj());
-
- updateDoc->setUpsert(true);
-
- std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest());
- updateRequest->addToUpdates(updateDoc.release());
-
- return updateRequest;
-}
-
BSONObj ShardIdentityType::createConfigServerUpdateObject(const std::string& newConnString) {
BSONObjBuilder builder;
BSONObjBuilder setConfigBuilder(builder.subobjStart("$set"));
diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h
index 9c51913fe0f..ce29024b81b 100644
--- a/src/mongo/db/s/type_shard_identity.h
+++ b/src/mongo/db/s/type_shard_identity.h
@@ -41,11 +41,16 @@ namespace mongo {
*/
class ShardIdentityType {
public:
- ShardIdentityType() = default;
-
// The _id value for this document type.
static const std::string IdName;
+ // Field names and types in a shardIdentity document.
+ static const BSONField<std::string> configsvrConnString;
+ static const BSONField<std::string> shardName;
+ static const BSONField<OID> clusterId;
+
+ ShardIdentityType() = default;
+
/**
* Constructs a new ShardIdentityType object from BSON.
* Also does validation of the contents.
@@ -81,13 +86,6 @@ public:
void setClusterId(OID clusterId);
/**
- * Returns an update object that can be used to insert a shardIdentity doc on a shard or update
- * the existing shardIdentity doc's configsvrConnString (if the _id, shardName, and clusterId
- * match those in this ShardIdentityType instance).
- */
- std::unique_ptr<BatchedUpdateRequest> createUpsertForAddShard() const;
-
- /**
* Returns an update object that can be used to update the config server field of the
* shardIdentity document with the new connection string.
*/
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index f3405100655..048cf792687 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_identity_loader.h"
+#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/batched_insert_request.h"
#include "mongo/s/write_ops/batched_update_document.h"
@@ -185,17 +186,18 @@ protected:
*/
void expectShardIdentityUpsert(const HostAndPort& expectedHost,
const std::string& expectedShardName) {
+ // Create the expected upsert shardIdentity command for this shardType.
+ auto upsertCmdObj = catalogManager()->createShardIdentityUpsertForAddShard(
+ operationContext(), expectedShardName);
- ShardIdentityType expectedShardIdentity;
- expectedShardIdentity.setShardName(expectedShardName);
- expectedShardIdentity.setClusterId(_clusterId);
- expectedShardIdentity.setConfigsvrConnString(_configConnStr);
- invariant(expectedShardIdentity.validate().isOK());
+ // Get the BatchedUpdateRequest from the upsert command.
+ BatchedCommandRequest request(BatchedCommandRequest::BatchType::BatchType_Update);
+ std::string errMsg;
+ invariant(request.parseBSON("admin", upsertCmdObj, &errMsg) || !request.isValid(&errMsg));
- auto updateRequest = expectedShardIdentity.createUpsertForAddShard();
expectUpdates(expectedHost,
NamespaceString(NamespaceString::kConfigCollectionNamespace),
- updateRequest.get());
+ request.getUpdateRequest());
}
/**
@@ -380,6 +382,7 @@ TEST_F(AddShardTest, Standalone) {
expectedShard.setName(expectedShardName);
expectedShard.setHost("StandaloneHost:12345");
expectedShard.setMaxSizeMB(100);
+ expectedShard.setState(ShardType::ShardState::kShardAware);
expectInserts(NamespaceString(ShardType::ConfigNS), {expectedShard.toBSON()});
@@ -480,6 +483,7 @@ TEST_F(AddShardTest, StandaloneGenerateName) {
expectedShard.setName(expectedShardName);
expectedShard.setHost(shardTarget.toString());
expectedShard.setMaxSizeMB(100);
+ expectedShard.setState(ShardType::ShardState::kShardAware);
expectInserts(NamespaceString(ShardType::ConfigNS), {expectedShard.toBSON()});
@@ -920,6 +924,7 @@ TEST_F(AddShardTest, ReAddExistingShard) {
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
newShard.setHost(connString.toString());
+ newShard.setState(ShardType::ShardState::kShardAware);
// When a shard with the same name already exists, the insert into config.shards will fail
// with a duplicate key error on the shard name.
@@ -989,6 +994,7 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) {
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
newShard.setHost(connString.toString());
+ newShard.setState(ShardType::ShardState::kShardAware);
expectInserts(NamespaceString(ShardType::ConfigNS), {newShard.toBSON()});
@@ -1050,6 +1056,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
newShard.setHost(connString.toString());
+ newShard.setState(ShardType::ShardState::kShardAware);
expectInserts(NamespaceString(ShardType::ConfigNS), {newShard.toBSON()});
@@ -1131,6 +1138,7 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
newShard.setHost(fullConnString.toString());
+ newShard.setState(ShardType::ShardState::kShardAware);
expectInserts(NamespaceString(ShardType::ConfigNS), {newShard.toBSON()});
@@ -1141,5 +1149,33 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
future.timed_get(kFutureTimeout);
}
+TEST_F(AddShardTest, CreateShardIdentityUpsertForAddShard) {
+ std::string shardName = "shardName";
+
+ BSONObj expectedBSON = BSON("update"
+ << "system.version"
+ << "updates"
+ << BSON_ARRAY(BSON(
+ "q" << BSON("_id"
+ << "shardIdentity"
+ << "shardName"
+ << shardName
+ << "clusterId"
+ << _clusterId)
+ << "u"
+ << BSON("$set" << BSON("configsvrConnectionString"
+ << _configConnStr.toString()))
+ << "upsert"
+ << true))
+ << "writeConcern"
+ << BSON("w"
+ << "majority"
+ << "wtimeout"
+ << 15000));
+ ASSERT_EQUALS(
+ expectedBSON,
+ catalogManager()->createShardIdentityUpsertForAddShard(operationContext(), shardName));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 422821a03df..f199c27eb76 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -42,6 +42,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
@@ -70,6 +71,7 @@
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -79,6 +81,14 @@ using str::stream;
namespace {
+using CallbackHandle = executor::TaskExecutor::CallbackHandle;
+using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
+
+const Seconds kAddShardTaskRetryInterval(30);
+const Seconds kDefaultFindHostMaxWaitTime(20);
+
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const ReadPreferenceSetting kConfigPrimarySelector(ReadPreference::PrimaryOnly);
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
@@ -356,6 +366,7 @@ StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard(
ShardType shard;
shard.setName(actualShardName);
shard.setHost(actualShardConnStr.toString());
+ shard.setState(ShardType::ShardState::kShardAware);
return shard;
}
@@ -499,26 +510,11 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
shardType.setMaxSizeMB(maxSize);
}
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
- shardIdentity.setShardName(shardType.getName());
- shardIdentity.setClusterId(ClusterIdentityLoader::get(txn)->getClusterId());
- auto validateStatus = shardIdentity.validate();
- if (!validateStatus.isOK()) {
- return validateStatus;
- }
+ auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName());
- log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString();
-
- auto updateRequest = shardIdentity.createUpsertForAddShard();
- BatchedCommandRequest commandRequest(updateRequest.release());
- commandRequest.setNS(NamespaceString::kConfigCollectionNamespace);
- commandRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
-
- auto swCommandResponse =
- _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON());
+ LOG(2) << "going to insert shardIdentity document into shard: " << shardType;
+ auto swCommandResponse = _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest);
if (!swCommandResponse.isOK()) {
return swCommandResponse.getStatus();
}
@@ -893,4 +889,258 @@ Status ShardingCatalogManagerImpl::_initConfigIndexes(OperationContext* txn) {
return Status::OK();
}
+Status ShardingCatalogManagerImpl::upsertShardIdentityOnShard(OperationContext* txn,
+ ShardType shardType) {
+
+ auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName());
+
+ auto swConnString = ConnectionString::parse(shardType.getHost());
+ if (!swConnString.isOK()) {
+ return swConnString.getStatus();
+ }
+
+ // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter
+ // instead.
+ const std::shared_ptr<Shard> shard{
+ Grid::get(txn)->shardRegistry()->createConnection(swConnString.getValue())};
+ invariant(shard);
+ auto targeter = shard->getTargeter();
+
+ _scheduleAddShardTask(
+ std::move(shardType), std::move(targeter), std::move(commandRequest), false);
+
+ return Status::OK();
+}
+
+void ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled(
+ const CallbackArgs& cbArgs,
+ const ShardType shardType,
+ std::shared_ptr<RemoteCommandTargeter> targeter,
+ const BSONObj commandRequest) {
+ if (cbArgs.status == ErrorCodes::CallbackCanceled) {
+ stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex);
+ _untrackAddShardHandle_inlock(shardType.getName());
+ return;
+ }
+ _scheduleAddShardTask(
+ std::move(shardType), std::move(targeter), std::move(commandRequest), true);
+}
+
+void ShardingCatalogManagerImpl::_scheduleAddShardTask(
+ const ShardType shardType,
+ std::shared_ptr<RemoteCommandTargeter> targeter,
+ const BSONObj commandRequest,
+ const bool isRetry) {
+ stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex);
+
+ if (isRetry) {
+ // Untrack the handle from scheduleWorkAt, and schedule a new addShard task.
+ _untrackAddShardHandle_inlock(shardType.getName());
+ } else {
+ // If this is not a retry, only schedule a new addShard task for this shardId if there are
+ // none currently scheduled.
+ if (_hasAddShardHandle_inlock(shardType.getName())) {
+ return;
+ }
+ }
+
+ // Schedule the shardIdentity upsert request to run immediately, and track the handle.
+
+ auto swHost = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ Milliseconds(kDefaultFindHostMaxWaitTime));
+ if (!swHost.isOK()) {
+ // A 3.2 mongos must have previously successfully communicated with hosts in this shard, so
+ // a failure to find a host here is probably transient, and it is safe to retry.
+ warning() << "Failed to find host for shard " << shardType
+ << " when trying to upsert a shardIdentity document, "
+ << causedBy(swHost.getStatus());
+ const Date_t now = _executorForAddShard->now();
+ const Date_t when = now + kAddShardTaskRetryInterval;
+ _trackAddShardHandle_inlock(
+ shardType.getName(),
+ _executorForAddShard->scheduleWorkAt(
+ when,
+ stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled,
+ this,
+ stdx::placeholders::_1,
+ std::move(shardType),
+ std::move(targeter),
+ std::move(commandRequest))));
+ return;
+ }
+
+ executor::RemoteCommandRequest request(
+ swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), Seconds(30));
+
+ const RemoteCommandCallbackFn callback =
+ stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse,
+ this,
+ stdx::placeholders::_1,
+ std::move(shardType),
+ std::move(targeter));
+
+ _trackAddShardHandle_inlock(shardType.getName(),
+ _executorForAddShard->scheduleRemoteCommand(request, callback));
+}
+
+void ShardingCatalogManagerImpl::_handleAddShardTaskResponse(
+ const RemoteCommandCallbackArgs& cbArgs,
+ ShardType shardType,
+ std::shared_ptr<RemoteCommandTargeter> targeter) {
+ stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex);
+
+ // Untrack the handle from scheduleRemoteCommand.
+ _untrackAddShardHandle_inlock(shardType.getName());
+
+ Status responseStatus = cbArgs.response.getStatus();
+ if (responseStatus == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+
+ // Examine the response to determine if the upsert succeeded.
+
+ bool rescheduleTask = false;
+
+ auto swResponse = cbArgs.response;
+ if (!swResponse.isOK()) {
+ warning() << "Failed to upsert shardIdentity document during addShard into shard "
+ << shardType.getName() << "(" << shardType.getHost()
+ << "). The shardIdentity upsert will continue to be retried. "
+ << causedBy(swResponse.getStatus());
+ rescheduleTask = true;
+ } else {
+ // Create a CommandResponse object in order to use processBatchWriteResponse.
+ BSONObj responseObj = swResponse.getValue().data.getOwned();
+ BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ Status commandStatus = getStatusFromCommandResult(responseObj);
+ Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
+ Shard::CommandResponse commandResponse(std::move(responseObj),
+ std::move(responseMetadata),
+ std::move(commandStatus),
+ std::move(writeConcernStatus));
+
+ BatchedCommandResponse batchResponse;
+ auto batchResponseStatus =
+ Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse);
+ if (!batchResponseStatus.isOK()) {
+ if (batchResponseStatus == ErrorCodes::DuplicateKey) {
+ warning()
+ << "Received duplicate key error when inserting the shardIdentity "
+ "document into "
+ << shardType.getName() << "(" << shardType.getHost()
+ << "). This means the shard has a shardIdentity document with a clusterId "
+ "that differs from this cluster's clusterId. It may still belong to "
+ "or not have been properly removed from another cluster. The "
+ "shardIdentity upsert will continue to be retried.";
+ } else {
+ warning()
+ << "Failed to upsert shardIdentity document into shard " << shardType.getName()
+ << "(" << shardType.getHost()
+ << ") during addShard. The shardIdentity upsert will continue to be retried. "
+ << causedBy(batchResponseStatus);
+ }
+ rescheduleTask = true;
+ }
+ }
+
+ if (rescheduleTask) {
+ // If the command did not succeed, schedule the task again with a delay.
+ const Date_t now = _executorForAddShard->now();
+ const Date_t when = now + kAddShardTaskRetryInterval;
+
+ // Track the handle from scheduleWorkAt.
+ _trackAddShardHandle_inlock(
+ shardType.getName(),
+ _executorForAddShard->scheduleWorkAt(
+ when,
+ stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled,
+ this,
+ stdx::placeholders::_1,
+ std::move(shardType),
+ std::move(targeter),
+ std::move(cbArgs.request.cmdObj))));
+ return;
+ } else {
+ // If the command succeeded, update config.shards to mark the shard as shardAware.
+
+ // Create a unique operation context for this thread, and destroy it at the end of the
+ // scope.
+ Client::initThread("updateShardStateToShardAware");
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ auto txn = cc().makeOperationContext();
+
+ auto updateStatus = _catalogClient->updateConfigDocument(
+ txn.get(),
+ ShardType::ConfigNS,
+ BSON(ShardType::name(shardType.getName())),
+ BSON("$set" << BSON(ShardType::state()
+ << static_cast<std::underlying_type<ShardType::ShardState>::type>(
+ ShardType::ShardState::kShardAware))),
+ false,
+ ShardingCatalogClient::kMajorityWriteConcern);
+
+ // We do not handle a failed response status. If the command failed, when a new config
+ // server transitions to primary, an addShard task for this shard will be run again and
+ // the update to config.shards will be automatically retried then. If it fails because the
+ // shard was removed through the normal removeShard path (so the entry in config.shards
+ // was deleted), no new addShard task will get scheduled on the next transition to primary.
+ if (!updateStatus.isOK()) {
+ warning() << "Failed to mark shard " << shardType.getName() << "("
+ << shardType.getHost()
+ << ") as shardAware in config.shards. This will be retried the next time a "
+ "config server transitions to primary. "
+ << causedBy(updateStatus.getStatus());
+ }
+ }
+}
+
+BSONObj ShardingCatalogManagerImpl::createShardIdentityUpsertForAddShard(
+ OperationContext* txn, const std::string& shardName) {
+ std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument());
+
+ BSONObjBuilder query;
+ query.append("_id", "shardIdentity");
+ query.append(ShardIdentityType::shardName(), shardName);
+ query.append(ShardIdentityType::clusterId(), ClusterIdentityLoader::get(txn)->getClusterId());
+ updateDoc->setQuery(query.obj());
+
+ BSONObjBuilder update;
+ {
+ BSONObjBuilder set(update.subobjStart("$set"));
+ set.append(ShardIdentityType::configsvrConnString(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString().toString());
+ }
+ updateDoc->setUpdateExpr(update.obj());
+ updateDoc->setUpsert(true);
+
+ std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest());
+ updateRequest->addToUpdates(updateDoc.release());
+
+ BatchedCommandRequest commandRequest(updateRequest.release());
+ commandRequest.setNS(NamespaceString::kConfigCollectionNamespace);
+ commandRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+
+ return commandRequest.toBSON();
+}
+
+
+bool ShardingCatalogManagerImpl::_hasAddShardHandle_inlock(const ShardId& shardId) {
+ return _addShardHandles.find(shardId) != _addShardHandles.end();
+}
+
+void ShardingCatalogManagerImpl::_trackAddShardHandle_inlock(
+ const ShardId shardId, const StatusWith<CallbackHandle>& handle) {
+ if (handle.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return;
+ }
+ fassert(40219, handle.getStatus());
+ _addShardHandles.insert(std::pair<ShardId, CallbackHandle>(shardId, handle.getValue()));
+}
+
+void ShardingCatalogManagerImpl::_untrackAddShardHandle_inlock(const ShardId& shardId) {
+ auto it = _addShardHandles.find(shardId);
+ invariant(it != _addShardHandles.end());
+ _addShardHandles.erase(shardId);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 8e9ab963851..83dd97eabdd 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -30,6 +30,7 @@
#include <vector>
+#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/mutex.h"
@@ -37,6 +38,7 @@
namespace mongo {
class DatabaseType;
+class RemoteCommandTargeter;
class ShardingCatalogClient;
class VersionType;
@@ -78,6 +80,12 @@ public:
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
+ Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override;
+
+
+ BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn,
+ const std::string& shardName) override;
+
private:
/**
* Generates a unique name to be given to a newly added shard.
@@ -122,12 +130,71 @@ private:
const BSONObj& cmdObj);
/**
+ * Returns the current cluster schema/protocol version.
+ */
+ StatusWith<VersionType> _getConfigVersion(OperationContext* txn);
+
+ /**
* Performs the necessary checks for version compatibility and creates a new config.version
* document if the current cluster config is empty.
*/
Status _initConfigVersion(OperationContext* txn);
/**
+ * Callback function used when rescheduling an addShard task after the first attempt failed.
+ * Checks if the callback has been canceled, and if not, proceeds to call
+ * _scheduleAddShardTask.
+ */
+ void _scheduleAddShardTaskUnlessCanceled(const executor::TaskExecutor::CallbackArgs& cbArgs,
+ const ShardType shardType,
+ std::shared_ptr<RemoteCommandTargeter> targeter,
+ const BSONObj commandRequest);
+
+ /**
+ * For rolling upgrade and backwards compatibility with 3.2 mongos, schedules an asynchronous
+ * task against the addShard executor to upsert a shardIdentity doc into the new shard
+ * described by shardType. If there is an existing such task for this shardId (as tracked by
+ * the _addShardHandles map), a new task is not scheduled. There could be an existing such task
+ * if addShard was called previously, but the upsert has not yet succeeded on the shard.
+ */
+ void _scheduleAddShardTask(const ShardType shardType,
+ std::shared_ptr<RemoteCommandTargeter> targeter,
+ const BSONObj commandRequest,
+ const bool isRetry);
+
+ /**
+ * Callback function for the asynchronous upsert of the shardIdentity doc scheduled by
+ * _scheduleAddShardTask. Checks the response from the shard, and updates config.shards to mark
+ * the shard as shardAware on success. On failure to perform the upsert, this callback schedules
+ * _scheduleAddShardTaskUnlessCanceled to be called after a delay, which retries the upsert.
+ */
+ void _handleAddShardTaskResponse(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbArgs,
+ ShardType shardType,
+ std::shared_ptr<RemoteCommandTargeter> targeter);
+
+ /**
+ * Checks if a running or scheduled addShard task exists for the shard with id shardId.
+ * The caller must hold _addShardHandlesMutex.
+ */
+ bool _hasAddShardHandle_inlock(const ShardId& shardId);
+
+ /**
+ * Adds CallbackHandle handle for the shard with id shardId to the map of running or scheduled
+ * addShard tasks.
+ * The caller must hold _addShardHandlesMutex.
+ */
+ void _trackAddShardHandle_inlock(
+ const ShardId shardId, const StatusWith<executor::TaskExecutor::CallbackHandle>& handle);
+
+ /**
+ * Removes the handle to a running or scheduled addShard task callback for the shard with id
+ * shardId.
+ * The caller must hold _addShardHandlesMutex.
+ */
+ void _untrackAddShardHandle_inlock(const ShardId& shardId);
+
+ /**
* Builds all the expected indexes on the config server.
*/
Status _initConfigIndexes(OperationContext* txn);
@@ -160,6 +227,19 @@ private:
// True if initializeConfigDatabaseIfNeeded() has been called and returned successfully.
bool _configInitialized = false; // (M)
+
+ // For rolling upgrade and backwards compatibility with 3.2 mongos, maintains a mapping of
+ // a shardId to an outstanding addShard task scheduled against the _executorForAddShard.
+ // An "addShard" task upserts the shardIdentity document into the new shard. Such a task is
+ // scheduled:
+ // 1) on a config server's transition to primary for each shard in config.shards that is not
+ // marked as sharding aware
+ // 2) on a direct insert to the config.shards collection (usually from a 3.2 mongos).
+ // This map is used to ensure that only one such task per shard is outstanding at a time.
+ std::map<ShardId, executor::TaskExecutor::CallbackHandle> _addShardHandles;
+
+ // Protects the _addShardHandles map.
+ stdx::mutex _addShardHandlesMutex;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 816ef6adff6..a8e80b67937 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -384,7 +384,7 @@ public:
* Directly inserts a document in the specified namespace on the config server. The document
* must have an _id index. Must only be used for insertions in the 'config' database.
*
- * NOTE: Should not be used in new code. Instead add a new metadata operation to the interface.
+ * NOTE: Should not be used in new code outside the ShardingCatalogManager.
*/
virtual Status insertConfigDocument(OperationContext* txn,
const std::string& ns,
@@ -403,7 +403,7 @@ public:
* was upserted or it existed and any of the fields changed) and false otherwise (basically
* returns whether the update command's response update.n value is > 0).
*
- * NOTE: Should not be used in new code. Instead add a new metadata operation to the interface.
+ * NOTE: Should not be used in new code outside the ShardingCatalogManager.
*/
virtual StatusWith<bool> updateConfigDocument(OperationContext* txn,
const std::string& ns,
@@ -416,7 +416,7 @@ public:
* Removes documents matching a particular query predicate from the specified namespace on the
* config server. Must only be used for deletions from the 'config' database.
*
- * NOTE: Should not be used in new code. Instead add a new metadata operation to the interface.
+ * NOTE: Should not be used in new code outside the ShardingCatalogManager.
*/
virtual Status removeConfigDocuments(OperationContext* txn,
const std::string& ns,
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index c830e03a362..73286475d9a 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -31,11 +31,16 @@
#include <string>
#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/memory.h"
namespace mongo {
+class BSONObj;
class ConnectionString;
class OperationContext;
+class RemoteCommandTargeter;
+class ShardId;
+class ShardType;
class Status;
template <typename T>
class StatusWith;
@@ -118,6 +123,23 @@ public:
*/
virtual Status initializeConfigDatabaseIfNeeded(OperationContext* txn) = 0;
+ /**
+ * For rolling upgrade and backwards compatibility with 3.2 mongos, schedules an asynchronous
+ * task against the addShard executor to upsert a shardIdentity doc into the new shard described
+ * by shardType. On failure to upsert the doc on the shard, the task reschedules itself with a
+ * delay indefinitely, and is canceled only when removeShard is called.
+ */
+ virtual Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) = 0;
+
+ /**
+ * Returns a BSON representation of an update request that can be used to insert a
+ * shardIdentity doc into the shard for the given shardType (or update the shard's existing
+ * shardIdentity doc's configsvrConnString if the _id, shardName, and clusterId do not
+ * conflict).
+ */
+ virtual BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn,
+ const std::string& shardName) = 0;
+
protected:
ShardingCatalogManager() = default;
};
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index 7ba2b08926f..906a4797025 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -74,4 +74,14 @@ Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationCon
return {ErrorCodes::InternalError, "Method not implemented"};
}
+Status ShardingCatalogManagerMock::upsertShardIdentityOnShard(OperationContext* txn,
+ ShardType shardType) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+BSONObj ShardingCatalogManagerMock::createShardIdentityUpsertForAddShard(
+ OperationContext* txn, const std::string& shardName) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index 49057fc1bde..20bca3ec435 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -28,7 +28,10 @@
#pragma once
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/s/type_shard_identity.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_shard.h"
namespace mongo {
@@ -60,6 +63,11 @@ public:
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
+
+ Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override;
+
+ BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn,
+ const std::string& shardName) override;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/type_shard.cpp b/src/mongo/s/catalog/type_shard.cpp
index ca01ec8ec2f..0b97a3d3cd6 100644
--- a/src/mongo/s/catalog/type_shard.cpp
+++ b/src/mongo/s/catalog/type_shard.cpp
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -46,7 +47,7 @@ const BSONField<std::string> ShardType::host("host");
const BSONField<bool> ShardType::draining("draining");
const BSONField<long long> ShardType::maxSizeMB("maxSize");
const BSONField<BSONArray> ShardType::tags("tags");
-
+const BSONField<ShardType::ShardState> ShardType::state("state");
StatusWith<ShardType> ShardType::fromBSON(const BSONObj& source) {
ShardType shard;
@@ -112,6 +113,27 @@ StatusWith<ShardType> ShardType::fromBSON(const BSONObj& source) {
}
}
+ {
+ long long shardState;
+ Status status = bsonExtractIntegerField(source, state.name(), &shardState);
+ if (status.isOK()) {
+ // Make sure the state field falls within the valid range of ShardState values.
+ if (!(shardState >= static_cast<std::underlying_type<ShardState>::type>(
+ ShardState::kNotShardAware) &&
+ shardState <= static_cast<std::underlying_type<ShardState>::type>(
+ ShardState::kShardAware))) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Invalid shard state value: " << shardState);
+ } else {
+ shard._state = static_cast<ShardState>(shardState);
+ }
+ } else if (status == ErrorCodes::NoSuchKey) {
+ // The state field can be missing, in which case it is presumed to be kNotShardAware.
+ } else {
+ return status;
+ }
+ }
+
return shard;
}
@@ -146,6 +168,8 @@ BSONObj ShardType::toBSON() const {
builder.append(maxSizeMB(), getMaxSizeMB());
if (_tags)
builder.append(tags(), getTags());
+ if (_state)
+ builder.append(state(), static_cast<std::underlying_type<ShardState>::type>(getState()));
return builder.obj();
}
@@ -176,5 +200,9 @@ void ShardType::setTags(const std::vector<std::string>& tags) {
invariant(tags.size() > 0);
_tags = tags;
}
+void ShardType::setState(const ShardState state) {
+ invariant(!_state.is_initialized());
+ _state = state;
+}
} // namespace mongo
diff --git a/src/mongo/s/catalog/type_shard.h b/src/mongo/s/catalog/type_shard.h
index 9b30e1a9505..a86481c7fd0 100644
--- a/src/mongo/s/catalog/type_shard.h
+++ b/src/mongo/s/catalog/type_shard.h
@@ -33,6 +33,8 @@
#include <vector>
#include "mongo/db/jsobj.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/s/write_ops/batched_update_request.h"
namespace mongo {
@@ -49,6 +51,11 @@ class StatusWith;
*/
class ShardType {
public:
+ enum class ShardState : int {
+ kNotShardAware = 0,
+ kShardAware,
+ };
+
// Name of the shards collection in the config server.
static const std::string ConfigNS;
@@ -58,6 +65,7 @@ public:
static const BSONField<bool> draining;
static const BSONField<long long> maxSizeMB;
static const BSONField<BSONArray> tags;
+ static const BSONField<ShardState> state;
/**
@@ -107,6 +115,11 @@ public:
}
void setTags(const std::vector<std::string>& tags);
+ const ShardState getState() const {
+ return _state.value_or(ShardState::kNotShardAware);
+ }
+ void setState(const ShardState state);
+
private:
// Convention: (M)andatory, (O)ptional, (S)pecial rule.
@@ -120,6 +133,8 @@ private:
boost::optional<long long> _maxSizeMB;
// (O) shard tags
boost::optional<std::vector<std::string>> _tags;
+ // (O) shard state
+ boost::optional<ShardState> _state;
};
} // namespace mongo