diff options
author | jannaerin <golden.janna@gmail.com> | 2018-10-25 17:47:55 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2018-11-01 10:34:10 -0400 |
commit | c06cea15dcc13c7e8777be5da229b1423ae7465b (patch) | |
tree | e098709d750a73988202ea4b0713c7472b5ef5cd /src/mongo/db/s/active_shard_collection_registry.cpp | |
parent | 829b78b679188e672e813ce9d5b03334ae781d1c (diff) | |
download | mongo-c06cea15dcc13c7e8777be5da229b1423ae7465b.tar.gz |
SERVER-37354 Make _shardsvrShardCollection re-entrant
Diffstat (limited to 'src/mongo/db/s/active_shard_collection_registry.cpp')
-rw-r--r-- | src/mongo/db/s/active_shard_collection_registry.cpp | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/src/mongo/db/s/active_shard_collection_registry.cpp b/src/mongo/db/s/active_shard_collection_registry.cpp new file mode 100644 index 00000000000..9a168491262 --- /dev/null +++ b/src/mongo/db/s/active_shard_collection_registry.cpp @@ -0,0 +1,179 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/active_shard_collection_registry.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/service_context.h" + +namespace mongo { +namespace { + +const auto getRegistry = ServiceContext::declareDecoration<ActiveShardCollectionRegistry>(); + +bool ActiveShardsvrShardCollectionEqualsNewRequest(const ShardsvrShardCollection& activeRequest, + const ShardsvrShardCollection& newRequest) { + if (activeRequest.get_shardsvrShardCollection().get() != + newRequest.get_shardsvrShardCollection().get()) + return false; + if (activeRequest.getKey().woCompare(newRequest.getKey()) != 0) + return false; + if (activeRequest.getUnique() != newRequest.getUnique()) + return false; + if (activeRequest.getNumInitialChunks() != newRequest.getNumInitialChunks()) + return false; + if ((activeRequest.getCollation() && newRequest.getCollation()) && + (activeRequest.getCollation().get().woCompare(newRequest.getCollation().get()) != 0)) + return false; + if (activeRequest.getGetUUIDfromPrimaryShard() != newRequest.getGetUUIDfromPrimaryShard()) + return false; + + if (activeRequest.getInitialSplitPoints() && newRequest.getInitialSplitPoints()) { + if (activeRequest.getInitialSplitPoints().get().size() != + newRequest.getInitialSplitPoints().get().size()) { + return false; + } else { + for (std::size_t i = 0; i < activeRequest.getInitialSplitPoints().get().size(); i++) { + if (activeRequest.getInitialSplitPoints().get()[i].woCompare( + newRequest.getInitialSplitPoints().get()[i]) != 0) + return false; + } + } + } + + return true; +} + +} // namespace + +ActiveShardCollectionRegistry::ActiveShardCollectionRegistry() = default; + +ActiveShardCollectionRegistry::~ActiveShardCollectionRegistry() { + invariant(_activeShardCollectionMap.empty()); +} + +ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(ServiceContext* service) { + return getRegistry(service); +} + +ActiveShardCollectionRegistry& ActiveShardCollectionRegistry::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +StatusWith<ScopedShardCollection> ActiveShardCollectionRegistry::registerShardCollection( + const ShardsvrShardCollection& request) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + std::string nss = request.get_shardsvrShardCollection().get().ns(); + + auto iter = _activeShardCollectionMap.find(nss); + if (iter == _activeShardCollectionMap.end()) { + auto activeShardCollectionState = ActiveShardCollectionState(request); + _activeShardCollectionMap.try_emplace(nss, activeShardCollectionState); + + return {ScopedShardCollection(nss, this, true, activeShardCollectionState.notification)}; + } else { + auto activeShardCollectionState = iter->second; + + if (ActiveShardsvrShardCollectionEqualsNewRequest(activeShardCollectionState.activeRequest, + request)) { + return {ScopedShardCollection( + nss, nullptr, false, activeShardCollectionState.notification)}; + } + return activeShardCollectionState.constructErrorStatus(request); + } +} + +void ActiveShardCollectionRegistry::_clearShardCollection(std::string nss) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto iter = _activeShardCollectionMap.find(nss); + invariant(iter != _activeShardCollectionMap.end()); + _activeShardCollectionMap.erase(nss); +} + +Status ActiveShardCollectionRegistry::ActiveShardCollectionState::constructErrorStatus( + const ShardsvrShardCollection& request) const { + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to shard collection " + << request.get_shardsvrShardCollection().get().ns() + << " with arguments: " + << request.toBSON() + << " because this shard is currently running shard collection on this " + << "collection with arguments: " + << activeRequest.toBSON()}; +} + +ScopedShardCollection::ScopedShardCollection( + std::string nss, + ActiveShardCollectionRegistry* registry, + bool shouldExecute, + std::shared_ptr<Notification<Status>> completionNotification) + : _nss(nss), + _registry(registry), + _shouldExecute(shouldExecute), + _completionNotification(std::move(completionNotification)) {} + +ScopedShardCollection::~ScopedShardCollection() { + if (_registry && _shouldExecute) { + // If this is a newly started shard collection the caller must always signal on completion + invariant(*_completionNotification); + _registry->_clearShardCollection(_nss); + } +} + +ScopedShardCollection::ScopedShardCollection(ScopedShardCollection&& other) { + *this = std::move(other); +} + +ScopedShardCollection& ScopedShardCollection::operator=(ScopedShardCollection&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + _shouldExecute = other._shouldExecute; + _completionNotification = std::move(other._completionNotification); + _nss = std::move(other._nss); + } + + return *this; +} + +void ScopedShardCollection::signalComplete(Status status) { + invariant(_shouldExecute); + _completionNotification->set(status); +} + +Status ScopedShardCollection::waitForCompletion(OperationContext* opCtx) { + invariant(!_shouldExecute); + return _completionNotification->get(opCtx); +} + +} // namespace mongo |