diff options
author | jannaerin <golden.janna@gmail.com> | 2018-10-25 17:47:55 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2018-11-01 10:34:10 -0400 |
commit | c06cea15dcc13c7e8777be5da229b1423ae7465b (patch) | |
tree | e098709d750a73988202ea4b0713c7472b5ef5cd | |
parent | 829b78b679188e672e813ce9d5b03334ae781d1c (diff) | |
download | mongo-c06cea15dcc13c7e8777be5da229b1423ae7465b.tar.gz |
SERVER-37354 Make _shardsvrShardCollection re-entrant
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/active_shard_collection_registry.cpp | 179 | ||||
-rw-r--r-- | src/mongo/db/s/active_shard_collection_registry.h | 169 | ||||
-rw-r--r-- | src/mongo/db/s/active_shard_collection_registry_test.cpp | 190 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_shard_collection_command.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_shard_collection.cpp | 188 | ||||
-rw-r--r-- | src/mongo/s/request_types/shard_collection.idl | 2 |
7 files changed, 656 insertions, 92 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 38277dd005f..b91d82a2691 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -32,6 +32,7 @@ env.Library( source=[ 'active_migrations_registry.cpp', 'active_move_primaries_registry.cpp', + 'active_shard_collection_registry.cpp', 'chunk_move_write_concern_options.cpp', 'chunk_splitter.cpp', 'collection_range_deleter.cpp', @@ -331,6 +332,7 @@ env.CppUnitTest( source=[ 'active_migrations_registry_test.cpp', 'active_move_primaries_registry_test.cpp', + 'active_shard_collection_registry_test.cpp', 'catalog_cache_loader_mock.cpp', 'implicit_create_collection_test.cpp', 'migration_chunk_cloner_source_legacy_test.cpp', diff --git a/src/mongo/db/s/active_shard_collection_registry.cpp b/src/mongo/db/s/active_shard_collection_registry.cpp new file mode 100644 index 00000000000..9a168491262 --- /dev/null +++ b/src/mongo/db/s/active_shard_collection_registry.cpp @@ -0,0 +1,179 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/active_shard_collection_registry.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/service_context.h" + +namespace mongo { +namespace { + +const auto getRegistry = ServiceContext::declareDecoration<ActiveShardCollectionRegistry>(); + +bool ActiveShardsvrShardCollectionEqualsNewRequest(const ShardsvrShardCollection& activeRequest, + const ShardsvrShardCollection& newRequest) { + if (activeRequest.get_shardsvrShardCollection().get() != + newRequest.get_shardsvrShardCollection().get()) + return false; + if (activeRequest.getKey().woCompare(newRequest.getKey()) != 0) + return false; + if (activeRequest.getUnique() != newRequest.getUnique()) + return false; + if (activeRequest.getNumInitialChunks() != newRequest.getNumInitialChunks()) + return false; + if ((activeRequest.getCollation() && newRequest.getCollation()) && + (activeRequest.getCollation().get().woCompare(newRequest.getCollation().get()) != 0)) + return false; + if (activeRequest.getGetUUIDfromPrimaryShard() != newRequest.getGetUUIDfromPrimaryShard()) + return false; + + if (activeRequest.getInitialSplitPoints() && newRequest.getInitialSplitPoints()) { + if (activeRequest.getInitialSplitPoints().get().size() != + newRequest.getInitialSplitPoints().get().size()) { + return false; + } else { + for (std::size_t i = 0; i < activeRequest.getInitialSplitPoints().get().size(); i++) { + if (activeRequest.getInitialSplitPoints().get()[i].woCompare( + newRequest.getInitialSplitPoints().get()[i]) != 0) + return false; + } + } + } + + return true; +} + +} // namespace + +ActiveShardCollectionRegistry::ActiveShardCollectionRegistry() = default; + +ActiveShardCollectionRegistry::~ActiveShardCollectionRegistry() { + invariant(_activeShardCollectionMap.empty()); +} + +ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(ServiceContext* service) { + return getRegistry(service); +} + +ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +StatusWith<ScopedShardCollection> ActiveShardCollectionRegistry::registerShardCollection( + const ShardsvrShardCollection& request) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + std::string nss = request.get_shardsvrShardCollection().get().ns(); + + auto iter = _activeShardCollectionMap.find(nss); + if (iter == _activeShardCollectionMap.end()) { + auto activeShardCollectionState = ActiveShardCollectionState(request); + _activeShardCollectionMap.try_emplace(nss, activeShardCollectionState); + + return {ScopedShardCollection(nss, this, true, activeShardCollectionState.notification)}; + } else { + auto activeShardCollectionState = iter->second; + + if (ActiveShardsvrShardCollectionEqualsNewRequest(activeShardCollectionState.activeRequest, + request)) { + return {ScopedShardCollection( + nss, nullptr, false, activeShardCollectionState.notification)}; + } + return activeShardCollectionState.constructErrorStatus(request); + } +} + +void ActiveShardCollectionRegistry::_clearShardCollection(std::string nss) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto iter = _activeShardCollectionMap.find(nss); + invariant(iter != _activeShardCollectionMap.end()); + _activeShardCollectionMap.erase(nss); +} + +Status ActiveShardCollectionRegistry::ActiveShardCollectionState::constructErrorStatus( + const ShardsvrShardCollection& request) const { + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to shard collection " + << request.get_shardsvrShardCollection().get().ns() + << " with arguments: " + << request.toBSON() + << " because this shard is currently running shard collection on this " + << "collection with arguments: " + << activeRequest.toBSON()}; +} + +ScopedShardCollection::ScopedShardCollection( + std::string nss, + ActiveShardCollectionRegistry* registry, + bool shouldExecute, + std::shared_ptr<Notification<Status>> completionNotification) + : _nss(nss), + _registry(registry), + _shouldExecute(shouldExecute), + _completionNotification(std::move(completionNotification)) {} + +ScopedShardCollection::~ScopedShardCollection() { + if (_registry && _shouldExecute) { + // If this is a newly started shard collection the caller must always signal on completion + invariant(*_completionNotification); + _registry->_clearShardCollection(_nss); + } +} + +ScopedShardCollection::ScopedShardCollection(ScopedShardCollection&& other) { + *this = std::move(other); +} + +ScopedShardCollection& ScopedShardCollection::operator=(ScopedShardCollection&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + _shouldExecute = other._shouldExecute; + _completionNotification = std::move(other._completionNotification); + _nss = std::move(other._nss); + } + + return *this; +} + +void ScopedShardCollection::signalComplete(Status status) { + invariant(_shouldExecute); + _completionNotification->set(status); +} + +Status ScopedShardCollection::waitForCompletion(OperationContext* opCtx) { + invariant(!_shouldExecute); + return _completionNotification->get(opCtx); +} + +} // namespace mongo diff --git a/src/mongo/db/s/active_shard_collection_registry.h b/src/mongo/db/s/active_shard_collection_registry.h new file mode 100644 index 00000000000..216f735c1c5 --- /dev/null +++ b/src/mongo/db/s/active_shard_collection_registry.h @@ -0,0 +1,169 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/base/disallow_copying.h" +#include "mongo/s/request_types/shard_collection_gen.h" +#include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/notification.h" + +namespace mongo { + +class OperationContext; +class ScopedShardCollection; +template <typename T> +class StatusWith; + +/** + * Thread-safe object that keeps track of any active shardCollection commands running. There is only + * one instance of this object per shard. + */ +class ActiveShardCollectionRegistry { + MONGO_DISALLOW_COPYING(ActiveShardCollectionRegistry); + +public: + ActiveShardCollectionRegistry(); + ~ActiveShardCollectionRegistry(); + + static ActiveShardCollectionRegistry& get(ServiceContext* service); + static ActiveShardCollectionRegistry& get(OperationContext* opCtx); + + /** + * If the collection being requested to shard is not already being sharded on this shard, + * registers an active shardCollection with the specified arguments. Returns a + * ScopedShardCollection object, which must be signaled by the caller before it goes out of + * scope. + * + * If this collection is already being sharded on this shard and it has the exact same + * arguments, returns a ScopedShardCollection. The ScopedShardCollection can be used to join the + * already running shard collection. + * + * Otherwise returns a ConflictingOperationInProgress error. + */ + StatusWith<ScopedShardCollection> registerShardCollection( + const ShardsvrShardCollection& request); + +private: + friend class ScopedShardCollection; + + // Describes the state of a currently active shardCollection operation + struct ActiveShardCollectionState { + ActiveShardCollectionState(ShardsvrShardCollection inRequest) + : activeRequest(std::move(inRequest)), + notification(std::make_shared<Notification<Status>>()) {} + + /** + * Constructs an error status to return in the case of conflicting operations. + */ + Status constructErrorStatus(const ShardsvrShardCollection& request) const; + + // Exact arguments of the currently active operation + ShardsvrShardCollection activeRequest; + + // Notification event that will be signaled when the currently active operation completes + std::shared_ptr<Notification<Status>> notification; + }; + + /** + * Unregisters a previously registered namespace with an ongoing shardCollection. Must only be + * called if a previous call to registerShardCollection has succeeded. + */ + void _clearShardCollection(std::string nss); + + // Protects the state below + stdx::mutex _mutex; + + // Map containing any collections currently being sharded + StringMap<ActiveShardCollectionState> _activeShardCollectionMap; +}; + +/** + * Object of this class is returned from the registerShardCollection call of the active shard + * collection registry. It can exist in two modes - 'execute' and 'join'. See the comments for + * registerShardCollection method for more details. + */ +class ScopedShardCollection { + MONGO_DISALLOW_COPYING(ScopedShardCollection); + +public: + ScopedShardCollection(std::string nss, + ActiveShardCollectionRegistry* registry, + bool shouldExecute, + std::shared_ptr<Notification<Status>> completionNotification); + ~ScopedShardCollection(); + + ScopedShardCollection(ScopedShardCollection&&); + ScopedShardCollection& operator=(ScopedShardCollection&&); + + /** + * Returns true if the shardCollection object is in the 'execute' mode. This means that the + * caller can execute the shardCollection command. The holder must execute the command and call + * signalComplete with a status. + */ + bool mustExecute() const { + return _shouldExecute; + } + + /** + * Must only be called if the object is in the 'execute' mode when the shardCollection command + * was invoked (the command immediately executed). Signals any callers that might be blocked in + * waitForCompletion. + */ + void signalComplete(Status status); + + /** + * Must only be called if the object is in the 'join' mode. Blocks until the main executor of + * the shardCollection command calls signalComplete. + */ + Status waitForCompletion(OperationContext* opCtx); + +private: + // Namespace of collection being sharded + std::string _nss; + + // Registry from which to unregister the migration. Not owned. + ActiveShardCollectionRegistry* _registry; + + /** + * Whether the holder is the first in line for a newly started shardCollection (in which case + * the destructor must unregister) or the caller is joining on an already-running + * shardCollection (in which case the caller must block and wait for completion). + */ + bool _shouldExecute; + + // This is the future, which will be signaled at the end of shardCollection + std::shared_ptr<Notification<Status>> _completionNotification; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/active_shard_collection_registry_test.cpp b/src/mongo/db/s/active_shard_collection_registry_test.cpp new file mode 100644 index 00000000000..e71aaaa1a64 --- /dev/null +++ b/src/mongo/db/s/active_shard_collection_registry_test.cpp @@ -0,0 +1,190 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/client.h" +#include "mongo/db/s/active_shard_collection_registry.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/s/request_types/shard_collection_gen.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +using unittest::assertGet; + +class ShardCollectionRegistrationTest : public ServiceContextMongoDTest { +protected: + ActiveShardCollectionRegistry _registry; +}; + +ShardsvrShardCollection createShardsvrShardCollectionRequest( + const NamespaceString& nss, + BSONObj key, + bool unique, + int numInitialChunks, + boost::optional<std::vector<mongo::BSONObj>> initialSplitPoints, + boost::optional<mongo::BSONObj> collation, + bool UUIDfromPrimaryShard) { + ShardsvrShardCollection shardsvrShardCollectionRequest; + shardsvrShardCollectionRequest.set_shardsvrShardCollection(nss); + shardsvrShardCollectionRequest.setKey(key); + shardsvrShardCollectionRequest.setUnique(unique); + shardsvrShardCollectionRequest.setNumInitialChunks(numInitialChunks); + shardsvrShardCollectionRequest.setInitialSplitPoints(initialSplitPoints); + shardsvrShardCollectionRequest.setCollation(collation); + shardsvrShardCollectionRequest.setGetUUIDfromPrimaryShard(UUIDfromPrimaryShard); + + return shardsvrShardCollectionRequest; +} + +TEST_F(ShardCollectionRegistrationTest, ScopedShardCollectionConstructorAndAssignment) { + auto shardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"), + BSON("x" + << "hashed"), + false, + 1, + boost::none, + boost::none, + false); + auto originalScopedShardCollection = + assertGet(_registry.registerShardCollection(shardsvrShardCollectionRequest)); + ASSERT(originalScopedShardCollection.mustExecute()); + + ScopedShardCollection movedScopedShardCollection(std::move(originalScopedShardCollection)); + ASSERT(movedScopedShardCollection.mustExecute()); + + originalScopedShardCollection = std::move(movedScopedShardCollection); + ASSERT(originalScopedShardCollection.mustExecute()); + + // Need to signal the registered shard collection so the destructor doesn't invariant + originalScopedShardCollection.signalComplete(Status::OK()); +} + +TEST_F(ShardCollectionRegistrationTest, + SecondShardCollectionWithDifferentOptionsReturnsConflictingOperationInProgress) { + auto firstShardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"), + BSON("x" + << "hashed"), + false, + 1, + boost::none, + boost::none, + false); + auto originalScopedShardCollection = + assertGet(_registry.registerShardCollection(firstShardsvrShardCollectionRequest)); + + auto secondShardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"), + BSON("x" << 0), + false, + 1, + boost::none, + boost::none, + false); + auto secondScopedShardCollection = + _registry.registerShardCollection(secondShardsvrShardCollectionRequest); + ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, secondScopedShardCollection.getStatus()); + + originalScopedShardCollection.signalComplete(Status::OK()); +} + +TEST_F(ShardCollectionRegistrationTest, SecondShardCollectionWithSameOptionsJoinsFirst) { + auto firstShardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"), + BSON("x" + << "hashed"), + false, + 1, + boost::none, + boost::none, + false); + auto originalScopedShardCollection = + assertGet(_registry.registerShardCollection(firstShardsvrShardCollectionRequest)); + ASSERT(originalScopedShardCollection.mustExecute()); + + auto secondShardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"), + BSON("x" + << "hashed"), + false, + 1, + boost::none, + boost::none, + false); + auto secondScopedShardCollection = + assertGet(_registry.registerShardCollection(secondShardsvrShardCollectionRequest)); + ASSERT(!secondScopedShardCollection.mustExecute()); + + originalScopedShardCollection.signalComplete({ErrorCodes::InternalError, "Test error"}); + auto opCtx = makeOperationContext(); + ASSERT_EQ(Status(ErrorCodes::InternalError, "Test error"), + secondScopedShardCollection.waitForCompletion(opCtx.get())); +} + +TEST_F(ShardCollectionRegistrationTest, TwoShardCollectionsOnDifferentCollectionsAllowed) { + auto firstShardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"), + BSON("x" + << "hashed"), + false, + 1, + boost::none, + boost::none, + false); + auto originalScopedShardCollection = + assertGet(_registry.registerShardCollection(firstShardsvrShardCollectionRequest)); + ASSERT(originalScopedShardCollection.mustExecute()); + + auto secondShardsvrShardCollectionRequest = + createShardsvrShardCollectionRequest(NamespaceString("TestDB2", "TestColl2"), + BSON("x" + << "hashed"), + false, + 1, + boost::none, + boost::none, + false); + auto secondScopedShardCollection = + assertGet(_registry.registerShardCollection(secondShardsvrShardCollectionRequest)); + ASSERT(secondScopedShardCollection.mustExecute()); + + originalScopedShardCollection.signalComplete(Status::OK()); + secondScopedShardCollection.signalComplete(Status::OK()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index f618351fe51..ad655601c9e 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -769,17 +769,13 @@ public: shardsvrShardCollectionRequest.setGetUUIDfromPrimaryShard( request.getGetUUIDfromPrimaryShard()); - // TODO(SERVER-37354): Remove the 'runWithoutInterruption' block. - auto cmdResponse = opCtx->runWithoutInterruption([&] { - return uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields( - cmdObj, shardsvrShardCollectionRequest.toBSON())), - Shard::RetryPolicy::kIdempotent)); - }); - + auto cmdResponse = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + "admin", + CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields( + cmdObj, shardsvrShardCollectionRequest.toBSON())), + Shard::RetryPolicy::kIdempotent)); if (cmdResponse.commandStatus != ErrorCodes::CommandNotFound) { uassertStatusOK(cmdResponse.commandStatus); diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp index bb6f1183557..a87de2ff318 100644 --- a/src/mongo/db/s/shardsvr_shard_collection.cpp +++ b/src/mongo/db/s/shardsvr_shard_collection.cpp @@ -44,6 +44,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/s/active_shard_collection_registry.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/config/initial_split_policy.h" #include "mongo/db/s/config/sharding_catalog_manager.h" @@ -629,90 +630,117 @@ public: IDLParserErrorContext("_shardsvrShardCollection"), cmdObj); const NamespaceString nss(parseNs(dbname, cmdObj)); - // Take the collection critical section so that no writes can happen. - CollectionCriticalSection critSec(opCtx, nss); + auto scopedShardCollection = uassertStatusOK( + ActiveShardCollectionRegistry::get(opCtx).registerShardCollection(request)); + Status status = {ErrorCodes::InternalError, "Uninitialized value"}; - auto proposedKey(request.getKey().getOwned()); - ShardKeyPattern shardKeyPattern(proposedKey); - - createCollectionOrValidateExisting(opCtx, nss, proposedKey, shardKeyPattern, request); - - // Read zone info - auto tags = getExistingTags(opCtx, nss); - - if (!tags.empty()) { - validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags); - } - - boost::optional<UUID> uuid; - if (request.getGetUUIDfromPrimaryShard()) { - uuid = getUUIDFromPrimaryShard(opCtx, nss); - } else { - uuid = UUID::gen(); - } - - auto shardRegistry = Grid::get(opCtx)->shardRegistry(); - shardRegistry->reload(opCtx); - - DBDirectClient localClient(opCtx); - bool isEmpty = (localClient.count(nss.ns()) == 0); - - std::vector<ShardId> shardIds; - shardRegistry->getAllShardIds(opCtx, &shardIds); - const int numShards = shardIds.size(); - - std::vector<BSONObj> initialSplitPoints; - std::vector<BSONObj> finalSplitPoints; - - if (request.getInitialSplitPoints()) { - finalSplitPoints = std::move(*request.getInitialSplitPoints()); - } else if (!tags.empty()) { - // no need to find split points since we will create chunks based on - // the existing zones - uassert(ErrorCodes::InvalidOptions, - str::stream() << "found existing zones but the collection is not empty", - isEmpty); + // Check if this collection is currently being sharded and if so, join it + if (!scopedShardCollection.mustExecute()) { + status = scopedShardCollection.waitForCompletion(opCtx); } else { - InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( - shardKeyPattern, - isEmpty, - numShards, - request.getNumInitialChunks(), - &initialSplitPoints, - &finalSplitPoints); - } - - result << "collectionsharded" << nss.ns(); - if (uuid) { - result << "collectionUUID" << *uuid; + try { + // Take the collection critical section so that no writes can happen. + CollectionCriticalSection critSec(opCtx, nss); + + auto proposedKey(request.getKey().getOwned()); + ShardKeyPattern shardKeyPattern(proposedKey); + + createCollectionOrValidateExisting( + opCtx, nss, proposedKey, shardKeyPattern, request); + + // Read zone info + auto tags = getExistingTags(opCtx, nss); + + if (!tags.empty()) { + validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags); + } + + boost::optional<UUID> uuid; + if (request.getGetUUIDfromPrimaryShard()) { + uuid = getUUIDFromPrimaryShard(opCtx, nss); + } else { + uuid = UUID::gen(); + } + + auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + shardRegistry->reload(opCtx); + + DBDirectClient localClient(opCtx); + bool isEmpty = (localClient.count(nss.ns()) == 0); + + std::vector<ShardId> shardIds; + shardRegistry->getAllShardIds(opCtx, &shardIds); + const int numShards = shardIds.size(); + + std::vector<BSONObj> initialSplitPoints; + std::vector<BSONObj> finalSplitPoints; + + if (request.getInitialSplitPoints()) { + finalSplitPoints = std::move(*request.getInitialSplitPoints()); + } else if (!tags.empty()) { + // no need to find split points since we will create chunks based on + // the existing zones + uassert(ErrorCodes::InvalidOptions, + str::stream() << "found existing zones but the collection is not empty", + isEmpty); + } else { + InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( + shardKeyPattern, + isEmpty, + numShards, + request.getNumInitialChunks(), + &initialSplitPoints, + &finalSplitPoints); + } + + result << "collectionsharded" << nss.ns(); + if (uuid) { + result << "collectionUUID" << *uuid; + } + + critSec.enterCommitPhase(); + + LOG(0) << "CMD: shardcollection: " << cmdObj; + + audit::logShardCollection( + Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); + + // The initial chunks are distributed evenly across shards if the initial split + // points were specified in the request by mapReduce or if we are using a hashed + // shard key. Otherwise, all the initial chunks are placed on the primary shard. + const bool fromMapReduce = bool(request.getInitialSplitPoints()); + const int numContiguousChunksPerShard = initialSplitPoints.empty() + ? 1 + : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1); + + // Step 6. Actually shard the collection. + shardCollection(opCtx, + nss, + uuid, + shardKeyPattern, + *request.getCollation(), + request.getUnique(), + finalSplitPoints, + tags, + fromMapReduce, + ShardingState::get(opCtx)->shardId(), + numContiguousChunksPerShard); + + status = Status::OK(); + } catch (const DBException& e) { + status = e.toStatus(); + } catch (const std::exception& e) { + scopedShardCollection.signalComplete( + {ErrorCodes::InternalError, + str::stream() + << "Severe error occurred while running shardCollection command: " + << e.what()}); + throw; + } + scopedShardCollection.signalComplete(status); } - critSec.enterCommitPhase(); - - LOG(0) << "CMD: shardcollection: " << cmdObj; - - audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); - - // The initial chunks are distributed evenly across shards if the initial split points were - // specified in the request by mapReduce or if we are using a hashed shard key. Otherwise, - // all the initial chunks are placed on the primary shard. - const bool fromMapReduce = bool(request.getInitialSplitPoints()); - const int numContiguousChunksPerShard = initialSplitPoints.empty() - ? 1 - : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1); - - // Step 6. Actually shard the collection. - shardCollection(opCtx, - nss, - uuid, - shardKeyPattern, - *request.getCollation(), - request.getUnique(), - finalSplitPoints, - tags, - fromMapReduce, - ShardingState::get(opCtx)->shardId(), - numContiguousChunksPerShard); + uassertStatusOK(status); return true; } diff --git a/src/mongo/s/request_types/shard_collection.idl b/src/mongo/s/request_types/shard_collection.idl index 08b4e2438e0..5a1e9229d38 100644 --- a/src/mongo/s/request_types/shard_collection.idl +++ b/src/mongo/s/request_types/shard_collection.idl @@ -161,4 +161,4 @@ structs: collectionUUID: type: uuid description: "The UUID of the collection that just got sharded." - optional: true + optional: true
\ No newline at end of file |