diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-06-16 01:04:29 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-07-14 16:32:10 -0400 |
commit | 2ea2c7700d9ccca1150e49c181c97b948889df5e (patch) | |
tree | 1e8d89f6506ea1a7fee2a172d7138e866dd1c111 | |
parent | af5daa51506541d9526ce576f2432809003d2432 (diff) | |
download | mongo-2ea2c7700d9ccca1150e49c181c97b948889df5e.tar.gz |
SERVER-22660 OpObserver on config server for inserts to config.shards from old mongos
-rw-r--r-- | jstests/sharding/shard_aware_on_add_shard.js | 34 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity.h | 16 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp | 50 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp | 286 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h | 80 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client.h | 6 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager.h | 22 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_mock.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_mock.h | 8 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_shard.cpp | 30 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_shard.h | 15 |
13 files changed, 561 insertions, 81 deletions
diff --git a/jstests/sharding/shard_aware_on_add_shard.js b/jstests/sharding/shard_aware_on_add_shard.js index 9b7045c5dca..4442aed7041 100644 --- a/jstests/sharding/shard_aware_on_add_shard.js +++ b/jstests/sharding/shard_aware_on_add_shard.js @@ -14,12 +14,32 @@ }; var checkShardingStateInitialized = function(conn, configConnStr, shardName, clusterId) { - var res = conn.getDB('admin').runCommand({shardingState: 1}); - assert.commandWorked(res); - assert(res.enabled); - assert.eq(configConnStr, res.configServer); - assert.eq(shardName, res.shardName); - assert.eq(clusterId, res.clusterId); + // TODO: SERVER-22665 a mixed-version test should be written specifically testing receiving + // addShard from a legacy mongos, and this assert.soon() should be changed back to assert + // synchronously. + assert.soon(function() { + var res = conn.getDB('admin').runCommand({shardingState: 1}); + assert.commandWorked(res); + if (res.enabled && (configConnStr === res.configServer) && + (shardName === res.shardName) && (clusterId.equals(res.clusterId))) { + return true; + } + return false; + }); + }; + + var checkShardMarkedAsShardAware = function(mongosConn, shardName) { + // TODO: SERVER-22665 a mixed-version test should be written specifically testing receiving + // addShard from a legacy mongos, and this assert.soon() should be changed back to assert + // synchronously. + assert.soon(function() { + var res = mongosConn.getDB('config').getCollection('shards').findOne({_id: shardName}); + assert.neq(null, res, "Could not find new shard " + shardName + " in config.shards"); + if (res.state && res.state === 1) { + return true; + } + return false; + }); }; // Create the cluster to test adding shards to. 
@@ -35,6 +55,7 @@ var newShardName = "newShard"; assert.commandWorked(st.s.adminCommand({addShard: standaloneConn.name, name: newShardName})); checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName, clusterId); + checkShardMarkedAsShardAware(st.s, newShardName); MongoRunner.stopMongod(standaloneConn.port); @@ -49,6 +70,7 @@ assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: replTest.getURL()})); checkShardingStateInitialized( replTest.getPrimary(), st.configRS.getURL(), replTest.getURL(), clusterId); + checkShardMarkedAsShardAware(st.s, newShardName); replTest.stopSet(); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 226a5d65af1..9ccf10791e9 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -44,19 +44,24 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" namespace mongo { namespace { +using std::string; + /** * Used to perform shard identity initialization once it is certain that the document is committed. 
*/ -class ShardIdentityLopOpHandler final : public RecoveryUnit::Change { +class ShardIdentityLogOpHandler final : public RecoveryUnit::Change { public: - ShardIdentityLopOpHandler(OperationContext* txn, ShardIdentityType shardIdentity) + ShardIdentityLogOpHandler(OperationContext* txn, ShardIdentityType shardIdentity) : _txn(txn), _shardIdentity(std::move(shardIdentity)) {} void commit() override { @@ -72,9 +77,32 @@ private: const ShardIdentityType _shardIdentity; }; -} // unnamed namespace +/** + * Used by the config server for backwards compatibility with 3.2 mongos to upsert a shardIdentity + * document (and thereby perform shard aware initialization) on a newly added shard. + */ +class LegacyAddShardLogOpHandler final : public RecoveryUnit::Change { +public: + LegacyAddShardLogOpHandler(OperationContext* txn, ShardType shardType) + : _txn(txn), _shardType(std::move(shardType)) {} -using std::string; + void commit() override { + // Only the primary should complete the addShard process by upserting the shardIdentity on + // the new shard. 
+ if (repl::getGlobalReplicationCoordinator()->getMemberState().primary()) { + uassertStatusOK( + Grid::get(_txn)->catalogManager()->upsertShardIdentityOnShard(_txn, _shardType)); + } + } + + void rollback() override {} + +private: + OperationContext* _txn; + const ShardType _shardType; +}; + +} // unnamed namespace CollectionShardingState::CollectionShardingState( NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata) @@ -166,11 +194,23 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); uassertStatusOK(shardIdentityDoc.validate()); txn->recoveryUnit()->registerChange( - new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc))); + new ShardIdentityLogOpHandler(txn, std::move(shardIdentityDoc))); } } } + // For backwards compatibility with 3.2 mongos, perform shard aware initialization on a newly + // added shard on inserts to config.shards missing the "state" field. (On addShard, a 3.2 + // mongos performs the insert into config.shards without a "state" field.) 
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && + _nss == ShardType::ConfigNS) { + if (insertedDoc[ShardType::state.name()].eoo()) { + const auto shardType = uassertStatusOK(ShardType::fromBSON(insertedDoc)); + txn->recoveryUnit()->registerChange( + new LegacyAddShardLogOpHandler(txn, std::move(shardType))); + } + } + checkShardVersionOrThrow(txn); if (_sourceMgr) { diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp index 6188f08a109..3eef36c9a22 100644 --- a/src/mongo/db/s/type_shard_identity.cpp +++ b/src/mongo/db/s/type_shard_identity.cpp @@ -40,13 +40,9 @@ namespace mongo { const std::string ShardIdentityType::IdName("shardIdentity"); -namespace { - -const BSONField<std::string> configsvrConnString("configsvrConnectionString"); -const BSONField<std::string> shardName("shardName"); -const BSONField<OID> clusterId("clusterId"); - -} // unnamed namespace +const BSONField<std::string> ShardIdentityType::configsvrConnString("configsvrConnectionString"); +const BSONField<std::string> ShardIdentityType::shardName("shardName"); +const BSONField<OID> ShardIdentityType::clusterId("clusterId"); StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) { if (!source.hasField("_id")) { @@ -206,31 +202,6 @@ void ShardIdentityType::setClusterId(OID clusterId) { _clusterId = std::move(clusterId); } -std::unique_ptr<BatchedUpdateRequest> ShardIdentityType::createUpsertForAddShard() const { - invariant(validate().isOK()); - - std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); - - BSONObjBuilder query; - query.append("_id", "shardIdentity"); - query.append("shardName", getShardName()); - query.append("clusterId", getClusterId()); - updateDoc->setQuery(query.obj()); - - BSONObjBuilder update; - BSONObjBuilder setConfigBuilder(update.subobjStart("$set")); - setConfigBuilder.append(configsvrConnString(), getConfigsvrConnString().toString()); - 
setConfigBuilder.doneFast(); - updateDoc->setUpdateExpr(update.obj()); - - updateDoc->setUpsert(true); - - std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); - updateRequest->addToUpdates(updateDoc.release()); - - return updateRequest; -} - BSONObj ShardIdentityType::createConfigServerUpdateObject(const std::string& newConnString) { BSONObjBuilder builder; BSONObjBuilder setConfigBuilder(builder.subobjStart("$set")); diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h index 9c51913fe0f..ce29024b81b 100644 --- a/src/mongo/db/s/type_shard_identity.h +++ b/src/mongo/db/s/type_shard_identity.h @@ -41,11 +41,16 @@ namespace mongo { */ class ShardIdentityType { public: - ShardIdentityType() = default; - // The _id value for this document type. static const std::string IdName; + // Field names and types in a shardIdentity document. + static const BSONField<std::string> configsvrConnString; + static const BSONField<std::string> shardName; + static const BSONField<OID> clusterId; + + ShardIdentityType() = default; + /** * Constructs a new ShardIdentityType object from BSON. * Also does validation of the contents. @@ -81,13 +86,6 @@ public: void setClusterId(OID clusterId); /** - * Returns an update object that can be used to insert a shardIdentity doc on a shard or update - * the existing shardIdentity doc's configsvrConnString (if the _id, shardName, and clusterId - * match those in this ShardIdentityType instance). - */ - std::unique_ptr<BatchedUpdateRequest> createUpsertForAddShard() const; - - /** * Returns an update object that can be used to update the config server field of the * shardIdentity document with the new connection string. 
*/ diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index f3405100655..048cf792687 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -49,6 +49,7 @@ #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_identity_loader.h" +#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/s/write_ops/batched_insert_request.h" #include "mongo/s/write_ops/batched_update_document.h" @@ -185,17 +186,18 @@ protected: */ void expectShardIdentityUpsert(const HostAndPort& expectedHost, const std::string& expectedShardName) { + // Create the expected upsert shardIdentity command for this shardType. + auto upsertCmdObj = catalogManager()->createShardIdentityUpsertForAddShard( + operationContext(), expectedShardName); - ShardIdentityType expectedShardIdentity; - expectedShardIdentity.setShardName(expectedShardName); - expectedShardIdentity.setClusterId(_clusterId); - expectedShardIdentity.setConfigsvrConnString(_configConnStr); - invariant(expectedShardIdentity.validate().isOK()); + // Get the BatchedUpdateRequest from the upsert command. 
+ BatchedCommandRequest request(BatchedCommandRequest::BatchType::BatchType_Update); + std::string errMsg; + invariant(request.parseBSON("admin", upsertCmdObj, &errMsg) || !request.isValid(&errMsg)); - auto updateRequest = expectedShardIdentity.createUpsertForAddShard(); expectUpdates(expectedHost, NamespaceString(NamespaceString::kConfigCollectionNamespace), - updateRequest.get()); + request.getUpdateRequest()); } /** @@ -380,6 +382,7 @@ TEST_F(AddShardTest, Standalone) { expectedShard.setName(expectedShardName); expectedShard.setHost("StandaloneHost:12345"); expectedShard.setMaxSizeMB(100); + expectedShard.setState(ShardType::ShardState::kShardAware); expectInserts(NamespaceString(ShardType::ConfigNS), {expectedShard.toBSON()}); @@ -480,6 +483,7 @@ TEST_F(AddShardTest, StandaloneGenerateName) { expectedShard.setName(expectedShardName); expectedShard.setHost(shardTarget.toString()); expectedShard.setMaxSizeMB(100); + expectedShard.setState(ShardType::ShardState::kShardAware); expectInserts(NamespaceString(ShardType::ConfigNS), {expectedShard.toBSON()}); @@ -920,6 +924,7 @@ TEST_F(AddShardTest, ReAddExistingShard) { newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); newShard.setHost(connString.toString()); + newShard.setState(ShardType::ShardState::kShardAware); // When a shard with the same name already exists, the insert into config.shards will fail // with a duplicate key error on the shard name. 
@@ -989,6 +994,7 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) { newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); newShard.setHost(connString.toString()); + newShard.setState(ShardType::ShardState::kShardAware); expectInserts(NamespaceString(ShardType::ConfigNS), {newShard.toBSON()}); @@ -1050,6 +1056,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); newShard.setHost(connString.toString()); + newShard.setState(ShardType::ShardState::kShardAware); expectInserts(NamespaceString(ShardType::ConfigNS), {newShard.toBSON()}); @@ -1131,6 +1138,7 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) { newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); newShard.setHost(fullConnString.toString()); + newShard.setState(ShardType::ShardState::kShardAware); expectInserts(NamespaceString(ShardType::ConfigNS), {newShard.toBSON()}); @@ -1141,5 +1149,33 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) { future.timed_get(kFutureTimeout); } +TEST_F(AddShardTest, CreateShardIdentityUpsertForAddShard) { + std::string shardName = "shardName"; + + BSONObj expectedBSON = BSON("update" + << "system.version" + << "updates" + << BSON_ARRAY(BSON( + "q" << BSON("_id" + << "shardIdentity" + << "shardName" + << shardName + << "clusterId" + << _clusterId) + << "u" + << BSON("$set" << BSON("configsvrConnectionString" + << _configConnStr.toString())) + << "upsert" + << true)) + << "writeConcern" + << BSON("w" + << "majority" + << "wtimeout" + << 15000)); + ASSERT_EQUALS( + expectedBSON, + catalogManager()->createShardIdentityUpsertForAddShard(operationContext(), shardName)); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index 422821a03df..f199c27eb76 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ 
b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -42,6 +42,7 @@ #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" @@ -70,6 +71,7 @@ #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -79,6 +81,14 @@ using str::stream; namespace { +using CallbackHandle = executor::TaskExecutor::CallbackHandle; +using CallbackArgs = executor::TaskExecutor::CallbackArgs; +using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; +using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; + +const Seconds kAddShardTaskRetryInterval(30); +const Seconds kDefaultFindHostMaxWaitTime(20); + const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); const ReadPreferenceSetting kConfigPrimarySelector(ReadPreference::PrimaryOnly); const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); @@ -356,6 +366,7 @@ StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard( ShardType shard; shard.setName(actualShardName); shard.setHost(actualShardConnStr.toString()); + shard.setState(ShardType::ShardState::kShardAware); return shard; } @@ -499,26 +510,11 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( shardType.setMaxSizeMB(maxSize); } - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - Grid::get(txn)->shardRegistry()->getConfigServerConnectionString()); - shardIdentity.setShardName(shardType.getName()); - shardIdentity.setClusterId(ClusterIdentityLoader::get(txn)->getClusterId()); - auto validateStatus = shardIdentity.validate(); - if (!validateStatus.isOK()) { - return 
validateStatus; - } + auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); - log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString(); - - auto updateRequest = shardIdentity.createUpsertForAddShard(); - BatchedCommandRequest commandRequest(updateRequest.release()); - commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); - commandRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); - - auto swCommandResponse = - _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON()); + LOG(2) << "going to insert shardIdentity document into shard: " << shardType; + auto swCommandResponse = _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest); if (!swCommandResponse.isOK()) { return swCommandResponse.getStatus(); } @@ -893,4 +889,258 @@ Status ShardingCatalogManagerImpl::_initConfigIndexes(OperationContext* txn) { return Status::OK(); } +Status ShardingCatalogManagerImpl::upsertShardIdentityOnShard(OperationContext* txn, + ShardType shardType) { + + auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); + + auto swConnString = ConnectionString::parse(shardType.getHost()); + if (!swConnString.isOK()) { + return swConnString.getStatus(); + } + + // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter + // instead. 
+ const std::shared_ptr<Shard> shard{ + Grid::get(txn)->shardRegistry()->createConnection(swConnString.getValue())}; + invariant(shard); + auto targeter = shard->getTargeter(); + + _scheduleAddShardTask( + std::move(shardType), std::move(targeter), std::move(commandRequest), false); + + return Status::OK(); +} + +void ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled( + const CallbackArgs& cbArgs, + const ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter, + const BSONObj commandRequest) { + if (cbArgs.status == ErrorCodes::CallbackCanceled) { + stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); + _untrackAddShardHandle_inlock(shardType.getName()); + return; + } + _scheduleAddShardTask( + std::move(shardType), std::move(targeter), std::move(commandRequest), true); +} + +void ShardingCatalogManagerImpl::_scheduleAddShardTask( + const ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter, + const BSONObj commandRequest, + const bool isRetry) { + stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); + + if (isRetry) { + // Untrack the handle from scheduleWorkAt, and schedule a new addShard task. + _untrackAddShardHandle_inlock(shardType.getName()); + } else { + // If this is not a retry, only schedule a new addShard task for this shardId if there are + // none currently scheduled. + if (_hasAddShardHandle_inlock(shardType.getName())) { + return; + } + } + + // Schedule the shardIdentity upsert request to run immediately, and track the handle. + + auto swHost = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Milliseconds(kDefaultFindHostMaxWaitTime)); + if (!swHost.isOK()) { + // A 3.2 mongos must have previously successfully communicated with hosts in this shard, so + // a failure to find a host here is probably transient, and it is safe to retry. 
+ warning() << "Failed to find host for shard " << shardType + << " when trying to upsert a shardIdentity document, " + << causedBy(swHost.getStatus()); + const Date_t now = _executorForAddShard->now(); + const Date_t when = now + kAddShardTaskRetryInterval; + _trackAddShardHandle_inlock( + shardType.getName(), + _executorForAddShard->scheduleWorkAt( + when, + stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, + this, + stdx::placeholders::_1, + std::move(shardType), + std::move(targeter), + std::move(commandRequest)))); + return; + } + + executor::RemoteCommandRequest request( + swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), Seconds(30)); + + const RemoteCommandCallbackFn callback = + stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse, + this, + stdx::placeholders::_1, + std::move(shardType), + std::move(targeter)); + + _trackAddShardHandle_inlock(shardType.getName(), + _executorForAddShard->scheduleRemoteCommand(request, callback)); +} + +void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( + const RemoteCommandCallbackArgs& cbArgs, + ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter) { + stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); + + // Untrack the handle from scheduleRemoteCommand. + _untrackAddShardHandle_inlock(shardType.getName()); + + Status responseStatus = cbArgs.response.getStatus(); + if (responseStatus == ErrorCodes::CallbackCanceled) { + return; + } + + // Examine the response to determine if the upsert succeeded. + + bool rescheduleTask = false; + + auto swResponse = cbArgs.response; + if (!swResponse.isOK()) { + warning() << "Failed to upsert shardIdentity document during addShard into shard " + << shardType.getName() << "(" << shardType.getHost() + << "). The shardIdentity upsert will continue to be retried. 
" + << causedBy(swResponse.getStatus()); + rescheduleTask = true; + } else { + // Create a CommandResponse object in order to use processBatchWriteResponse. + BSONObj responseObj = swResponse.getValue().data.getOwned(); + BSONObj responseMetadata = swResponse.getValue().metadata.getOwned(); + Status commandStatus = getStatusFromCommandResult(responseObj); + Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); + Shard::CommandResponse commandResponse(std::move(responseObj), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus)); + + BatchedCommandResponse batchResponse; + auto batchResponseStatus = + Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse); + if (!batchResponseStatus.isOK()) { + if (batchResponseStatus == ErrorCodes::DuplicateKey) { + warning() + << "Received duplicate key error when inserting the shardIdentity " + "document into " + << shardType.getName() << "(" << shardType.getHost() + << "). This means the shard has a shardIdentity document with a clusterId " + "that differs from this cluster's clusterId. It may still belong to " + "or not have been properly removed from another cluster. The " + "shardIdentity upsert will continue to be retried."; + } else { + warning() + << "Failed to upsert shardIdentity document into shard " << shardType.getName() + << "(" << shardType.getHost() + << ") during addShard. The shardIdentity upsert will continue to be retried. " + << causedBy(batchResponseStatus); + } + rescheduleTask = true; + } + } + + if (rescheduleTask) { + // If the command did not succeed, schedule the task again with a delay. + const Date_t now = _executorForAddShard->now(); + const Date_t when = now + kAddShardTaskRetryInterval; + + // Track the handle from scheduleWorkAt. 
+ _trackAddShardHandle_inlock( + shardType.getName(), + _executorForAddShard->scheduleWorkAt( + when, + stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, + this, + stdx::placeholders::_1, + std::move(shardType), + std::move(targeter), + std::move(cbArgs.request.cmdObj)))); + return; + } else { + // If the command succeeded, update config.shards to mark the shard as shardAware. + + // Create a unique operation context for this thread, and destroy it at the end of the + // scope. + Client::initThread("updateShardStateToShardAware"); + ON_BLOCK_EXIT([&] { Client::destroy(); }); + auto txn = cc().makeOperationContext(); + + auto updateStatus = _catalogClient->updateConfigDocument( + txn.get(), + ShardType::ConfigNS, + BSON(ShardType::name(shardType.getName())), + BSON("$set" << BSON(ShardType::state() + << static_cast<std::underlying_type<ShardType::ShardState>::type>( + ShardType::ShardState::kShardAware))), + false, + ShardingCatalogClient::kMajorityWriteConcern); + + // We do not handle a failed response status. If the command failed, when a new config + // server transitions to primary, an addShard task for this shard will be run again and + // the update to config.shards will be automatically retried then. If it fails because the + // shard was removed through the normal removeShard path (so the entry in config.shards + // was deleted), no new addShard task will get scheduled on the next transition to primary. + if (!updateStatus.isOK()) { + warning() << "Failed to mark shard " << shardType.getName() << "(" + << shardType.getHost() + << ") as shardAware in config.shards. This will be retried the next time a " + "config server transitions to primary. 
" + << causedBy(updateStatus.getStatus()); + } + } +} + +BSONObj ShardingCatalogManagerImpl::createShardIdentityUpsertForAddShard( + OperationContext* txn, const std::string& shardName) { + std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); + + BSONObjBuilder query; + query.append("_id", "shardIdentity"); + query.append(ShardIdentityType::shardName(), shardName); + query.append(ShardIdentityType::clusterId(), ClusterIdentityLoader::get(txn)->getClusterId()); + updateDoc->setQuery(query.obj()); + + BSONObjBuilder update; + { + BSONObjBuilder set(update.subobjStart("$set")); + set.append(ShardIdentityType::configsvrConnString(), + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString().toString()); + } + updateDoc->setUpdateExpr(update.obj()); + updateDoc->setUpsert(true); + + std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); + updateRequest->addToUpdates(updateDoc.release()); + + BatchedCommandRequest commandRequest(updateRequest.release()); + commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); + commandRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + + return commandRequest.toBSON(); +} + + +bool ShardingCatalogManagerImpl::_hasAddShardHandle_inlock(const ShardId& shardId) { + return _addShardHandles.find(shardId) != _addShardHandles.end(); +} + +void ShardingCatalogManagerImpl::_trackAddShardHandle_inlock( + const ShardId shardId, const StatusWith<CallbackHandle>& handle) { + if (handle.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(40219, handle.getStatus()); + _addShardHandles.insert(std::pair<ShardId, CallbackHandle>(shardId, handle.getValue())); +} + +void ShardingCatalogManagerImpl::_untrackAddShardHandle_inlock(const ShardId& shardId) { + auto it = _addShardHandles.find(shardId); + invariant(it != _addShardHandles.end()); + _addShardHandles.erase(shardId); +} + } // namespace mongo diff --git 
a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h index 8e9ab963851..83dd97eabdd 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -30,6 +30,7 @@ #include <vector> +#include "mongo/executor/task_executor.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/stdx/mutex.h" @@ -37,6 +38,7 @@ namespace mongo { class DatabaseType; +class RemoteCommandTargeter; class ShardingCatalogClient; class VersionType; @@ -78,6 +80,12 @@ public: Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; + Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override; + + + BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn, + const std::string& shardName) override; + private: /** * Generates a unique name to be given to a newly added shard. @@ -122,12 +130,71 @@ private: const BSONObj& cmdObj); /** + * Returns the current cluster schema/protocol version. + */ + StatusWith<VersionType> _getConfigVersion(OperationContext* txn); + + /** * Performs the necessary checks for version compatibility and creates a new config.version * document if the current cluster config is empty. */ Status _initConfigVersion(OperationContext* txn); /** + * Callback function used when rescheduling an addShard task after the first attempt failed. + * Checks if the callback has been canceled, and if not, proceeds to call + * _scheduleAddShardTask. 
+ */ + void _scheduleAddShardTaskUnlessCanceled(const executor::TaskExecutor::CallbackArgs& cbArgs, + const ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter, + const BSONObj commandRequest); + + /** + * For rolling upgrade and backwards compatibility with 3.2 mongos, schedules an asynchronous + * task against the addShard executor to upsert a shardIdentity doc into the new shard + * described by shardType. If there is an existing such task for this shardId (as tracked by + * the _addShardHandles map), a new task is not scheduled. There could be an existing such task + * if addShard was called previously, but the upsert has not yet succeeded on the shard. + */ + void _scheduleAddShardTask(const ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter, + const BSONObj commandRequest, + const bool isRetry); + + /** + * Callback function for the asynchronous upsert of the shardIdentity doc scheduled by + * _scheduleAddShardTask. Checks the response from the shard, and updates config.shards + * to mark the shard as shardAware on success. On failure to perform the upsert, this callback + * schedules _scheduleAddShardTaskUnlessCanceled to be called again after a delay. + */ + void _handleAddShardTaskResponse( + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbArgs, + ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter); + + /** + * Checks if a running or scheduled addShard task exists for the shard with id shardId. + * The caller must hold _addShardHandlesMutex. + */ + bool _hasAddShardHandle_inlock(const ShardId& shardId); + + /** + * Adds CallbackHandle handle for the shard with id shardId to the map of running or scheduled + * addShard tasks. + * The caller must hold _addShardHandlesMutex. 
+ */ + void _trackAddShardHandle_inlock( + const ShardId shardId, const StatusWith<executor::TaskExecutor::CallbackHandle>& handle); + + /** + * Removes the handle to a running or scheduled addShard task callback for the shard with id + * shardId. + * The caller must hold _addShardHandlesMutex. + */ + void _untrackAddShardHandle_inlock(const ShardId& shardId); + + /** * Builds all the expected indexes on the config server. */ Status _initConfigIndexes(OperationContext* txn); @@ -160,6 +227,19 @@ private: // True if initializeConfigDatabaseIfNeeded() has been called and returned successfully. bool _configInitialized = false; // (M) + + // For rolling upgrade and backwards compatibility with 3.2 mongos, maintains a mapping of + // a shardId to an outstanding addShard task scheduled against the _executorForAddShard. + // A "addShard" task upserts the shardIdentity document into the new shard. Such a task is + // scheduled: + // 1) on a config server's transition to primary for each shard in config.shards that is not + // marked as sharding aware + // 2) on a direct insert to the config.shards collection (usually from a 3.2 mongos). + // This map tracks that only one such task per shard can be running at a time. + std::map<ShardId, executor::TaskExecutor::CallbackHandle> _addShardHandles; + + // Protects the _addShardHandles map. + stdx::mutex _addShardHandlesMutex; }; } // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index 816ef6adff6..a8e80b67937 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -384,7 +384,7 @@ public: * Directly inserts a document in the specified namespace on the config server. The document * must have an _id index. Must only be used for insertions in the 'config' database. * - * NOTE: Should not be used in new code. Instead add a new metadata operation to the interface. 
+ * NOTE: Should not be used in new code outside the ShardingCatalogManager. */ virtual Status insertConfigDocument(OperationContext* txn, const std::string& ns, @@ -403,7 +403,7 @@ public: * was upserted or it existed and any of the fields changed) and false otherwise (basically * returns whether the update command's response update.n value is > 0). * - * NOTE: Should not be used in new code. Instead add a new metadata operation to the interface. + * NOTE: Should not be used in new code outside the ShardingCatalogManager. */ virtual StatusWith<bool> updateConfigDocument(OperationContext* txn, const std::string& ns, @@ -416,7 +416,7 @@ public: * Removes documents matching a particular query predicate from the specified namespace on the * config server. Must only be used for deletions from the 'config' database. * - * NOTE: Should not be used in new code. Instead add a new metadata operation to the interface. + * NOTE: Should not be used in new code outside the ShardingCatalogManager. */ virtual Status removeConfigDocuments(OperationContext* txn, const std::string& ns, diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h index c830e03a362..73286475d9a 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager.h +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -31,11 +31,16 @@ #include <string> #include "mongo/base/disallow_copying.h" +#include "mongo/stdx/memory.h" namespace mongo { +class BSONObj; class ConnectionString; class OperationContext; +class RemoteCommandTargeter; +class ShardId; +class ShardType; class Status; template <typename T> class StatusWith; @@ -118,6 +123,23 @@ public: */ virtual Status initializeConfigDatabaseIfNeeded(OperationContext* txn) = 0; + /** + * For rolling upgrade and backwards compatibility with 3.2 mongos, schedules an asynchronous + * task against addShard executor to upsert a shardIdentity doc into the new shard described by + * shardType. 
On failure to upsert the doc on the shard, the task reschedules itself with a + * delay indefinitely, and is canceled only when a removeShard is called. + */ + virtual Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) = 0; + + /** + * Returns a BSON representation of an update request that can be used to insert a + * shardIdentity doc into the shard for the given shardType (or update the shard's existing + * shardIdentity doc's configsvrConnString if the _id, shardName, and clusterId do not + * conflict). + */ + virtual BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn, + const std::string& shardName) = 0; + protected: ShardingCatalogManager() = default; }; diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp index 7ba2b08926f..906a4797025 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp @@ -74,4 +74,14 @@ Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationCon return {ErrorCodes::InternalError, "Method not implemented"}; } +Status ShardingCatalogManagerMock::upsertShardIdentityOnShard(OperationContext* txn, + ShardType shardType) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + +BSONObj ShardingCatalogManagerMock::createShardIdentityUpsertForAddShard( + OperationContext* txn, const std::string& shardName) { + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h index 49057fc1bde..20bca3ec435 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h @@ -28,7 +28,10 @@ #pragma once +#include "mongo/bson/bsonobj.h" +#include "mongo/db/s/type_shard_identity.h" #include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/catalog/type_shard.h" 
namespace mongo { @@ -60,6 +63,11 @@ public: void appendConnectionStats(executor::ConnectionPoolStats* stats) override; Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; + + Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override; + + BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn, + const std::string& shardName) override; }; } // namespace mongo diff --git a/src/mongo/s/catalog/type_shard.cpp b/src/mongo/s/catalog/type_shard.cpp index ca01ec8ec2f..0b97a3d3cd6 100644 --- a/src/mongo/s/catalog/type_shard.cpp +++ b/src/mongo/s/catalog/type_shard.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -46,7 +47,7 @@ const BSONField<std::string> ShardType::host("host"); const BSONField<bool> ShardType::draining("draining"); const BSONField<long long> ShardType::maxSizeMB("maxSize"); const BSONField<BSONArray> ShardType::tags("tags"); - +const BSONField<ShardType::ShardState> ShardType::state("state"); StatusWith<ShardType> ShardType::fromBSON(const BSONObj& source) { ShardType shard; @@ -112,6 +113,27 @@ StatusWith<ShardType> ShardType::fromBSON(const BSONObj& source) { } } + { + long long shardState; + Status status = bsonExtractIntegerField(source, state.name(), &shardState); + if (status.isOK()) { + // Make sure the state field falls within the valid range of ShardState values. 
+ if (!(shardState >= static_cast<std::underlying_type<ShardState>::type>( + ShardState::kNotShardAware) && + shardState <= static_cast<std::underlying_type<ShardState>::type>( + ShardState::kShardAware))) { + return Status(ErrorCodes::BadValue, + str::stream() << "Invalid shard state value: " << shardState); + } else { + shard._state = static_cast<ShardState>(shardState); + } + } else if (status == ErrorCodes::NoSuchKey) { + // state field can be missing in which case it is presumed kNotShardAware + } else { + return status; + } + } + return shard; } @@ -146,6 +168,8 @@ BSONObj ShardType::toBSON() const { builder.append(maxSizeMB(), getMaxSizeMB()); if (_tags) builder.append(tags(), getTags()); + if (_state) + builder.append(state(), static_cast<std::underlying_type<ShardState>::type>(getState())); return builder.obj(); } @@ -176,5 +200,9 @@ void ShardType::setTags(const std::vector<std::string>& tags) { invariant(tags.size() > 0); _tags = tags; } +void ShardType::setState(const ShardState state) { + invariant(!_state.is_initialized()); + _state = state; +} } // namespace mongo diff --git a/src/mongo/s/catalog/type_shard.h b/src/mongo/s/catalog/type_shard.h index 9b30e1a9505..a86481c7fd0 100644 --- a/src/mongo/s/catalog/type_shard.h +++ b/src/mongo/s/catalog/type_shard.h @@ -33,6 +33,8 @@ #include <vector> #include "mongo/db/jsobj.h" +#include "mongo/s/shard_id.h" +#include "mongo/s/write_ops/batched_update_request.h" namespace mongo { @@ -49,6 +51,11 @@ class StatusWith; */ class ShardType { public: + enum class ShardState : int { + kNotShardAware = 0, + kShardAware, + }; + // Name of the shards collection in the config server. 
static const std::string ConfigNS; @@ -58,6 +65,7 @@ public: static const BSONField<bool> draining; static const BSONField<long long> maxSizeMB; static const BSONField<BSONArray> tags; + static const BSONField<ShardState> state; /** @@ -107,6 +115,11 @@ public: } void setTags(const std::vector<std::string>& tags); + const ShardState getState() const { + return _state.value_or(ShardState::kNotShardAware); + } + void setState(const ShardState state); + private: // Convention: (M)andatory, (O)ptional, (S)pecial rule. @@ -120,6 +133,8 @@ private: boost::optional<long long> _maxSizeMB; // (O) shard tags boost::optional<std::vector<std::string>> _tags; + // (O) shard state + boost::optional<ShardState> _state; }; } // namespace mongo |