summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog/replset/SConscript11
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp115
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp33
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp68
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h4
-rw-r--r--src/mongo/s/catalog/sharding_catalog_manager.h8
-rw-r--r--src/mongo/s/catalog/sharding_catalog_manager_mock.cpp7
-rw-r--r--src/mongo/s/catalog/sharding_catalog_manager_mock.h5
-rw-r--r--src/mongo/s/client/SConscript1
-rw-r--r--src/mongo/s/client/shard.cpp26
-rw-r--r--src/mongo/s/client/shard.h9
-rw-r--r--src/mongo/s/config_server_test_fixture.cpp34
-rw-r--r--src/mongo/s/config_server_test_fixture.h11
14 files changed, 299 insertions, 34 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index c3c5178b2d9..af1722c76b2 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -128,7 +128,6 @@ env.Library(
'$BUILD_DIR/mongo/s/serveronly',
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
- 'sharding_egress_metadata_hook_for_mongos',
],
LIBDEPS_TAGS=[
# Depends on coreshard, but that would be circular
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index e8303bc5f14..3fee8b6cab1 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -104,6 +104,17 @@ env.Library(
)
env.CppUnitTest(
+ target='config_server_catalog_test',
+ source=[
+ 'sharding_catalog_add_shard_to_zone_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/repl/replmocks',
+ '$BUILD_DIR/mongo/s/config_server_test_fixture',
+ ]
+)
+
+env.CppUnitTest(
target='sharding_catalog_test',
source=[
'sharding_catalog_add_shard_test.cpp',
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp
new file mode 100644
index 00000000000..9c2257f85d6
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_to_zone_test.cpp
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/read_preference.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/config_server_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+
+ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly);
+
+using AddShardToZoneTest = ConfigServerTestFixture;
+
+TEST_F(AddShardToZoneTest, AddSingleZoneToExistingShardShouldSucceed) {
+ ShardType shard;
+ shard.setName("a");
+ shard.setHost("a:1234");
+
+ setupShards({shard});
+
+ ASSERT_OK(catalogManager()->addShardToZone(operationContext(), shard.getName(), "z"));
+ auto shardDocStatus = getShardDoc(operationContext(), shard.getName());
+ ASSERT_OK(shardDocStatus.getStatus());
+
+ auto shardDoc = shardDocStatus.getValue();
+ auto tags = shardDoc.getTags();
+ ASSERT_EQ(1u, tags.size());
+ ASSERT_EQ("z", tags.front());
+}
+
+TEST_F(AddShardToZoneTest, AddZoneToShardWithSameTagShouldSucceed) {
+ ShardType shard;
+ shard.setName("a");
+ shard.setHost("a:1234");
+ shard.setTags({"x", "y"});
+
+ setupShards({shard});
+
+ ASSERT_OK(catalogManager()->addShardToZone(operationContext(), shard.getName(), "x"));
+
+ auto shardDocStatus = getShardDoc(operationContext(), shard.getName());
+ ASSERT_OK(shardDocStatus.getStatus());
+
+ auto shardDoc = shardDocStatus.getValue();
+ auto tags = shardDoc.getTags();
+ ASSERT_EQ(2u, tags.size());
+ ASSERT_EQ("x", tags.front());
+ ASSERT_EQ("y", tags.back());
+}
+
+TEST_F(AddShardToZoneTest, AddZoneToShardWithNewTagShouldAppend) {
+ ShardType shard;
+ shard.setName("a");
+ shard.setHost("a:1234");
+ shard.setTags({"x"});
+
+ setupShards({shard});
+
+ ASSERT_OK(catalogManager()->addShardToZone(operationContext(), shard.getName(), "y"));
+
+ auto shardDocStatus = getShardDoc(operationContext(), shard.getName());
+ ASSERT_OK(shardDocStatus.getStatus());
+
+ auto shardDoc = shardDocStatus.getValue();
+ auto tags = shardDoc.getTags();
+ ASSERT_EQ(2u, tags.size());
+ ASSERT_EQ("x", tags.front());
+ ASSERT_EQ("y", tags.back());
+}
+
+TEST_F(AddShardToZoneTest, AddSingleZoneToNonExistingShardShouldFail) {
+ ShardType shard;
+ shard.setName("a");
+ shard.setHost("a:1234");
+
+ setupShards({shard});
+
+ auto status = catalogManager()->addShardToZone(operationContext(), "b", "z");
+ ASSERT_EQ(ErrorCodes::ShardNotFound, status);
+}
+
+} // unnamed namespace
+} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index caf410e35a3..93bbd60b809 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -129,37 +129,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
response->setOk(false);
}
-/**
- * Takes the response from running a batch write command and writes the appropriate response into
- * *batchResponse, while also returning the Status of the operation.
- */
-Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response,
- BatchedCommandResponse* batchResponse) {
- Status status(ErrorCodes::InternalError, "status not set");
-
- if (!response.isOK()) {
- status = response.getStatus();
- } else if (!response.getValue().commandStatus.isOK()) {
- status = response.getValue().commandStatus;
- } else if (!response.getValue().writeConcernStatus.isOK()) {
- status = response.getValue().writeConcernStatus;
- } else {
- string errmsg;
- if (!batchResponse->parseBSON(response.getValue().response, &errmsg)) {
- status = Status(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse config server response: " << errmsg);
- } else {
- status = batchResponse->toStatus();
- }
- }
-
- if (!status.isOK()) {
- toBatchError(status, batchResponse);
- }
-
- return status;
-}
-
} // namespace
ShardingCatalogClientImpl::ShardingCatalogClientImpl(
@@ -1319,7 +1288,7 @@ void ShardingCatalogClientImpl::_runBatchWriteCommand(OperationContext* txn,
cmdObj,
Shard::RetryPolicy::kNoRetry); // We're handling our own retries here.
- Status status = _processBatchWriteResponse(response, batchResponse);
+ Status status = Shard::CommandResponse::processBatchWriteResponse(response, batchResponse);
if (retry < kMaxWriteRetry && configShard->isRetriableError(status.code(), retryPolicy)) {
batchResponse->clear();
LOG(1) << "Batch write command failed with retriable error and will be retried"
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 8875b19ba8e..71845ffa7ee 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -42,6 +42,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/type_shard_identity.h"
@@ -57,6 +58,7 @@
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
@@ -70,7 +72,6 @@ using str::stream;
namespace {
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
-
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
// Note: Even though we're setting UNSET here,
// kMajority implies JOURNAL if journaling is
@@ -117,6 +118,29 @@ Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response,
return status;
}
+
+const ResourceId kZoneOpResourceId(RESOURCE_METADATA, StringData("$configZonedSharding"));
+
+/**
+ * Lock for shard zoning operations. This should be acquired when doing any operations that
+ * can afffect the config.tags (or the tags field of the config.shards) collection.
+ */
+class ScopedZoneOpExclusiveLock {
+public:
+ ScopedZoneOpExclusiveLock(OperationContext* txn)
+ : _transaction(txn, MODE_IX),
+ _globalIXLock(txn->lockState(), MODE_IX, UINT_MAX),
+ // Grab global lock recursively so locks will not be yielded.
+ _recursiveGlobalIXLock(txn->lockState(), MODE_IX, UINT_MAX),
+ _zoneLock(txn->lockState(), kZoneOpResourceId, MODE_X) {}
+
+private:
+ ScopedTransaction _transaction;
+ Lock::GlobalLock _globalIXLock;
+ Lock::GlobalLock _recursiveGlobalIXLock;
+ Lock::ResourceLock _zoneLock;
+};
+
} // namespace
@@ -589,6 +613,48 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
return shardType.getName();
}
+Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn,
+ const std::string& shardName,
+ const std::string& zoneName) {
+ ScopedZoneOpExclusiveLock scopedLock(txn);
+
+ auto updateDoc = stdx::make_unique<BatchedUpdateDocument>();
+ updateDoc->setQuery(BSON(ShardType::name(shardName)));
+ updateDoc->setUpdateExpr(BSON("$addToSet" << BSON(ShardType::tags() << zoneName)));
+ updateDoc->setUpsert(false);
+ updateDoc->setMulti(false);
+
+ auto updateRequest = stdx::make_unique<BatchedUpdateRequest>();
+ updateRequest->addToUpdates(updateDoc.release());
+
+ BatchedCommandRequest request(updateRequest.release());
+ request.setNS(NamespaceString(ShardType::ConfigNS));
+ request.setWriteConcern(kMajorityWriteConcern.toBSON());
+
+ auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
+ auto response = configShard->runCommand(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "config",
+ request.toBSON(),
+ Shard::RetryPolicy::kNoRetry);
+
+ BatchedCommandResponse batchResponse;
+ Status status = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ invariant(batchResponse.isNSet());
+
+ if (batchResponse.getN() < 1) {
+ return {ErrorCodes::ShardNotFound,
+ str::stream() << "shard " << shardName << " does not exist"};
+ }
+
+ return Status::OK();
+}
+
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 8d0a473d875..5784e969267 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -65,6 +65,10 @@ public:
const ConnectionString& shardConnectionString,
const long long maxSize) override;
+ Status addShardToZone(OperationContext* txn,
+ const std::string& shardName,
+ const std::string& zoneName) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
private:
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index 2ac9413128f..35548fa5720 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -92,6 +92,14 @@ public:
const long long maxSize) = 0;
/**
+ * Adds the shard to the zone.
+ * Returns ErrorCodes::ShardNotFound if the shard does not exist.
+ */
+ virtual Status addShardToZone(OperationContext* txn,
+ const std::string& shardName,
+ const std::string& zoneName) = 0;
+
+ /**
* Append information about the connection pools owned by the CatalogManager.
*/
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index 1768a1a9925..ff841c4eae4 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -55,6 +55,13 @@ StatusWith<string> ShardingCatalogManagerMock::addShard(
return {ErrorCodes::InternalError, "Method not implemented"};
}
+Status ShardingCatalogManagerMock::addShardToZone(OperationContext* txn,
+ const std::string& shardName,
+ const std::string& zoneName) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+
void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index a56681e37fa..735637e9603 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -49,6 +49,11 @@ public:
const ConnectionString& shardConnectionString,
const long long maxSize) override;
+ Status addShardToZone(OperationContext* txn,
+ const std::string& shardName,
+ const std::string& zoneName) override;
+
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
};
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index b536b27c13e..ecc4f0894e9 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -79,6 +79,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/s/shard_id',
+ '$BUILD_DIR/mongo/s/write_ops/batch_write_types',
]
)
diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp
index 98c7cdae768..a4619ecd7c4 100644
--- a/src/mongo/s/client/shard.cpp
+++ b/src/mongo/s/client/shard.cpp
@@ -31,11 +31,14 @@
#include "mongo/platform/basic.h"
#include "mongo/s/client/shard.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/log.h"
namespace mongo {
+using std::string;
+
namespace {
const int kOnErrorNumRetries = 3;
@@ -61,6 +64,29 @@ Status _getEffectiveCommandStatus(StatusWith<Shard::CommandResponse> cmdResponse
} // namespace
+Status Shard::CommandResponse::processBatchWriteResponse(
+ StatusWith<Shard::CommandResponse> response, BatchedCommandResponse* batchResponse) {
+ auto status = _getEffectiveCommandStatus(response);
+ if (status.isOK()) {
+ string errmsg;
+ if (!batchResponse->parseBSON(response.getValue().response, &errmsg)) {
+ status = Status(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse config server response: " << errmsg);
+ } else {
+ status = batchResponse->toStatus();
+ }
+ }
+
+ if (!status.isOK()) {
+ batchResponse->clear();
+ batchResponse->setErrCode(status.code());
+ batchResponse->setErrMessage(status.reason());
+ batchResponse->setOk(false);
+ }
+
+ return status;
+}
+
Shard::Shard(const ShardId& id) : _id(id) {}
const ShardId Shard::getId() const {
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index cf57f4ef1ac..ec8882b5ebf 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -37,6 +37,7 @@
namespace mongo {
+class BatchedCommandResponse;
class OperationContext;
class RemoteCommandTargeter;
@@ -55,6 +56,14 @@ public:
metadata(std::move(_metadata)),
commandStatus(std::move(_commandStatus)),
writeConcernStatus(std::move(_writeConcernStatus)) {}
+
+ /**
+ * Takes the response from running a batch write command and writes the appropriate response
+ * into batchResponse, while also returning the Status of the operation.
+ */
+ static Status processBatchWriteResponse(StatusWith<CommandResponse> response,
+ BatchedCommandResponse* batchResponse);
+
BSONObj response;
BSONObj metadata;
Status commandStatus;
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index 63c29879d83..eef480d14c5 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -304,4 +304,38 @@ Status ConfigServerTestFixture::insertToConfigCollection(OperationContext* txn,
return insertStatus.getStatus();
}
+Status ConfigServerTestFixture::setupShards(const std::vector<ShardType>& shards) {
+ const NamespaceString shardNS(ShardType::ConfigNS);
+ for (const auto& shard : shards) {
+ auto insertStatus = insertToConfigCollection(operationContext(), shardNS, shard.toBSON());
+ if (!insertStatus.isOK()) {
+ return insertStatus;
+ }
+ }
+
+ return Status::OK();
+}
+
+StatusWith<ShardType> ConfigServerTestFixture::getShardDoc(OperationContext* txn,
+ const std::string& shardId) {
+ auto config = getConfigShard();
+ invariant(config);
+
+ NamespaceString ns(ShardType::ConfigNS);
+ auto findStatus = config->exhaustiveFindOnConfig(
+ txn, kReadPref, ns, BSON(ShardType::name(shardId)), BSONObj(), boost::none);
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ auto findResult = findStatus.getValue();
+ if (findResult.docs.empty()) {
+ return {ErrorCodes::ShardNotFound,
+ str::stream() << "shard " << shardId << " does not exist"};
+ }
+
+ invariant(findResult.docs.size() == 1);
+ return ShardType::fromBSON(findResult.docs.front());
+}
+
} // namespace mongo
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 9bee4bb5ea7..9ad3c2f0de2 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -124,6 +124,17 @@ public:
void onFindWithMetadataCommand(
executor::NetworkTestEnv::OnFindCommandWithMetadataFunction func);
+ /**
+ * Setup the config.shards collection to contain the given shards.
+ */
+ Status setupShards(const std::vector<ShardType>& shards);
+
+ /**
+ * Retrieves the shard document from the config server.
+ * Returns {ErrorCodes::ShardNotFound} if the given shard does not exists.
+ */
+ StatusWith<ShardType> getShardDoc(OperationContext* txn, const std::string& shardId);
+
void setUp() override;
void tearDown() override;