-rw-r--r--   src/mongo/s/SConscript                                                   |   1
-rw-r--r--   src/mongo/s/catalog/replset/SConscript                                   |  11
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp  | 115
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp             |  33
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp            |  68
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h              |   4
-rw-r--r--   src/mongo/s/catalog/sharding_catalog_manager.h                            |   8
-rw-r--r--   src/mongo/s/catalog/sharding_catalog_manager_mock.cpp                     |   7
-rw-r--r--   src/mongo/s/catalog/sharding_catalog_manager_mock.h                       |   5
-rw-r--r--   src/mongo/s/client/SConscript                                             |   1
-rw-r--r--   src/mongo/s/client/shard.cpp                                              |  26
-rw-r--r--   src/mongo/s/client/shard.h                                                |   9
-rw-r--r--   src/mongo/s/config_server_test_fixture.cpp                                |  34
-rw-r--r--   src/mongo/s/config_server_test_fixture.h                                  |  11
14 files changed, 299 insertions, 34 deletions
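
At its core, the diff below adds one method to the sharding catalog manager interface: ShardingCatalogManager::addShardToZone(OperationContext*, shardName, zoneName), which tags a shard document in config.shards with a zone name and reports ErrorCodes::ShardNotFound when no such shard exists. A minimal caller-side sketch of that contract follows; only addShardToZone's signature and error behavior come from the diff, while the wrapper function and the way the manager pointer is obtained are illustrative assumptions.

// Illustrative sketch only, not part of the patch: shows the call shape of the
// new interface method defined in the diff below. How a real caller obtains
// the ShardingCatalogManager instance is outside this diff.
#include <string>

#include "mongo/base/status.h"
#include "mongo/db/operation_context.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"

namespace mongo {

Status assignShardToZoneExample(ShardingCatalogManager* catalogManager,
                                OperationContext* txn,
                                const std::string& shardName,
                                const std::string& zoneName) {
    // ShardNotFound means no document in config.shards matched shardName.
    // Re-adding a zone the shard already carries is an idempotent success,
    // because the implementation issues the update with $addToSet.
    return catalogManager->addShardToZone(txn, shardName, zoneName);
}

}  // namespace mongo
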
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index c3c5178b2d9..af1722c76b2 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -128,7 +128,6 @@ env.Library(
         '$BUILD_DIR/mongo/s/serveronly',
         '$BUILD_DIR/mongo/util/clock_source_mock',
         '$BUILD_DIR/mongo/util/net/message_port_mock',
-        'sharding_egress_metadata_hook_for_mongos',
     ],
     LIBDEPS_TAGS=[
         # Depends on coreshard, but that would be circular
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index e8303bc5f14..3fee8b6cab1 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -104,6 +104,17 @@ env.Library(
 )
 
 env.CppUnitTest(
+    target='config_server_catalog_test',
+    source=[
+        'sharding_catalog_add_shard_to_zone_test.cpp',
+    ],
+    LIBDEPS=[
+        '$BUILD_DIR/mongo/db/repl/replmocks',
+        '$BUILD_DIR/mongo/s/config_server_test_fixture',
+    ]
+)
+
+env.CppUnitTest(
     target='sharding_catalog_test',
     source=[
         'sharding_catalog_add_shard_test.cpp',
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp
new file mode 100644
index 00000000000..9c2257f85d6
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/read_preference.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/config_server_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+
+ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly);
+
+using AddShardToZoneTest = ConfigServerTestFixture;
+
+TEST_F(AddShardToZoneTest, AddSingleZoneToExistingShardShouldSucceed) {
+    ShardType shard;
+    shard.setName("a");
+    shard.setHost("a:1234");
+
+    setupShards({shard});
+
+    ASSERT_OK(catalogManager()->addShardToZone(operationContext(), shard.getName(), "z"));
+    auto shardDocStatus = getShardDoc(operationContext(), shard.getName());
+    ASSERT_OK(shardDocStatus.getStatus());
+
+    auto shardDoc = shardDocStatus.getValue();
+    auto tags = shardDoc.getTags();
+    ASSERT_EQ(1u, tags.size());
+    ASSERT_EQ("z", tags.front());
+}
+
+TEST_F(AddShardToZoneTest, AddZoneToShardWithSameTagShouldSucceed) {
+    ShardType shard;
+    shard.setName("a");
+    shard.setHost("a:1234");
+    shard.setTags({"x", "y"});
+
+    setupShards({shard});
+
+    ASSERT_OK(catalogManager()->addShardToZone(operationContext(), shard.getName(), "x"));
+
+    auto shardDocStatus = getShardDoc(operationContext(), shard.getName());
+    ASSERT_OK(shardDocStatus.getStatus());
+
+    auto shardDoc = shardDocStatus.getValue();
+    auto tags = shardDoc.getTags();
+    ASSERT_EQ(2u, tags.size());
+    ASSERT_EQ("x", tags.front());
+    ASSERT_EQ("y", tags.back());
+}
+
+TEST_F(AddShardToZoneTest, AddZoneToShardWithNewTagShouldAppend) {
+    ShardType shard;
+    shard.setName("a");
+    shard.setHost("a:1234");
+    shard.setTags({"x"});
+
+    setupShards({shard});
+
+    ASSERT_OK(catalogManager()->addShardToZone(operationContext(), shard.getName(), "y"));
+
+    auto shardDocStatus = getShardDoc(operationContext(), shard.getName());
+    ASSERT_OK(shardDocStatus.getStatus());
+
+    auto shardDoc = shardDocStatus.getValue();
+    auto tags = shardDoc.getTags();
+    ASSERT_EQ(2u, tags.size());
+    ASSERT_EQ("x", tags.front());
+    ASSERT_EQ("y", tags.back());
+}
+
+TEST_F(AddShardToZoneTest, AddSingleZoneToNonExistingShardShouldFail) {
+    ShardType shard;
+    shard.setName("a");
+    shard.setHost("a:1234");
+
+    setupShards({shard});
+
+    auto status = catalogManager()->addShardToZone(operationContext(), "b", "z");
+    ASSERT_EQ(ErrorCodes::ShardNotFound, status);
+}
+
+} // unnamed namespace
+} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index caf410e35a3..93bbd60b809 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -129,37 +129,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
     response->setOk(false);
 }
 
-/**
- * Takes the response from running a batch write command and writes the appropriate response into
- * *batchResponse, while also returning the Status of the operation.
- */
-Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response,
-                                  BatchedCommandResponse* batchResponse) {
-    Status status(ErrorCodes::InternalError, "status not set");
-
-    if (!response.isOK()) {
-        status = response.getStatus();
-    } else if (!response.getValue().commandStatus.isOK()) {
-        status = response.getValue().commandStatus;
-    } else if (!response.getValue().writeConcernStatus.isOK()) {
-        status = response.getValue().writeConcernStatus;
-    } else {
-        string errmsg;
-        if (!batchResponse->parseBSON(response.getValue().response, &errmsg)) {
-            status = Status(ErrorCodes::FailedToParse,
-                            str::stream() << "Failed to parse config server response: " << errmsg);
-        } else {
-            status = batchResponse->toStatus();
-        }
-    }
-
-    if (!status.isOK()) {
-        toBatchError(status, batchResponse);
-    }
-
-    return status;
-}
-
 }  // namespace
 
 ShardingCatalogClientImpl::ShardingCatalogClientImpl(
@@ -1319,7 +1288,7 @@ void ShardingCatalogClientImpl::_runBatchWriteCommand(OperationContext* txn,
                                             cmdObj,
                                             Shard::RetryPolicy::kNoRetry); // We're handling our own retries here.
 
-        Status status = _processBatchWriteResponse(response, batchResponse);
+        Status status = Shard::CommandResponse::processBatchWriteResponse(response, batchResponse);
         if (retry < kMaxWriteRetry && configShard->isRetriableError(status.code(), retryPolicy)) {
             batchResponse->clear();
             LOG(1) << "Batch write command failed with retriable error and will be retried"
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 8875b19ba8e..71845ffa7ee 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -42,6 +42,7 @@
 #include "mongo/client/read_preference.h"
 #include "mongo/client/remote_command_targeter.h"
 #include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/concurrency/d_concurrency.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/s/type_shard_identity.h"
@@ -57,6 +58,7 @@
 #include "mongo/s/set_shard_version_request.h"
 #include "mongo/s/write_ops/batched_command_request.h"
 #include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/stdx/memory.h"
 #include "mongo/util/log.h"
 #include "mongo/util/mongoutils/str.h"
 #include "mongo/util/net/hostandport.h"
@@ -70,7 +72,6 @@ using str::stream;
 namespace {
 
 const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
-
 const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                 // Note: Even though we're setting UNSET here,
                                                 // kMajority implies JOURNAL if journaling is
@@ -117,6 +118,29 @@ Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response,
     return status;
 }
 
+
+const ResourceId kZoneOpResourceId(RESOURCE_METADATA, StringData("$configZonedSharding"));
+
+/**
+ * Lock for shard zoning operations. This should be acquired when doing any operations that
+ * can affect the config.tags (or the tags field of the config.shards) collection.
+ */
+class ScopedZoneOpExclusiveLock {
+public:
+    ScopedZoneOpExclusiveLock(OperationContext* txn)
+        : _transaction(txn, MODE_IX),
+          _globalIXLock(txn->lockState(), MODE_IX, UINT_MAX),
+          // Grab global lock recursively so locks will not be yielded.
+          _recursiveGlobalIXLock(txn->lockState(), MODE_IX, UINT_MAX),
+          _zoneLock(txn->lockState(), kZoneOpResourceId, MODE_X) {}
+
+private:
+    ScopedTransaction _transaction;
+    Lock::GlobalLock _globalIXLock;
+    Lock::GlobalLock _recursiveGlobalIXLock;
+    Lock::ResourceLock _zoneLock;
+};
+
 }  // namespace
 
 
@@ -589,6 +613,48 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
     return shardType.getName();
 }
 
+Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn,
+                                                  const std::string& shardName,
+                                                  const std::string& zoneName) {
+    ScopedZoneOpExclusiveLock scopedLock(txn);
+
+    auto updateDoc = stdx::make_unique<BatchedUpdateDocument>();
+    updateDoc->setQuery(BSON(ShardType::name(shardName)));
+    updateDoc->setUpdateExpr(BSON("$addToSet" << BSON(ShardType::tags() << zoneName)));
+    updateDoc->setUpsert(false);
+    updateDoc->setMulti(false);
+
+    auto updateRequest = stdx::make_unique<BatchedUpdateRequest>();
+    updateRequest->addToUpdates(updateDoc.release());
+
+    BatchedCommandRequest request(updateRequest.release());
+    request.setNS(NamespaceString(ShardType::ConfigNS));
+    request.setWriteConcern(kMajorityWriteConcern.toBSON());
+
+    auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
+    auto response = configShard->runCommand(txn,
+                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                            "config",
+                                            request.toBSON(),
+                                            Shard::RetryPolicy::kNoRetry);
+
+    BatchedCommandResponse batchResponse;
+    Status status = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+    if (!status.isOK()) {
+        return status;
+    }
+
+    invariant(batchResponse.isNSet());
+
+    if (batchResponse.getN() < 1) {
+        return {ErrorCodes::ShardNotFound,
+                str::stream() << "shard " << shardName << " does not exist"};
+    }
+
+    return Status::OK();
+}
+
 void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
     _executorForAddShard->appendConnectionStats(stats);
 }
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 8d0a473d875..5784e969267 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -65,6 +65,10 @@ public:
                               const ConnectionString& shardConnectionString,
                               const long long maxSize) override;
 
+    Status addShardToZone(OperationContext* txn,
+                          const std::string& shardName,
+                          const std::string& zoneName) override;
+
     void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
 
 private:
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index 2ac9413128f..35548fa5720 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -92,6 +92,14 @@ public:
                                           const long long maxSize) = 0;
 
     /**
+     * Adds the shard to the zone.
+     * Returns ErrorCodes::ShardNotFound if the shard does not exist.
+     */
+    virtual Status addShardToZone(OperationContext* txn,
+                                  const std::string& shardName,
+                                  const std::string& zoneName) = 0;
+
+    /**
      * Append information about the connection pools owned by the CatalogManager.
     */
    virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index 1768a1a9925..ff841c4eae4 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -55,6 +55,13 @@ StatusWith<string> ShardingCatalogManagerMock::addShard(
     return {ErrorCodes::InternalError, "Method not implemented"};
 }
 
+Status ShardingCatalogManagerMock::addShardToZone(OperationContext* txn,
+                                                  const std::string& shardName,
+                                                  const std::string& zoneName) {
+    return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+
 void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
 
 } // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index a56681e37fa..735637e9603 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -49,6 +49,11 @@ public:
                               const ConnectionString& shardConnectionString,
                               const long long maxSize) override;
 
+    Status addShardToZone(OperationContext* txn,
+                          const std::string& shardName,
+                          const std::string& zoneName) override;
+
+
     void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
 };
 
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index b536b27c13e..ecc4f0894e9 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -79,6 +79,7 @@ env.Library(
     LIBDEPS=[
         '$BUILD_DIR/mongo/base',
         '$BUILD_DIR/mongo/s/shard_id',
+        '$BUILD_DIR/mongo/s/write_ops/batch_write_types',
     ]
 )
 
diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp
index 98c7cdae768..a4619ecd7c4 100644
--- a/src/mongo/s/client/shard.cpp
+++ b/src/mongo/s/client/shard.cpp
@@ -31,11 +31,14 @@
 #include "mongo/platform/basic.h"
 
 #include "mongo/s/client/shard.h"
 
+#include "mongo/s/write_ops/batched_command_response.h"
 #include "mongo/util/log.h"
 
 namespace mongo {
 
+using std::string;
+
 namespace {
 
 const int kOnErrorNumRetries = 3;
@@ -61,6 +64,29 @@ Status _getEffectiveCommandStatus(StatusWith<Shard::CommandResponse> cmdResponse
 
 }  // namespace
 
+Status Shard::CommandResponse::processBatchWriteResponse(
+    StatusWith<Shard::CommandResponse> response, BatchedCommandResponse* batchResponse) {
+    auto status = _getEffectiveCommandStatus(response);
+    if (status.isOK()) {
+        string errmsg;
+        if (!batchResponse->parseBSON(response.getValue().response, &errmsg)) {
+            status = Status(ErrorCodes::FailedToParse,
+                            str::stream() << "Failed to parse config server response: " << errmsg);
+        } else {
+            status = batchResponse->toStatus();
+        }
+    }
+
+    if (!status.isOK()) {
+        batchResponse->clear();
+        batchResponse->setErrCode(status.code());
+        batchResponse->setErrMessage(status.reason());
+        batchResponse->setOk(false);
+    }
+
+    return status;
+}
+
 Shard::Shard(const ShardId& id) : _id(id) {}
 
 const ShardId Shard::getId() const {
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index cf57f4ef1ac..ec8882b5ebf 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -37,6 +37,7 @@
 
 namespace mongo {
 
+class BatchedCommandResponse;
 class OperationContext;
 class RemoteCommandTargeter;
 
@@ -55,6 +56,14 @@ public:
               metadata(std::move(_metadata)),
               commandStatus(std::move(_commandStatus)),
               writeConcernStatus(std::move(_writeConcernStatus)) {}
+
+        /**
+         * Takes the response from running a batch write command and writes the appropriate response
+         * into batchResponse, while also returning the Status of the operation.
+         */
+        static Status processBatchWriteResponse(StatusWith<CommandResponse> response,
+                                                BatchedCommandResponse* batchResponse);
+
         BSONObj response;
         BSONObj metadata;
         Status commandStatus;
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index 63c29879d83..eef480d14c5 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -304,4 +304,38 @@ Status ConfigServerTestFixture::insertToConfigCollection(OperationContext* txn,
     return insertStatus.getStatus();
 }
 
+Status ConfigServerTestFixture::setupShards(const std::vector<ShardType>& shards) {
+    const NamespaceString shardNS(ShardType::ConfigNS);
+    for (const auto& shard : shards) {
+        auto insertStatus = insertToConfigCollection(operationContext(), shardNS, shard.toBSON());
+        if (!insertStatus.isOK()) {
+            return insertStatus;
+        }
+    }
+
+    return Status::OK();
+}
+
+StatusWith<ShardType> ConfigServerTestFixture::getShardDoc(OperationContext* txn,
+                                                           const std::string& shardId) {
+    auto config = getConfigShard();
+    invariant(config);
+
+    NamespaceString ns(ShardType::ConfigNS);
+    auto findStatus = config->exhaustiveFindOnConfig(
+        txn, kReadPref, ns, BSON(ShardType::name(shardId)), BSONObj(), boost::none);
+    if (!findStatus.isOK()) {
+        return findStatus.getStatus();
+    }
+
+    auto findResult = findStatus.getValue();
+    if (findResult.docs.empty()) {
+        return {ErrorCodes::ShardNotFound,
+                str::stream() << "shard " << shardId << " does not exist"};
+    }
+
+    invariant(findResult.docs.size() == 1);
+    return ShardType::fromBSON(findResult.docs.front());
+}
+
 } // namespace mongo
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 9bee4bb5ea7..9ad3c2f0de2 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -124,6 +124,17 @@ public:
     void onFindWithMetadataCommand(
         executor::NetworkTestEnv::OnFindCommandWithMetadataFunction func);
 
+    /**
+     * Sets up the config.shards collection to contain the given shards.
+     */
+    Status setupShards(const std::vector<ShardType>& shards);
+
+    /**
+     * Retrieves the shard document from the config server.
+     * Returns {ErrorCodes::ShardNotFound} if the given shard does not exist.
+     */
+    StatusWith<ShardType> getShardDoc(OperationContext* txn, const std::string& shardId);
+
     void setUp() override;
 
     void tearDown() override;
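
A closing note on the refactor of _processBatchWriteResponse: it moves onto Shard::CommandResponse as processBatchWriteResponse so that both the catalog client's _runBatchWriteCommand and the new addShardToZone can translate a raw config-server write response into a BatchedCommandResponse plus a single Status. The sketch below restates that shared pattern outside either caller; the wrapper function name is an assumption, while every call inside it mirrors code that appears in the diff above.

// Sketch of the shared pattern only, not additional patch content. The
// wrapper name runConfigBatchWriteExample is hypothetical; the calls inside
// it mirror _runBatchWriteCommand and addShardToZone above.
#include "mongo/client/read_preference.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"

namespace mongo {

Status runConfigBatchWriteExample(OperationContext* txn, BatchedCommandRequest& request) {
    auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
    auto response = configShard->runCommand(txn,
                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                            "config",
                                            request.toBSON(),
                                            Shard::RetryPolicy::kNoRetry);

    // processBatchWriteResponse folds the command status, the write concern
    // status, and any BSON parse failure into one Status; on error it also
    // populates batchResponse with the matching error code and message.
    BatchedCommandResponse batchResponse;
    Status status = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
    if (!status.isOK()) {
        return status;
    }

    // On success the caller inspects batchResponse; addShardToZone, for example,
    // checks getN() to turn "no shard document matched" into ShardNotFound.
    return Status::OK();
}

}  // namespace mongo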