summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/active_shard_collection_registry.cpp
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2018-10-25 17:47:55 -0400
committerjannaerin <golden.janna@gmail.com>2018-11-01 10:34:10 -0400
commitc06cea15dcc13c7e8777be5da229b1423ae7465b (patch)
treee098709d750a73988202ea4b0713c7472b5ef5cd /src/mongo/db/s/active_shard_collection_registry.cpp
parent829b78b679188e672e813ce9d5b03334ae781d1c (diff)
downloadmongo-c06cea15dcc13c7e8777be5da229b1423ae7465b.tar.gz
SERVER-37354 Make _shardsvrShardCollection re-entrant
Diffstat (limited to 'src/mongo/db/s/active_shard_collection_registry.cpp')
-rw-r--r--src/mongo/db/s/active_shard_collection_registry.cpp179
1 files changed, 179 insertions, 0 deletions
diff --git a/src/mongo/db/s/active_shard_collection_registry.cpp b/src/mongo/db/s/active_shard_collection_registry.cpp
new file mode 100644
index 00000000000..9a168491262
--- /dev/null
+++ b/src/mongo/db/s/active_shard_collection_registry.cpp
@@ -0,0 +1,179 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/active_shard_collection_registry.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+namespace {
+
+const auto getRegistry = ServiceContext::declareDecoration<ActiveShardCollectionRegistry>();
+
+bool ActiveShardsvrShardCollectionEqualsNewRequest(const ShardsvrShardCollection& activeRequest,
+ const ShardsvrShardCollection& newRequest) {
+ if (activeRequest.get_shardsvrShardCollection().get() !=
+ newRequest.get_shardsvrShardCollection().get())
+ return false;
+ if (activeRequest.getKey().woCompare(newRequest.getKey()) != 0)
+ return false;
+ if (activeRequest.getUnique() != newRequest.getUnique())
+ return false;
+ if (activeRequest.getNumInitialChunks() != newRequest.getNumInitialChunks())
+ return false;
+ if ((activeRequest.getCollation() && newRequest.getCollation()) &&
+ (activeRequest.getCollation().get().woCompare(newRequest.getCollation().get()) != 0))
+ return false;
+ if (activeRequest.getGetUUIDfromPrimaryShard() != newRequest.getGetUUIDfromPrimaryShard())
+ return false;
+
+ if (activeRequest.getInitialSplitPoints() && newRequest.getInitialSplitPoints()) {
+ if (activeRequest.getInitialSplitPoints().get().size() !=
+ newRequest.getInitialSplitPoints().get().size()) {
+ return false;
+ } else {
+ for (std::size_t i = 0; i < activeRequest.getInitialSplitPoints().get().size(); i++) {
+ if (activeRequest.getInitialSplitPoints().get()[i].woCompare(
+ newRequest.getInitialSplitPoints().get()[i]) != 0)
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+} // namespace
+
+ActiveShardCollectionRegistry::ActiveShardCollectionRegistry() = default;
+
+ActiveShardCollectionRegistry::~ActiveShardCollectionRegistry() {
+ invariant(_activeShardCollectionMap.empty());
+}
+
+ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(ServiceContext* service) {
+ return getRegistry(service);
+}
+
+ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+StatusWith<ScopedShardCollection> ActiveShardCollectionRegistry::registerShardCollection(
+ const ShardsvrShardCollection& request) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ std::string nss = request.get_shardsvrShardCollection().get().ns();
+
+ auto iter = _activeShardCollectionMap.find(nss);
+ if (iter == _activeShardCollectionMap.end()) {
+ auto activeShardCollectionState = ActiveShardCollectionState(request);
+ _activeShardCollectionMap.try_emplace(nss, activeShardCollectionState);
+
+ return {ScopedShardCollection(nss, this, true, activeShardCollectionState.notification)};
+ } else {
+ auto activeShardCollectionState = iter->second;
+
+ if (ActiveShardsvrShardCollectionEqualsNewRequest(activeShardCollectionState.activeRequest,
+ request)) {
+ return {ScopedShardCollection(
+ nss, nullptr, false, activeShardCollectionState.notification)};
+ }
+ return activeShardCollectionState.constructErrorStatus(request);
+ }
+}
+
+void ActiveShardCollectionRegistry::_clearShardCollection(std::string nss) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto iter = _activeShardCollectionMap.find(nss);
+ invariant(iter != _activeShardCollectionMap.end());
+ _activeShardCollectionMap.erase(nss);
+}
+
+Status ActiveShardCollectionRegistry::ActiveShardCollectionState::constructErrorStatus(
+ const ShardsvrShardCollection& request) const {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unable to shard collection "
+ << request.get_shardsvrShardCollection().get().ns()
+ << " with arguments: "
+ << request.toBSON()
+ << " because this shard is currently running shard collection on this "
+ << "collection with arguments: "
+ << activeRequest.toBSON()};
+}
+
+ScopedShardCollection::ScopedShardCollection(
+ std::string nss,
+ ActiveShardCollectionRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification)
+ : _nss(nss),
+ _registry(registry),
+ _shouldExecute(shouldExecute),
+ _completionNotification(std::move(completionNotification)) {}
+
+ScopedShardCollection::~ScopedShardCollection() {
+ if (_registry && _shouldExecute) {
+ // If this is a newly started shard collection the caller must always signal on completion
+ invariant(*_completionNotification);
+ _registry->_clearShardCollection(_nss);
+ }
+}
+
+ScopedShardCollection::ScopedShardCollection(ScopedShardCollection&& other) {
+ *this = std::move(other);
+}
+
+ScopedShardCollection& ScopedShardCollection::operator=(ScopedShardCollection&& other) {
+ if (&other != this) {
+ _registry = other._registry;
+ other._registry = nullptr;
+ _shouldExecute = other._shouldExecute;
+ _completionNotification = std::move(other._completionNotification);
+ _nss = std::move(other._nss);
+ }
+
+ return *this;
+}
+
+void ScopedShardCollection::signalComplete(Status status) {
+ invariant(_shouldExecute);
+ _completionNotification->set(status);
+}
+
+Status ScopedShardCollection::waitForCompletion(OperationContext* opCtx) {
+ invariant(!_shouldExecute);
+ return _completionNotification->get(opCtx);
+}
+
+} // namespace mongo