summary refs log tree commit diff
diff options
context:
space:
mode:
author    jannaerin <golden.janna@gmail.com>  2018-10-25 17:47:55 -0400
committer jannaerin <golden.janna@gmail.com>  2018-11-01 10:34:10 -0400
commit    c06cea15dcc13c7e8777be5da229b1423ae7465b (patch)
tree      e098709d750a73988202ea4b0713c7472b5ef5cd
parent    829b78b679188e672e813ce9d5b03334ae781d1c (diff)
download  mongo-c06cea15dcc13c7e8777be5da229b1423ae7465b.tar.gz
SERVER-37354 Make _shardsvrShardCollection re-entrant
-rw-r--r--  src/mongo/db/s/SConscript                                      2
-rw-r--r--  src/mongo/db/s/active_shard_collection_registry.cpp          179
-rw-r--r--  src/mongo/db/s/active_shard_collection_registry.h            169
-rw-r--r--  src/mongo/db/s/active_shard_collection_registry_test.cpp     190
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp  18
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp                 188
-rw-r--r--  src/mongo/s/request_types/shard_collection.idl                 2
7 files changed, 656 insertions, 92 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 38277dd005f..b91d82a2691 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -32,6 +32,7 @@ env.Library(
source=[
'active_migrations_registry.cpp',
'active_move_primaries_registry.cpp',
+ 'active_shard_collection_registry.cpp',
'chunk_move_write_concern_options.cpp',
'chunk_splitter.cpp',
'collection_range_deleter.cpp',
@@ -331,6 +332,7 @@ env.CppUnitTest(
source=[
'active_migrations_registry_test.cpp',
'active_move_primaries_registry_test.cpp',
+ 'active_shard_collection_registry_test.cpp',
'catalog_cache_loader_mock.cpp',
'implicit_create_collection_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
diff --git a/src/mongo/db/s/active_shard_collection_registry.cpp b/src/mongo/db/s/active_shard_collection_registry.cpp
new file mode 100644
index 00000000000..9a168491262
--- /dev/null
+++ b/src/mongo/db/s/active_shard_collection_registry.cpp
@@ -0,0 +1,179 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/active_shard_collection_registry.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+namespace {
+
+const auto getRegistry = ServiceContext::declareDecoration<ActiveShardCollectionRegistry>();
+
+bool ActiveShardsvrShardCollectionEqualsNewRequest(const ShardsvrShardCollection& activeRequest,
+ const ShardsvrShardCollection& newRequest) {
+ if (activeRequest.get_shardsvrShardCollection().get() !=
+ newRequest.get_shardsvrShardCollection().get())
+ return false;
+ if (activeRequest.getKey().woCompare(newRequest.getKey()) != 0)
+ return false;
+ if (activeRequest.getUnique() != newRequest.getUnique())
+ return false;
+ if (activeRequest.getNumInitialChunks() != newRequest.getNumInitialChunks())
+ return false;
+ if ((activeRequest.getCollation() && newRequest.getCollation()) &&
+ (activeRequest.getCollation().get().woCompare(newRequest.getCollation().get()) != 0))
+ return false;
+ if (activeRequest.getGetUUIDfromPrimaryShard() != newRequest.getGetUUIDfromPrimaryShard())
+ return false;
+
+ if (activeRequest.getInitialSplitPoints() && newRequest.getInitialSplitPoints()) {
+ if (activeRequest.getInitialSplitPoints().get().size() !=
+ newRequest.getInitialSplitPoints().get().size()) {
+ return false;
+ } else {
+ for (std::size_t i = 0; i < activeRequest.getInitialSplitPoints().get().size(); i++) {
+ if (activeRequest.getInitialSplitPoints().get()[i].woCompare(
+ newRequest.getInitialSplitPoints().get()[i]) != 0)
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+} // namespace
+
+ActiveShardCollectionRegistry::ActiveShardCollectionRegistry() = default;
+
+ActiveShardCollectionRegistry::~ActiveShardCollectionRegistry() {
+ invariant(_activeShardCollectionMap.empty());
+}
+
+ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(ServiceContext* service) {
+ return getRegistry(service);
+}
+
+ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+StatusWith<ScopedShardCollection> ActiveShardCollectionRegistry::registerShardCollection(
+ const ShardsvrShardCollection& request) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ std::string nss = request.get_shardsvrShardCollection().get().ns();
+
+ auto iter = _activeShardCollectionMap.find(nss);
+ if (iter == _activeShardCollectionMap.end()) {
+ auto activeShardCollectionState = ActiveShardCollectionState(request);
+ _activeShardCollectionMap.try_emplace(nss, activeShardCollectionState);
+
+ return {ScopedShardCollection(nss, this, true, activeShardCollectionState.notification)};
+ } else {
+ auto activeShardCollectionState = iter->second;
+
+ if (ActiveShardsvrShardCollectionEqualsNewRequest(activeShardCollectionState.activeRequest,
+ request)) {
+ return {ScopedShardCollection(
+ nss, nullptr, false, activeShardCollectionState.notification)};
+ }
+ return activeShardCollectionState.constructErrorStatus(request);
+ }
+}
+
+void ActiveShardCollectionRegistry::_clearShardCollection(std::string nss) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto iter = _activeShardCollectionMap.find(nss);
+ invariant(iter != _activeShardCollectionMap.end());
+ _activeShardCollectionMap.erase(nss);
+}
+
+Status ActiveShardCollectionRegistry::ActiveShardCollectionState::constructErrorStatus(
+ const ShardsvrShardCollection& request) const {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unable to shard collection "
+ << request.get_shardsvrShardCollection().get().ns()
+ << " with arguments: "
+ << request.toBSON()
+ << " because this shard is currently running shard collection on this "
+ << "collection with arguments: "
+ << activeRequest.toBSON()};
+}
+
+ScopedShardCollection::ScopedShardCollection(
+ std::string nss,
+ ActiveShardCollectionRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification)
+ : _nss(nss),
+ _registry(registry),
+ _shouldExecute(shouldExecute),
+ _completionNotification(std::move(completionNotification)) {}
+
+ScopedShardCollection::~ScopedShardCollection() {
+ if (_registry && _shouldExecute) {
+ // If this is a newly started shard collection the caller must always signal on completion
+ invariant(*_completionNotification);
+ _registry->_clearShardCollection(_nss);
+ }
+}
+
+ScopedShardCollection::ScopedShardCollection(ScopedShardCollection&& other) {
+ *this = std::move(other);
+}
+
+ScopedShardCollection& ScopedShardCollection::operator=(ScopedShardCollection&& other) {
+ if (&other != this) {
+ _registry = other._registry;
+ other._registry = nullptr;
+ _shouldExecute = other._shouldExecute;
+ _completionNotification = std::move(other._completionNotification);
+ _nss = std::move(other._nss);
+ }
+
+ return *this;
+}
+
+void ScopedShardCollection::signalComplete(Status status) {
+ invariant(_shouldExecute);
+ _completionNotification->set(status);
+}
+
+Status ScopedShardCollection::waitForCompletion(OperationContext* opCtx) {
+ invariant(!_shouldExecute);
+ return _completionNotification->get(opCtx);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/active_shard_collection_registry.h b/src/mongo/db/s/active_shard_collection_registry.h
new file mode 100644
index 00000000000..216f735c1c5
--- /dev/null
+++ b/src/mongo/db/s/active_shard_collection_registry.h
@@ -0,0 +1,169 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/request_types/shard_collection_gen.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
+
+namespace mongo {
+
+class OperationContext;
+class ScopedShardCollection;
+template <typename T>
+class StatusWith;
+
+/**
+ * Thread-safe object that keeps track of any active shardCollection commands running. There is only
+ * one instance of this object per shard.
+ */
+class ActiveShardCollectionRegistry {
+ MONGO_DISALLOW_COPYING(ActiveShardCollectionRegistry);
+
+public:
+ ActiveShardCollectionRegistry();
+ ~ActiveShardCollectionRegistry();
+
+ static ActiveShardCollectionRegistry& get(ServiceContext* service);
+ static ActiveShardCollectionRegistry& get(OperationContext* opCtx);
+
+ /**
+ * If the collection being requested to shard is not already being sharded on this shard,
+ * registers an active shardCollection with the specified arguments. Returns a
+ * ScopedShardCollection object, which must be signaled by the caller before it goes out of
+ * scope.
+ *
+ * If this collection is already being sharded on this shard and it has the exact same
+ * arguments, returns a ScopedShardCollection. The ScopedShardCollection can be used to join the
+ * already running shard collection.
+ *
+ * Otherwise returns a ConflictingOperationInProgress error.
+ */
+ StatusWith<ScopedShardCollection> registerShardCollection(
+ const ShardsvrShardCollection& request);
+
+private:
+ friend class ScopedShardCollection;
+
+ // Describes the state of a currently active shardCollection operation
+ struct ActiveShardCollectionState {
+ ActiveShardCollectionState(ShardsvrShardCollection inRequest)
+ : activeRequest(std::move(inRequest)),
+ notification(std::make_shared<Notification<Status>>()) {}
+
+ /**
+ * Constructs an error status to return in the case of conflicting operations.
+ */
+ Status constructErrorStatus(const ShardsvrShardCollection& request) const;
+
+ // Exact arguments of the currently active operation
+ ShardsvrShardCollection activeRequest;
+
+ // Notification event that will be signaled when the currently active operation completes
+ std::shared_ptr<Notification<Status>> notification;
+ };
+
+ /**
+ * Unregisters a previously registered namespace with an ongoing shardCollection. Must only be
+ * called if a previous call to registerShardCollection has succeeded.
+ */
+ void _clearShardCollection(std::string nss);
+
+ // Protects the state below
+ stdx::mutex _mutex;
+
+ // Map containing any collections currently being sharded
+ StringMap<ActiveShardCollectionState> _activeShardCollectionMap;
+};
+
+/**
+ * Object of this class is returned from the registerShardCollection call of the active shard
+ * collection registry. It can exist in two modes - 'execute' and 'join'. See the comments for
+ * registerShardCollection method for more details.
+ */
+class ScopedShardCollection {
+ MONGO_DISALLOW_COPYING(ScopedShardCollection);
+
+public:
+ ScopedShardCollection(std::string nss,
+ ActiveShardCollectionRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification);
+ ~ScopedShardCollection();
+
+ ScopedShardCollection(ScopedShardCollection&&);
+ ScopedShardCollection& operator=(ScopedShardCollection&&);
+
+ /**
+ * Returns true if the shardCollection object is in the 'execute' mode. This means that the
+ * caller can execute the shardCollection command. The holder must execute the command and call
+ * signalComplete with a status.
+ */
+ bool mustExecute() const {
+ return _shouldExecute;
+ }
+
+ /**
+ * Must only be called if the object is in the 'execute' mode when the shardCollection command
+ * was invoked (the command immediately executed). Signals any callers that might be blocked in
+ * waitForCompletion.
+ */
+ void signalComplete(Status status);
+
+ /**
+ * Must only be called if the object is in the 'join' mode. Blocks until the main executor of
+ * the shardCollection command calls signalComplete.
+ */
+ Status waitForCompletion(OperationContext* opCtx);
+
+private:
+ // Namespace of collection being sharded
+ std::string _nss;
+
+ // Registry from which to unregister the migration. Not owned.
+ ActiveShardCollectionRegistry* _registry;
+
+ /**
+ * Whether the holder is the first in line for a newly started shardCollection (in which case
+ * the destructor must unregister) or the caller is joining on an already-running
+ * shardCollection (in which case the caller must block and wait for completion).
+ */
+ bool _shouldExecute;
+
+ // This is the future, which will be signaled at the end of shardCollection
+ std::shared_ptr<Notification<Status>> _completionNotification;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/active_shard_collection_registry_test.cpp b/src/mongo/db/s/active_shard_collection_registry_test.cpp
new file mode 100644
index 00000000000..e71aaaa1a64
--- /dev/null
+++ b/src/mongo/db/s/active_shard_collection_registry_test.cpp
@@ -0,0 +1,190 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
+#include "mongo/db/s/active_shard_collection_registry.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/s/request_types/shard_collection_gen.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+class ShardCollectionRegistrationTest : public ServiceContextMongoDTest {
+protected:
+ ActiveShardCollectionRegistry _registry;
+};
+
+ShardsvrShardCollection createShardsvrShardCollectionRequest(
+ const NamespaceString& nss,
+ BSONObj key,
+ bool unique,
+ int numInitialChunks,
+ boost::optional<std::vector<mongo::BSONObj>> initialSplitPoints,
+ boost::optional<mongo::BSONObj> collation,
+ bool UUIDfromPrimaryShard) {
+ ShardsvrShardCollection shardsvrShardCollectionRequest;
+ shardsvrShardCollectionRequest.set_shardsvrShardCollection(nss);
+ shardsvrShardCollectionRequest.setKey(key);
+ shardsvrShardCollectionRequest.setUnique(unique);
+ shardsvrShardCollectionRequest.setNumInitialChunks(numInitialChunks);
+ shardsvrShardCollectionRequest.setInitialSplitPoints(initialSplitPoints);
+ shardsvrShardCollectionRequest.setCollation(collation);
+ shardsvrShardCollectionRequest.setGetUUIDfromPrimaryShard(UUIDfromPrimaryShard);
+
+ return shardsvrShardCollectionRequest;
+}
+
+TEST_F(ShardCollectionRegistrationTest, ScopedShardCollectionConstructorAndAssignment) {
+ auto shardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"),
+ BSON("x"
+ << "hashed"),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto originalScopedShardCollection =
+ assertGet(_registry.registerShardCollection(shardsvrShardCollectionRequest));
+ ASSERT(originalScopedShardCollection.mustExecute());
+
+ ScopedShardCollection movedScopedShardCollection(std::move(originalScopedShardCollection));
+ ASSERT(movedScopedShardCollection.mustExecute());
+
+ originalScopedShardCollection = std::move(movedScopedShardCollection);
+ ASSERT(originalScopedShardCollection.mustExecute());
+
+ // Need to signal the registered shard collection so the destructor doesn't invariant
+ originalScopedShardCollection.signalComplete(Status::OK());
+}
+
+TEST_F(ShardCollectionRegistrationTest,
+ SecondShardCollectionWithDifferentOptionsReturnsConflictingOperationInProgress) {
+ auto firstShardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"),
+ BSON("x"
+ << "hashed"),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto originalScopedShardCollection =
+ assertGet(_registry.registerShardCollection(firstShardsvrShardCollectionRequest));
+
+ auto secondShardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"),
+ BSON("x" << 0),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto secondScopedShardCollection =
+ _registry.registerShardCollection(secondShardsvrShardCollectionRequest);
+ ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, secondScopedShardCollection.getStatus());
+
+ originalScopedShardCollection.signalComplete(Status::OK());
+}
+
+TEST_F(ShardCollectionRegistrationTest, SecondShardCollectionWithSameOptionsJoinsFirst) {
+ auto firstShardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"),
+ BSON("x"
+ << "hashed"),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto originalScopedShardCollection =
+ assertGet(_registry.registerShardCollection(firstShardsvrShardCollectionRequest));
+ ASSERT(originalScopedShardCollection.mustExecute());
+
+ auto secondShardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"),
+ BSON("x"
+ << "hashed"),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto secondScopedShardCollection =
+ assertGet(_registry.registerShardCollection(secondShardsvrShardCollectionRequest));
+ ASSERT(!secondScopedShardCollection.mustExecute());
+
+ originalScopedShardCollection.signalComplete({ErrorCodes::InternalError, "Test error"});
+ auto opCtx = makeOperationContext();
+ ASSERT_EQ(Status(ErrorCodes::InternalError, "Test error"),
+ secondScopedShardCollection.waitForCompletion(opCtx.get()));
+}
+
+TEST_F(ShardCollectionRegistrationTest, TwoShardCollectionsOnDifferentCollectionsAllowed) {
+ auto firstShardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB", "TestColl"),
+ BSON("x"
+ << "hashed"),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto originalScopedShardCollection =
+ assertGet(_registry.registerShardCollection(firstShardsvrShardCollectionRequest));
+ ASSERT(originalScopedShardCollection.mustExecute());
+
+ auto secondShardsvrShardCollectionRequest =
+ createShardsvrShardCollectionRequest(NamespaceString("TestDB2", "TestColl2"),
+ BSON("x"
+ << "hashed"),
+ false,
+ 1,
+ boost::none,
+ boost::none,
+ false);
+ auto secondScopedShardCollection =
+ assertGet(_registry.registerShardCollection(secondShardsvrShardCollectionRequest));
+ ASSERT(secondScopedShardCollection.mustExecute());
+
+ originalScopedShardCollection.signalComplete(Status::OK());
+ secondScopedShardCollection.signalComplete(Status::OK());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index f618351fe51..ad655601c9e 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -769,17 +769,13 @@ public:
shardsvrShardCollectionRequest.setGetUUIDfromPrimaryShard(
request.getGetUUIDfromPrimaryShard());
- // TODO(SERVER-37354): Remove the 'runWithoutInterruption' block.
- auto cmdResponse = opCtx->runWithoutInterruption([&] {
- return uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields(
- cmdObj, shardsvrShardCollectionRequest.toBSON())),
- Shard::RetryPolicy::kIdempotent));
- });
-
+ auto cmdResponse = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields(
+ cmdObj, shardsvrShardCollectionRequest.toBSON())),
+ Shard::RetryPolicy::kIdempotent));
if (cmdResponse.commandStatus != ErrorCodes::CommandNotFound) {
uassertStatusOK(cmdResponse.commandStatus);
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index bb6f1183557..a87de2ff318 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/s/active_shard_collection_registry.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/config/initial_split_policy.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
@@ -629,90 +630,117 @@ public:
IDLParserErrorContext("_shardsvrShardCollection"), cmdObj);
const NamespaceString nss(parseNs(dbname, cmdObj));
- // Take the collection critical section so that no writes can happen.
- CollectionCriticalSection critSec(opCtx, nss);
+ auto scopedShardCollection = uassertStatusOK(
+ ActiveShardCollectionRegistry::get(opCtx).registerShardCollection(request));
+ Status status = {ErrorCodes::InternalError, "Uninitialized value"};
- auto proposedKey(request.getKey().getOwned());
- ShardKeyPattern shardKeyPattern(proposedKey);
-
- createCollectionOrValidateExisting(opCtx, nss, proposedKey, shardKeyPattern, request);
-
- // Read zone info
- auto tags = getExistingTags(opCtx, nss);
-
- if (!tags.empty()) {
- validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags);
- }
-
- boost::optional<UUID> uuid;
- if (request.getGetUUIDfromPrimaryShard()) {
- uuid = getUUIDFromPrimaryShard(opCtx, nss);
- } else {
- uuid = UUID::gen();
- }
-
- auto shardRegistry = Grid::get(opCtx)->shardRegistry();
- shardRegistry->reload(opCtx);
-
- DBDirectClient localClient(opCtx);
- bool isEmpty = (localClient.count(nss.ns()) == 0);
-
- std::vector<ShardId> shardIds;
- shardRegistry->getAllShardIds(opCtx, &shardIds);
- const int numShards = shardIds.size();
-
- std::vector<BSONObj> initialSplitPoints;
- std::vector<BSONObj> finalSplitPoints;
-
- if (request.getInitialSplitPoints()) {
- finalSplitPoints = std::move(*request.getInitialSplitPoints());
- } else if (!tags.empty()) {
- // no need to find split points since we will create chunks based on
- // the existing zones
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "found existing zones but the collection is not empty",
- isEmpty);
+ // Check if this collection is currently being sharded and if so, join it
+ if (!scopedShardCollection.mustExecute()) {
+ status = scopedShardCollection.waitForCompletion(opCtx);
} else {
- InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
- shardKeyPattern,
- isEmpty,
- numShards,
- request.getNumInitialChunks(),
- &initialSplitPoints,
- &finalSplitPoints);
- }
-
- result << "collectionsharded" << nss.ns();
- if (uuid) {
- result << "collectionUUID" << *uuid;
+ try {
+ // Take the collection critical section so that no writes can happen.
+ CollectionCriticalSection critSec(opCtx, nss);
+
+ auto proposedKey(request.getKey().getOwned());
+ ShardKeyPattern shardKeyPattern(proposedKey);
+
+ createCollectionOrValidateExisting(
+ opCtx, nss, proposedKey, shardKeyPattern, request);
+
+ // Read zone info
+ auto tags = getExistingTags(opCtx, nss);
+
+ if (!tags.empty()) {
+ validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags);
+ }
+
+ boost::optional<UUID> uuid;
+ if (request.getGetUUIDfromPrimaryShard()) {
+ uuid = getUUIDFromPrimaryShard(opCtx, nss);
+ } else {
+ uuid = UUID::gen();
+ }
+
+ auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ shardRegistry->reload(opCtx);
+
+ DBDirectClient localClient(opCtx);
+ bool isEmpty = (localClient.count(nss.ns()) == 0);
+
+ std::vector<ShardId> shardIds;
+ shardRegistry->getAllShardIds(opCtx, &shardIds);
+ const int numShards = shardIds.size();
+
+ std::vector<BSONObj> initialSplitPoints;
+ std::vector<BSONObj> finalSplitPoints;
+
+ if (request.getInitialSplitPoints()) {
+ finalSplitPoints = std::move(*request.getInitialSplitPoints());
+ } else if (!tags.empty()) {
+ // no need to find split points since we will create chunks based on
+ // the existing zones
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "found existing zones but the collection is not empty",
+ isEmpty);
+ } else {
+ InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
+ shardKeyPattern,
+ isEmpty,
+ numShards,
+ request.getNumInitialChunks(),
+ &initialSplitPoints,
+ &finalSplitPoints);
+ }
+
+ result << "collectionsharded" << nss.ns();
+ if (uuid) {
+ result << "collectionUUID" << *uuid;
+ }
+
+ critSec.enterCommitPhase();
+
+ LOG(0) << "CMD: shardcollection: " << cmdObj;
+
+ audit::logShardCollection(
+ Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
+
+ // The initial chunks are distributed evenly across shards if the initial split
+ // points were specified in the request by mapReduce or if we are using a hashed
+ // shard key. Otherwise, all the initial chunks are placed on the primary shard.
+ const bool fromMapReduce = bool(request.getInitialSplitPoints());
+ const int numContiguousChunksPerShard = initialSplitPoints.empty()
+ ? 1
+ : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1);
+
+ // Step 6. Actually shard the collection.
+ shardCollection(opCtx,
+ nss,
+ uuid,
+ shardKeyPattern,
+ *request.getCollation(),
+ request.getUnique(),
+ finalSplitPoints,
+ tags,
+ fromMapReduce,
+ ShardingState::get(opCtx)->shardId(),
+ numContiguousChunksPerShard);
+
+ status = Status::OK();
+ } catch (const DBException& e) {
+ status = e.toStatus();
+ } catch (const std::exception& e) {
+ scopedShardCollection.signalComplete(
+ {ErrorCodes::InternalError,
+ str::stream()
+ << "Severe error occurred while running shardCollection command: "
+ << e.what()});
+ throw;
+ }
+ scopedShardCollection.signalComplete(status);
}
- critSec.enterCommitPhase();
-
- LOG(0) << "CMD: shardcollection: " << cmdObj;
-
- audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
-
- // The initial chunks are distributed evenly across shards if the initial split points were
- // specified in the request by mapReduce or if we are using a hashed shard key. Otherwise,
- // all the initial chunks are placed on the primary shard.
- const bool fromMapReduce = bool(request.getInitialSplitPoints());
- const int numContiguousChunksPerShard = initialSplitPoints.empty()
- ? 1
- : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1);
-
- // Step 6. Actually shard the collection.
- shardCollection(opCtx,
- nss,
- uuid,
- shardKeyPattern,
- *request.getCollation(),
- request.getUnique(),
- finalSplitPoints,
- tags,
- fromMapReduce,
- ShardingState::get(opCtx)->shardId(),
- numContiguousChunksPerShard);
+ uassertStatusOK(status);
return true;
}
diff --git a/src/mongo/s/request_types/shard_collection.idl b/src/mongo/s/request_types/shard_collection.idl
index 08b4e2438e0..5a1e9229d38 100644
--- a/src/mongo/s/request_types/shard_collection.idl
+++ b/src/mongo/s/request_types/shard_collection.idl
@@ -161,4 +161,4 @@ structs:
collectionUUID:
type: uuid
description: "The UUID of the collection that just got sharded."
- optional: true
+ optional: true \ No newline at end of file