diff options
author | Misha Tyulenev <misha@mongodb.com> | 2016-03-03 13:39:56 -0500 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2016-03-03 13:40:25 -0500 |
commit | bc196517c4162993edbe6ca3669b9cd70865deec (patch) | |
tree | 9bbc44e7875ee29ae3140a8125073ea289d35a5a /src | |
parent | 9778d0678715fc3f9b9f725cfd11ea85ce03b2fc (diff) | |
download | mongo-bc196517c4162993edbe6ca3669b9cd70865deec.tar.gz |
SERVER-22318 remove SCCC support
Diffstat (limited to 'src')
52 files changed, 123 insertions, 5329 deletions
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp index 43865ac7ff0..37d2622538a 100644 --- a/src/mongo/client/parallel.cpp +++ b/src/mongo/client/parallel.cpp @@ -115,7 +115,7 @@ static void throwCursorError(DBClientCursor* cursor) { ParallelSortClusteredCursor::ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo) - : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0), _cmChangeAttempted(false) { + : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0) { _done = false; _didInit = false; @@ -688,15 +688,6 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { } throw; } catch (DBException& e) { - if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) { - fassert(28792, !_cmChangeAttempted); - _cmChangeAttempted = true; - - grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn); - startInit(txn); - return; - } - warning() << "db exception when initializing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(e); e._shard = shardId; diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h index 65585d352b9..8e91d67b5f1 100644 --- a/src/mongo/client/parallel.h +++ b/src/mongo/client/parallel.h @@ -190,7 +190,6 @@ private: std::map<std::string, int> _staleNSMap; int _totalTries; - bool _cmChangeAttempted; std::map<ShardId, PCMData> _cursorMap; diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index e1c23c300df..995359afd34 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -643,7 +643,6 @@ serveronlyEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) serveronlyLibdeps = [ "$BUILD_DIR/mongo/client/parallel", "$BUILD_DIR/mongo/executor/network_interface_factory", - "$BUILD_DIR/mongo/s/catalog/legacy/catalog_manager_legacy", "$BUILD_DIR/mongo/s/catalog/replset/catalog_manager_replica_set", "$BUILD_DIR/mongo/s/client/sharding_connection_hook", "$BUILD_DIR/mongo/s/coreshard", diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 8fda1b0b17f..3bb85bb0bc7 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -112,31 +112,6 @@ BSONObj fixindex(const string& newDbName, BSONObj o) { return res; } -namespace { -Status _checkForCatalogManagerChangeIfNeeded(const CloneOptions& opts) { - if (!opts.checkForCatalogChange) { - return Status::OK(); - } - auto catalogManager = grid.forwardingCatalogManager(); - invariant(catalogManager); - - Status status = catalogManager->checkForPendingCatalogChange(); - if (!status.isOK()) { - return status; - } - - auto currentConfigServerMode = catalogManager->getMode(); - if (currentConfigServerMode != opts.initialCatalogMode) { - invariant(opts.initialCatalogMode == CatalogManager::ConfigServerMode::SCCC && - currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS); - return Status(ErrorCodes::IncompatibleCatalogManager, - "CatalogManager was swapped from SCCC to CSRS mode during movePrimary." - "Aborting movePrimary to unblock mongos."); - } - return Status::OK(); -} -} // namespace - Cloner::Cloner() {} struct Cloner::Fun { @@ -144,7 +119,6 @@ struct Cloner::Fun { void operator()(DBClientCursorBatchIterator& i) { invariant(from_collection.coll() != "system.indexes"); - uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(_opts)); // XXX: can probably take dblock instead unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_X)); @@ -514,7 +488,6 @@ Status Cloner::copyDb(OperationContext* txn, clonedColls->clear(); } - uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts)); { // getCollectionInfos may make a remote call, which may block indefinitely, so release @@ -613,7 +586,6 @@ Status Cloner::copyDb(OperationContext* txn, if (opts.snapshot) q.snapshot(); - uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts)); copy(txn, toDBName, from_name, options, to_name, masterSameProcess, opts, q); // Copy releases the lock, so we need to re-load the database. This should @@ -671,7 +643,6 @@ Status Cloner::copyDb(OperationContext* txn, NamespaceString from_name(opts.fromDB, collectionName); NamespaceString to_name(toDBName, collectionName); - uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts)); copyIndexes(txn, toDBName, diff --git a/src/mongo/db/commands/clone.cpp b/src/mongo/db/commands/clone.cpp index a438dfa95bc..b5632dd44d7 100644 --- a/src/mongo/db/commands/clone.cpp +++ b/src/mongo/db/commands/clone.cpp @@ -114,7 +114,6 @@ public: "Cannot run clone command for use by sharding movePrimary command on a " "node that isn't yet sharding aware")); } - opts.initialCatalogMode = catalogManager->getMode(); } // See if there's any collections we should ignore diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 22dd23b462c..75fefbcb51f 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1575,26 +1575,6 @@ public: } mapReduceCommand; -namespace { -Status _checkForCatalogManagerChange(ForwardingCatalogManager* catalogManager, - CatalogManager::ConfigServerMode initialConfigServerMode) { - Status status = catalogManager->checkForPendingCatalogChange(); - if (!status.isOK()) { - return status; - } - - auto currentConfigServerMode = catalogManager->getMode(); - if (currentConfigServerMode != initialConfigServerMode) { - invariant(initialConfigServerMode == CatalogManager::ConfigServerMode::SCCC && - currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS); - return Status(ErrorCodes::IncompatibleCatalogManager, - "CatalogManager was swapped from SCCC to CSRS mode during mapreduce." - "Aborting mapreduce to unblock mongos."); - } - return Status::OK(); -} -} // namespace - /** * This class represents a map/reduce command executed on the output server of a sharded env */ @@ -1635,11 +1615,6 @@ public: << dbname)); } - // Store the initial catalog manager mode so we can check if it changes at any point. - CatalogManager::ConfigServerMode initialConfigServerMode = - grid.catalogManager(txn)->getMode(); - - boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmdObj)) maybeDisableValidation.emplace(txn); @@ -1737,12 +1712,6 @@ public: BSONObj query; BSONArrayBuilder chunkSizes; while (true) { - Status status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(), - initialConfigServerMode); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } - ChunkPtr chunk; if (chunks.size() > 0) { chunk = chunks[index]; @@ -1761,12 +1730,6 @@ public: int chunkSize = 0; while (cursor.more() || !values.empty()) { - status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(), - initialConfigServerMode); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } - BSONObj t; if (cursor.more()) { t = cursor.next().getOwned(); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 14678aefb94..665ff4ed032 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -103,5 +103,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/mongoscore', + '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', + '$BUILD_DIR/mongo/s/sharding_test_fixture', ] ) diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp index a9d77fbc1fc..248bad2d8e9 100644 --- a/src/mongo/db/s/migration_impl.cpp +++ b/src/mongo/db/s/migration_impl.cpp @@ -144,12 +144,11 @@ Status ChunkMoveOperationState::initialize(const BSONObj& cmdObj) { return Status::OK(); } -StatusWith<ForwardingCatalogManager::ScopedDistLock*> -ChunkMoveOperationState::acquireMoveMetadata() { +StatusWith<DistLockManager::ScopedDistLock*> ChunkMoveOperationState::acquireMoveMetadata() { // Get the distributed lock const string whyMessage(stream() << "migrating chunk [" << _minKey << ", " << _maxKey << ") in " << _nss.ns()); - _distLockStatus = grid.forwardingCatalogManager()->distLock(_txn, _nss.ns(), whyMessage); + _distLockStatus = grid.catalogManager(_txn)->distLock(_txn, _nss.ns(), whyMessage); if (!_distLockStatus->isOK()) { const string msg = stream() << "could not acquire collection lock for " << _nss.ns() diff --git a/src/mongo/db/s/migration_impl.h b/src/mongo/db/s/migration_impl.h index 51e6a8ba295..8f2061ed5ac 100644 --- a/src/mongo/db/s/migration_impl.h +++ b/src/mongo/db/s/migration_impl.h @@ -36,7 +36,7 @@ #include "mongo/client/connection_string.h" #include "mongo/db/namespace_string.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/catalog/forwarding_catalog_manager.h" +#include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/chunk_version.h" namespace mongo { @@ -77,7 +77,7 @@ public: * TODO: Once the entire chunk move process is moved to be inside this state machine, there * will not be any need to expose the distributed lock. */ - StatusWith<ForwardingCatalogManager::ScopedDistLock*> acquireMoveMetadata(); + StatusWith<DistLockManager::ScopedDistLock*> acquireMoveMetadata(); /** * Starts the move chunk operation. @@ -158,7 +158,7 @@ private: BSONObj _maxKey; // The distributed lock, which protects other migrations from happening on the same collection - boost::optional<StatusWith<ForwardingCatalogManager::ScopedDistLock>> _distLockStatus; + boost::optional<StatusWith<DistLockManager::ScopedDistLock>> _distLockStatus; // The cached collection metadata and the shard version from the time the migration process // started. This metadata is guaranteed to not change until either failure or successful diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 9fdc8114f45..0b9b72e9e33 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -239,13 +239,6 @@ public: timing.done(2); - Status distLockStatus = distLock->checkForPendingCatalogChange(); - if (!distLockStatus.isOK()) { - warning() << "Aborting migration due to need to swap current catalog manager" - << causedBy(distLockStatus); - return appendCommandStatus(result, distLockStatus); - } - MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep2); // 3. @@ -311,13 +304,6 @@ public: timing.done(3); - distLockStatus = distLock->checkForPendingCatalogChange(); - if (!distLockStatus.isOK()) { - warning() << "Aborting migration due to need to swap current catalog manager" - << causedBy(distLockStatus); - return appendCommandStatus(result, distLockStatus); - } - MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep3); // 4. @@ -414,24 +400,10 @@ public: } txn->checkForInterrupt(); - - distLockStatus = distLock->checkForPendingCatalogChange(); - if (!distLockStatus.isOK()) { - warning() << "Aborting migration due to need to swap current catalog manager" - << causedBy(distLockStatus); - return appendCommandStatus(result, distLockStatus); - } } timing.done(4); - distLockStatus = distLock->checkForPendingCatalogChange(); - if (!distLockStatus.isOK()) { - warning() << "Aborting migration due to need to swap current catalog manager" - << causedBy(distLockStatus); - return appendCommandStatus(result, distLockStatus); - } - MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep4); // 5. diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp index 69c7dfe5596..50c4b3c0555 100644 --- a/src/mongo/db/s/sharding_state_recovery.cpp +++ b/src/mongo/db/s/sharding_state_recovery.cpp @@ -219,10 +219,6 @@ Status modifyRecoveryDocument(OperationContext* txn, } // namespace Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) { - if (grid.catalogManager(txn)->getMode() != CatalogManager::ConfigServerMode::CSRS) { - return Status::OK(); - } - Status upsertStatus = modifyRecoveryDocument(txn, RecoveryDocument::Increment, kMajorityWriteConcern); @@ -237,10 +233,6 @@ Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) { } void ShardingStateRecovery::endMetadataOp(OperationContext* txn) { - if (grid.catalogManager(txn)->getMode() != CatalogManager::ConfigServerMode::CSRS) { - return; - } - Status status = modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions()); if (!status.isOK()) { warning() << "Failed to decrement minOpTimeUpdaters due to " << status; diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 8d69566e0ed..bfd6eead67e 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -22,7 +22,7 @@ env.Library( '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/network_interface_thread_pool', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/s/catalog/forwarding_catalog_manager', + '$BUILD_DIR/mongo/s/catalog/catalog_manager_impl', 'client/sharding_connection_hook', 'coreshard', ], @@ -75,7 +75,7 @@ env.Library( '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', - '$BUILD_DIR/mongo/s/catalog/forwarding_catalog_manager', + '$BUILD_DIR/mongo/s/catalog/catalog_manager_impl', '$BUILD_DIR/mongo/s/catalog/replset/catalog_manager_replica_set', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/mongoscore', @@ -166,7 +166,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/s/query/cluster_cursor_manager', '$BUILD_DIR/mongo/executor/task_executor_pool', - 'catalog/forwarding_catalog_manager', + 'catalog/catalog_manager_impl', 'catalog/catalog_types', 'client/sharding_client', 'cluster_ops_impl', diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 939f682ef36..48d36bf3130 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -344,7 +344,7 @@ void warnOnMultiVersion(const ShardInfoMap& shardInfo) { } void Balancer::_doBalanceRound(OperationContext* txn, - ForwardingCatalogManager::ScopedDistLock* distLock, + DistLockManager::ScopedDistLock* distLock, vector<shared_ptr<MigrateInfo>>* candidateChunks) { invariant(candidateChunks); @@ -383,8 +383,6 @@ void Balancer::_doBalanceRound(OperationContext* txn, // For each collection, check if the balancing policy recommends moving anything around. for (const auto& coll : collections) { - uassertStatusOK(distLock->checkForPendingCatalogChange()); - // Skip collections for which balancing is disabled const NamespaceString& nss = coll.getNs(); @@ -603,8 +601,11 @@ void Balancer::run() { uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get())); { - auto scopedDistLock = grid.forwardingCatalogManager()->distLock( - txn.get(), "balancer", "doing balance round"); + auto scopedDistLock = grid.catalogManager(txn.get()) + ->distLock(txn.get(), + "balancer", + "doing balance round", + DistLockManager::kSingleLockAttemptTimeout); if (!scopedDistLock.isOK()) { LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus()); diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index 6429091ef5c..f9c0eb069aa 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -30,7 +30,7 @@ #pragma once -#include "mongo/s/catalog/forwarding_catalog_manager.h" +#include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/util/background.h" namespace mongo { @@ -94,7 +94,7 @@ private: * possibly be moved */ void _doBalanceRound(OperationContext* txn, - ForwardingCatalogManager::ScopedDistLock* distLock, + DistLockManager::ScopedDistLock* distLock, std::vector<std::shared_ptr<MigrateInfo>>* candidateChunks); /** diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript index 59a62d17af7..674e548a317 100644 --- a/src/mongo/s/catalog/SConscript +++ b/src/mongo/s/catalog/SConscript @@ -4,7 +4,6 @@ Import("env") env.SConscript( dirs=[ - 'legacy', 'replset', ], ) @@ -65,13 +64,11 @@ env.Library( ) env.Library( - target='forwarding_catalog_manager', + target='catalog_manager_impl', source=[ - 'forwarding_catalog_manager.cpp' ], LIBDEPS=[ 'catalog_manager', - 'legacy/catalog_manager_legacy', 'replset/catalog_manager_replica_set', 'replset/dist_lock_catalog_impl', 'replset/replset_dist_lock_manager', diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index eadfdc9c231..c58a2a89cc1 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -34,6 +34,7 @@ #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/client/shard.h" #include "mongo/s/optime_pair.h" @@ -51,7 +52,6 @@ struct ChunkVersion; class CollectionType; class ConnectionString; class DatabaseType; -class DistLockManager; class NamespaceString; class OperationContext; class SettingsType; @@ -82,7 +82,6 @@ enum ShardDrainingStatus { */ class CatalogManager { MONGO_DISALLOW_COPYING(CatalogManager); - friend class ForwardingCatalogManager; public: enum class ConfigServerMode { @@ -106,12 +105,6 @@ public: virtual void shutDown(OperationContext* txn, bool allowNetworking = true) = 0; /** - * Returns what type of catalog manager this is - CSRS for the CatalogManagerReplicaSet and - * SCCC for the CatalogManagerLegacy. - */ - virtual ConfigServerMode getMode() = 0; - - /** * Creates a new database or updates the sharding status for an existing one. Cannot be * used for the admin/config/local DBs, which should not be created or sharded manually * anyways. @@ -445,6 +438,13 @@ public: virtual Status appendInfoForConfigServerDatabases(OperationContext* txn, BSONArrayBuilder* builder) = 0; + + virtual StatusWith<DistLockManager::ScopedDistLock> distLock( + OperationContext* txn, + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor = DistLockManager::kSingleLockAttemptTimeout) = 0; + protected: CatalogManager() = default; diff --git a/src/mongo/s/catalog/catalog_manager_common.cpp b/src/mongo/s/catalog/catalog_manager_common.cpp index d8943f626d3..cc4d8d8106b 100644 --- a/src/mongo/s/catalog/catalog_manager_common.cpp +++ b/src/mongo/s/catalog/catalog_manager_common.cpp @@ -606,4 +606,12 @@ Status CatalogManagerCommon::_log(OperationContext* txn, return result; } +StatusWith<DistLockManager::ScopedDistLock> CatalogManagerCommon::distLock( + OperationContext* txn, + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor) { + return getDistLockManager()->lock(txn, name, whyMessage, waitFor); +} + } // namespace mongo diff --git a/src/mongo/s/catalog/catalog_manager_common.h b/src/mongo/s/catalog/catalog_manager_common.h index 95353f800f8..f6512c6f4df 100644 --- a/src/mongo/s/catalog/catalog_manager_common.h +++ b/src/mongo/s/catalog/catalog_manager_common.h @@ -79,6 +79,11 @@ public: const std::string& ns, const BSONObj& detail) final; + StatusWith<DistLockManager::ScopedDistLock> distLock(OperationContext* txn, + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor) final; + protected: /** * Selects an optimal shard on which to place a newly created database from the set of diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index 8ab6fdc3ee9..c4cce181b1e 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -145,6 +145,14 @@ StatusWith<OpTimePair<std::vector<ShardType>>> CatalogManagerMock::getAllShards( return {ErrorCodes::InternalError, "Method not implemented"}; } +StatusWith<DistLockManager::ScopedDistLock> CatalogManagerMock::distLock( + OperationContext* txn, + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + bool CatalogManagerMock::runUserManagementWriteCommand(OperationContext* txn, const string& commandName, const string& dbname, diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 2ff1c71fab4..01b7431e7ad 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -41,10 +41,6 @@ public: CatalogManagerMock(); ~CatalogManagerMock(); - ConfigServerMode getMode() override { - return ConfigServerMode::NONE; - } - Status startup(OperationContext* txn, bool allowNetworking) override; void shutDown(OperationContext* txn, bool allowNetworking) override; @@ -160,6 +156,12 @@ public: DistLockManager* getDistLockManager() override; + StatusWith<DistLockManager::ScopedDistLock> distLock( + OperationContext* txn, + StringData name, + StringData whyMessage, + stdx::chrono::milliseconds waitFor) override; + Status initConfigVersion(OperationContext* txn) override; Status appendInfoForConfigServerDatabases(OperationContext* txn, diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp deleted file mode 100644 index 63e75997d1e..00000000000 --- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp +++ /dev/null @@ -1,617 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/forwarding_catalog_manager.h" - -#include <cstdint> -#include <vector> - -#include "mongo/client/connection_string.h" -#include "mongo/db/client.h" -#include "mongo/db/service_context.h" -#include "mongo/platform/random.h" -#include "mongo/s/catalog/legacy/catalog_manager_legacy.h" -#include "mongo/s/catalog/replset/catalog_manager_replica_set.h" -#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" -#include "mongo/s/catalog/replset/replset_dist_lock_manager.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_database.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog/type_settings.h" -#include "mongo/s/catalog/type_tags.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/clock_source.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { - -using executor::TaskExecutor; - -namespace { -std::unique_ptr<CatalogManager> makeCatalogManager(ServiceContext* service, - const ConnectionString& configCS, - ShardRegistry* shardRegistry, - const HostAndPort& thisHost) { - std::unique_ptr<SecureRandom> rng(SecureRandom::create()); - std::string distLockProcessId = str::stream() - << thisHost.toString() << ':' - << durationCount<Seconds>(service->getClockSource()->now().toDurationSinceEpoch()) << ':' - << static_cast<int32_t>(rng->nextInt64()); - - switch (configCS.type()) { - case ConnectionString::SET: { - auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry); - auto distLockManager = stdx::make_unique<ReplSetDistLockManager>( - service, - distLockProcessId, - std::move(distLockCatalog), - ReplSetDistLockManager::kDistLockPingInterval, - ReplSetDistLockManager::kDistLockExpirationTime); - - return stdx::make_unique<CatalogManagerReplicaSet>(std::move(distLockManager)); - } - case ConnectionString::SYNC: - case ConnectionString::MASTER: - case ConnectionString::CUSTOM: { - auto catalogManagerLegacy = stdx::make_unique<CatalogManagerLegacy>(); - uassertStatusOK(catalogManagerLegacy->init(configCS, distLockProcessId)); - return std::move(catalogManagerLegacy); - } - default: - MONGO_UNREACHABLE; - } -} -} - -ForwardingCatalogManager::ForwardingCatalogManager(ServiceContext* service, - const ConnectionString& configCS, - ShardRegistry* shardRegistry, - const HostAndPort& thisHost) - : ForwardingCatalogManager(service, - makeCatalogManager(service, configCS, shardRegistry, thisHost), - shardRegistry, - thisHost) {} - -ForwardingCatalogManager::ForwardingCatalogManager(ServiceContext* service, - std::unique_ptr<CatalogManager> actual, - ShardRegistry* shardRegistry, - const HostAndPort& thisHost) - : _service(service), - _shardRegistry(shardRegistry), - _thisHost(thisHost), - _operationLock("CatalogOperationLock"), - _actual(std::move(actual)) {} - -ForwardingCatalogManager::~ForwardingCatalogManager() = default; - - -StatusWith<ForwardingCatalogManager::ScopedDistLock> ForwardingCatalogManager::distLock( - OperationContext* txn, - StringData name, - StringData whyMessage, - stdx::chrono::milliseconds waitFor) { - for (int i = 0; i < 2; ++i) { - try { - _operationLock.lock_shared(); - auto guard = MakeGuard([this] { _operationLock.unlock_shared(); }); - auto dlmLock = _actual->getDistLockManager()->lock(txn, name, whyMessage, waitFor); - if (dlmLock.isOK()) { - guard.Dismiss(); // Don't unlock _operationLock; hold it until the returned - // ScopedDistLock goes out of scope! - try { - return ScopedDistLock(txn, this, std::move(dlmLock.getValue())); - } catch (...) { - // Once the guard that unlocks _operationLock is dismissed, any exception before - // this method returns is fatal. - std::terminate(); - } - } else if (dlmLock.getStatus() != ErrorCodes::IncompatibleCatalogManager) { - return dlmLock.getStatus(); - } - } catch (const DBException& ex) { - if (ex.getCode() != ErrorCodes::IncompatibleCatalogManager) { - throw; - } - } - - waitForCatalogManagerChange(txn); - } - MONGO_UNREACHABLE; -} - -namespace { -const auto scopedDistLockHeld = OperationContext::declareDecoration<bool>(); -} - -CatalogManager* ForwardingCatalogManager::getCatalogManagerToUse(OperationContext* txn) { - if (scopedDistLockHeld(txn)) { - return _actual.get(); - } else { - return this; - } -} - -ForwardingCatalogManager::ScopedDistLock::ScopedDistLock(OperationContext* txn, - ForwardingCatalogManager* fcm, - DistLockManager::ScopedDistLock theLock) - : _txn(txn), _fcm(fcm), _lock(std::move(theLock)) { - scopedDistLockHeld(txn) = true; -} - -ForwardingCatalogManager::ScopedDistLock::ScopedDistLock(ScopedDistLock&& other) - : _txn(other._txn), _fcm(other._fcm), _lock(std::move(other._lock)) { - other._txn = nullptr; - other._fcm = nullptr; -} - -ForwardingCatalogManager::ScopedDistLock::~ScopedDistLock() { - if (_fcm) { // This ScopedDistLock was not moved from - auto guard = MakeGuard([this] { _fcm->_operationLock.unlock_shared(); }); - DistLockManager::ScopedDistLock dlmLock = std::move(_lock); - scopedDistLockHeld(_txn) = false; - } -} - -ForwardingCatalogManager::ScopedDistLock& ForwardingCatalogManager::ScopedDistLock::operator=( - ScopedDistLock&& other) { -#if defined(_MSC_VER) && _MSC_VER < 1900 // MSVC 2013 STL can emit self-move-assign. - if (&other == this) - return *this; -#endif - - invariant(!_fcm); - _fcm = other._fcm; - other._fcm = nullptr; - _lock = std::move(other._lock); - return *this; -} - -Status ForwardingCatalogManager::ScopedDistLock::checkForPendingCatalogChange() { - return _fcm->checkForPendingCatalogChange(); -} - -Status ForwardingCatalogManager::ScopedDistLock::checkStatus() { - Status status = checkForPendingCatalogChange(); - if (!status.isOK()) { - return status; - } - - return _lock.checkStatus(); -} - -Status ForwardingCatalogManager::scheduleReplaceCatalogManagerIfNeeded( - ConfigServerMode desiredMode, StringData replSetName, const HostAndPort& knownServer) { - stdx::lock_guard<stdx::mutex> lk(_observerMutex); - const auto currentMode = _actual->getMode(); - if (currentMode == desiredMode) { - return Status::OK(); - } - if (desiredMode == ConfigServerMode::SCCC) { - // TODO(spencer): Support downgrade. - return {ErrorCodes::IllegalOperation, - "Config server reports that it legacy SCCC mode, but we are already using " - "the replica set config server protocol for config server " - "communication. Downgrade needed but not yet supported"}; - } - invariant(desiredMode == ConfigServerMode::CSRS); - if (_nextConfigChangeComplete.isValid()) { - if (_nextConfigConnectionString.getSetName() != replSetName) { - severe() << "Conflicting new config server replica set names: " - << _nextConfigConnectionString.getSetName() << " vs " << replSetName; - fassertFailed(28788); - } - } else { - _nextConfigConnectionString = ConnectionString::forReplicaSet(replSetName, {knownServer}); - _nextConfigChangeComplete = - fassertStatusOK(28789, _shardRegistry->getExecutor()->makeEvent()); - fassertStatusOK( - 28787, - _shardRegistry->getExecutor()->scheduleWork( - [this](const TaskExecutor::CallbackArgs& args) { _replaceCatalogManager(args); })); - } - return {ErrorCodes::IncompatibleCatalogManager, - "Need to swap sharding catalog manager. Config server " - "reports that it is in replica set mode, but we are still using the " - "legacy SCCC protocol for config server communication"}; -} - -void ForwardingCatalogManager::waitForCatalogManagerChange(OperationContext* txn) { - fassert(28802, !scopedDistLockHeld(txn)); - - stdx::unique_lock<stdx::mutex> oblk(_observerMutex); - invariant(_nextConfigChangeComplete.isValid()); - auto configChangeComplete = _nextConfigChangeComplete; - oblk.unlock(); - _shardRegistry->getExecutor()->waitForEvent(configChangeComplete); -} - -Status ForwardingCatalogManager::checkForPendingCatalogChange() { - stdx::lock_guard<stdx::mutex> lk(_observerMutex); - if (!_nextConfigChangeComplete.isValid() || _configChangeComplete) { - return Status::OK(); - } - return Status(ErrorCodes::IncompatibleCatalogManager, - "Need to swap sharding catalog manager. Config server " - "reports that it is in replica set mode, but we are still using the " - "legacy SCCC protocol for config server communication"); -} - -namespace { - -template <typename T> -struct CheckForIncompatibleCatalogManager { - T operator()(T&& v) { - return std::forward<T>(v); - } -}; - -template <typename T> -struct CheckForIncompatibleCatalogManager<StatusWith<T>> { - StatusWith<T> operator()(StatusWith<T>&& v) { - if (!v.isOK() && v.getStatus().code() == ErrorCodes::IncompatibleCatalogManager) { - uassertStatusOK(v); - } - return std::forward<StatusWith<T>>(v); - } -}; - -template <> -struct CheckForIncompatibleCatalogManager<Status> { - Status operator()(Status&& v) { - if (v.code() == ErrorCodes::IncompatibleCatalogManager) { - uassertStatusOK(v); - } - return std::move(v); - } -}; - -template <typename T> -T checkForIncompatibleCatalogManager(T&& v) { - return CheckForIncompatibleCatalogManager<T>()(std::forward<T>(v)); -} - -} // namespace - -template <typename Callable> -auto ForwardingCatalogManager::retry(OperationContext* txn, Callable&& c) - -> decltype(std::forward<Callable>(c)()) { - for (int i = 0; i < 2; ++i) { - try { - rwlock_shared oplk(_operationLock); - return checkForIncompatibleCatalogManager(std::forward<Callable>(c)()); - } catch (const DBException& ex) { - if (ex.getCode() != ErrorCodes::IncompatibleCatalogManager) { - throw; - } - } - - waitForCatalogManagerChange(txn); - } - MONGO_UNREACHABLE; -} - -void ForwardingCatalogManager::_unlockOldDistLocks(std::string processID) { - Client::initThreadIfNotAlready("DistributedLockUnlocker"); - auto txn = cc().makeOperationContext(); - log() << "Unlocking existing distributed locks after catalog manager swap"; - rwlock_shared oplk(_operationLock); - _actual->getDistLockManager()->unlockAll(txn.get(), processID); -} - -void ForwardingCatalogManager::_replaceCatalogManager(const TaskExecutor::CallbackArgs& args) { - if (!args.status.isOK()) { - return; - } - Client::initThreadIfNotAlready("CatalogManagerReplacer"); - auto txn = cc().makeOperationContext(); - log() << "Swapping sharding Catalog Manager from mirrored (SCCC) to replica set (CSRS) mode"; - - stdx::lock_guard<RWLock> oplk(_operationLock); - std::string distLockProcessID = _actual->getDistLockManager()->getProcessID(); - // Shut down the old catalog manager before taking _observerMutex to prevent deadlock with - // the LegacyDistLockPinger thread calling scheduleReplaceCatalogManagerIfNeeded. - _actual->shutDown(txn.get(), /* allowNetworking */ false); - - stdx::lock_guard<stdx::mutex> oblk(_observerMutex); - _actual = makeCatalogManager(_service, _nextConfigConnectionString, _shardRegistry, _thisHost); - _shardRegistry->updateConfigServerConnectionString(_nextConfigConnectionString); - // Note: this assumes that downgrade is not supported, as this will not start the config - // server consistency checker for the legacy catalog manager. - fassert(28790, _actual->startup(txn.get(), false /* allowNetworking */)); - args.executor->signalEvent(_nextConfigChangeComplete); - _configChangeComplete = true; - - log() << "Swapping sharding Catalog Manager to replica set (CSRS) mode completed successfully"; - - // Must be done in a new thread to avoid deadlock resulting from running network operations - // within a TaskExecutor callback. - stdx::thread distLockUnlockThread( - [this, distLockProcessID] { _unlockOldDistLocks(distLockProcessID); }); - distLockUnlockThread.detach(); -} - -CatalogManager::ConfigServerMode ForwardingCatalogManager::getMode() { - stdx::lock_guard<stdx::mutex> lk(_observerMutex); - return _actual->getMode(); -} - -Status ForwardingCatalogManager::startup(OperationContext* txn, bool allowNetworking) { - return retry(txn, - [this, txn, allowNetworking] { return _actual->startup(txn, allowNetworking); }); -} - -void ForwardingCatalogManager::shutDown(OperationContext* txn, bool allowNetworking) { - retry(txn, - [this, txn, allowNetworking] { - _actual->shutDown(txn, allowNetworking); - return 1; - }); -} - -Status ForwardingCatalogManager::enableSharding(OperationContext* txn, const std::string& dbName) { - return retry(txn, [&] { return _actual->enableSharding(txn, dbName); }); -} - -Status ForwardingCatalogManager::shardCollection(OperationContext* txn, - const std::string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - const std::vector<BSONObj>& initPoints, - const std::set<ShardId>& initShardsIds) { - return retry(txn, - [&] { - return _actual->shardCollection( - txn, ns, fieldsAndOrder, unique, initPoints, initShardsIds); - }); -} - -StatusWith<std::string> ForwardingCatalogManager::addShard( - OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) { - return retry( - txn, - [&] { return _actual->addShard(txn, shardProposedName, shardConnectionString, maxSize); }); -} - -StatusWith<ShardDrainingStatus> ForwardingCatalogManager::removeShard(OperationContext* txn, - const std::string& name) { - return retry(txn, [&] { return _actual->removeShard(txn, name); }); -} - -Status ForwardingCatalogManager::updateDatabase(OperationContext* txn, - const std::string& dbName, - const DatabaseType& db) { - return retry(txn, [&] { return _actual->updateDatabase(txn, dbName, db); }); -} - -StatusWith<OpTimePair<DatabaseType>> ForwardingCatalogManager::getDatabase( - OperationContext* txn, const std::string& dbName) { - return retry(txn, [&] { return _actual->getDatabase(txn, dbName); }); -} - -Status ForwardingCatalogManager::updateCollection(OperationContext* txn, - const std::string& collNs, - const CollectionType& coll) { - return retry(txn, [&] { return _actual->updateCollection(txn, collNs, coll); }); -} - -StatusWith<OpTimePair<CollectionType>> ForwardingCatalogManager::getCollection( - OperationContext* txn, const std::string& collNs) { - return retry(txn, [&] { return _actual->getCollection(txn, collNs); }); -} - -Status ForwardingCatalogManager::getCollections(OperationContext* txn, - const std::string* dbName, - std::vector<CollectionType>* collections, - repl::OpTime* opTime) { - invariant(collections->empty()); - return retry(txn, - [&] { - collections->clear(); - return _actual->getCollections(txn, dbName, collections, opTime); - }); -} - -Status ForwardingCatalogManager::dropCollection(OperationContext* txn, const NamespaceString& ns) { - return retry(txn, [&] { return _actual->dropCollection(txn, ns); }); -} - -Status ForwardingCatalogManager::getDatabasesForShard(OperationContext* txn, - const std::string& shardName, - std::vector<std::string>* dbs) { - invariant(dbs->empty()); - return retry(txn, - [&] { - dbs->clear(); - return _actual->getDatabasesForShard(txn, shardName, dbs); - }); -} - -Status ForwardingCatalogManager::getChunks(OperationContext* txn, - const BSONObj& query, - const BSONObj& sort, - boost::optional<int> limit, - std::vector<ChunkType>* chunks, - repl::OpTime* opTime) { - invariant(chunks->empty()); - return retry(txn, - [&] { - chunks->clear(); - return _actual->getChunks(txn, query, sort, limit, chunks, opTime); - }); -} - -Status ForwardingCatalogManager::getTagsForCollection(OperationContext* txn, - const std::string& collectionNs, - std::vector<TagsType>* tags) { - invariant(tags->empty()); - return retry(txn, - [&] { - tags->clear(); - return _actual->getTagsForCollection(txn, collectionNs, tags); - }); -} - -StatusWith<std::string> ForwardingCatalogManager::getTagForChunk(OperationContext* txn, - const std::string& collectionNs, - const ChunkType& chunk) { - return retry(txn, [&] { return _actual->getTagForChunk(txn, collectionNs, chunk); }); -} - -StatusWith<OpTimePair<std::vector<ShardType>>> ForwardingCatalogManager::getAllShards( - OperationContext* txn) { - return retry(txn, [&] { return _actual->getAllShards(txn); }); -} - -bool ForwardingCatalogManager::runUserManagementWriteCommand(OperationContext* txn, - const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - return retry(txn, - [&] { - BSONObjBuilder builder; - const bool success = _actual->runUserManagementWriteCommand( - txn, commandName, dbname, cmdObj, &builder); - result->appendElements(builder.done()); - return success; - }); -} - -bool ForwardingCatalogManager::runUserManagementReadCommand(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - return retry(txn, - [&] { - BSONObjBuilder builder; - const bool success = - _actual->runUserManagementReadCommand(txn, dbname, cmdObj, &builder); - result->appendElements(builder.done()); - return success; - }); -} - -Status ForwardingCatalogManager::applyChunkOpsDeprecated(OperationContext* txn, - const BSONArray& updateOps, - const BSONArray& preCondition, - const std::string& nss, - const ChunkVersion& lastChunkVersion) { - return retry(txn, - [&] { - return _actual->applyChunkOpsDeprecated( - txn, updateOps, preCondition, nss, lastChunkVersion); - }); -} - -Status ForwardingCatalogManager::logAction(OperationContext* txn, - const std::string& what, - const std::string& ns, - const BSONObj& detail) { - return retry(txn, [&] { return _actual->logAction(txn, what, ns, detail); }); -} - -Status ForwardingCatalogManager::logChange(OperationContext* txn, - const std::string& what, - const std::string& ns, - const BSONObj& detail) { - return retry(txn, [&] { return _actual->logChange(txn, what, ns, detail); }); -} - -StatusWith<SettingsType> ForwardingCatalogManager::getGlobalSettings(OperationContext* txn, - const std::string& key) { - return retry(txn, [&] { return _actual->getGlobalSettings(txn, key); }); -} - -void ForwardingCatalogManager::writeConfigServerDirect(OperationContext* txn, - const BatchedCommandRequest& request, - BatchedCommandResponse* response) { - retry(txn, - [&] { - BatchedCommandResponse theResponse; - _actual->writeConfigServerDirect(txn, request, &theResponse); - theResponse.cloneTo(response); - return 1; - }); -} - -Status ForwardingCatalogManager::insertConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& doc) { - return retry(txn, [&] { return _actual->insertConfigDocument(txn, ns, doc); }); -} - -StatusWith<bool> ForwardingCatalogManager::updateConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert) { - return retry(txn, - [&] { return _actual->updateConfigDocument(txn, ns, query, update, upsert); }); -} - -Status ForwardingCatalogManager::removeConfigDocuments(OperationContext* txn, - const std::string& ns, - const BSONObj& query) { - return retry(txn, [&] { return _actual->removeConfigDocuments(txn, ns, query); }); -} - -Status ForwardingCatalogManager::createDatabase(OperationContext* txn, const std::string& dbName) { - return retry(txn, [&] { return _actual->createDatabase(txn, dbName); }); -} - -DistLockManager* ForwardingCatalogManager::getDistLockManager() { - warning() << "getDistLockManager called on ForwardingCatalogManager which should never happen " - "outside of unit tests!"; - rwlock_shared oplk(_operationLock); - return _actual->getDistLockManager(); -} - -Status ForwardingCatalogManager::initConfigVersion(OperationContext* txn) { - return retry(txn, [&] { return _actual->initConfigVersion(txn); }); -} - -Status ForwardingCatalogManager::appendInfoForConfigServerDatabases(OperationContext* txn, - BSONArrayBuilder* builder) { - return retry(txn, [&] { return _actual->appendInfoForConfigServerDatabases(txn, builder); }); -} - -} // namespace mongo diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h deleted file mode 100644 index 15227b16c29..00000000000 --- a/src/mongo/s/catalog/forwarding_catalog_manager.h +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/client/connection_string.h" -#include "mongo/db/server_options.h" -#include "mongo/executor/task_executor.h" -#include "mongo/s/catalog/catalog_manager.h" -#include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/rwlock.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { - -class NamespaceString; -class ServiceContext; -class ShardRegistry; -class VersionType; -struct ReadPreferenceSetting; - -/** - * The ForwardingCatalogManager is an indirection layer that allows for dynamic switching of - * catalog manager implementations at runtime, to facilitate upgrade. - * Inheriting privately from CatalogManager is intentional. All callers of CatalogManager methods - * on a ForwardingCatalogManager will get access to the FCM pointer by calling - * FCM::getCatalogManagerToUse, which can return a CatalogManager* because it is a member of FCM - * and thus knows that FCM inherits from CatalogManager. This makes it obvious if we try to call - * CatalogManager methods directly on a ForwardingCatalogManager pointer. - */ -class ForwardingCatalogManager final : private CatalogManager { -public: - class ScopedDistLock; - - ForwardingCatalogManager(ServiceContext* service, - const ConnectionString& configCS, - ShardRegistry* shardRegistry, - const HostAndPort& thisHost); - - /** - * Constructor for use in tests. - */ - ForwardingCatalogManager(ServiceContext* service, - std::unique_ptr<CatalogManager> actual, - ShardRegistry* shardRegistry, - const HostAndPort& thisHost); - - virtual ~ForwardingCatalogManager(); - - // Only public because of unit tests - DistLockManager* getDistLockManager() override; - - ConfigServerMode getMode() override; - - /** - * If desiredMode doesn't equal _actual->getMode(), schedules work to swap the actual catalog - * manager to one of the type specified by desiredMode. - * Currently only supports going to CSRS mode from SCCC mode. - */ - Status scheduleReplaceCatalogManagerIfNeeded(ConfigServerMode desiredMode, - StringData replSetName, - const HostAndPort& knownServer); - - /** - * Blocking method, which will waits for a previously scheduled catalog manager change to - * complete. It is illegal to call unless scheduleReplaceCatalogManagerIfNeeded has been called. - */ - void waitForCatalogManagerChange(OperationContext* txn); - - /** - * Checks to see if we are currently waiting to swap the catalog manager. - */ - Status checkForPendingCatalogChange(); - - /** - * Returns a ScopedDistLock which is the RAII type for holding a distributed lock. - * ScopedDistLock prevents the underlying CatalogManager from being swapped as long as it is - * in scope. - */ - StatusWith<ScopedDistLock> distLock( - OperationContext* txn, - StringData name, - StringData whyMessage, - stdx::chrono::milliseconds waitFor = DistLockManager::kSingleLockAttemptTimeout); - - /** - * Returns a pointer to the CatalogManager that should be used for general CatalogManager - * operation. Most of the time it will return 'this' - the ForwardingCatalogManager. If there - * is a distributed lock held as part of this operation, however, it will return the underlying - * CatalogManager to prevent deadlock from occurring by trying to swap the catalog manager while - * a distlock is held. - */ - CatalogManager* getCatalogManagerToUse(OperationContext* txn); - - Status appendInfoForConfigServerDatabases(OperationContext* txn, - BSONArrayBuilder* builder) override; - -private: - Status startup(OperationContext* txn, bool allowNetworking) override; - - void shutDown(OperationContext* txn, bool allowNetworking = true) override; - - Status enableSharding(OperationContext* txn, const std::string& dbName) override; - - Status shardCollection(OperationContext* txn, - const std::string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - const std::vector<BSONObj>& initPoints, - const std::set<ShardId>& initShardsIds) override; - - StatusWith<std::string> addShard(OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) override; - - StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, - const std::string& name) override; - - Status updateDatabase(OperationContext* txn, - const std::string& dbName, - const DatabaseType& db) override; - - StatusWith<OpTimePair<DatabaseType>> getDatabase(OperationContext* txn, - const std::string& dbName) override; - - Status updateCollection(OperationContext* txn, - const std::string& collNs, - const CollectionType& coll) override; - - StatusWith<OpTimePair<CollectionType>> getCollection(OperationContext* txn, - const std::string& collNs) override; - - Status getCollections(OperationContext* txn, - const std::string* dbName, - std::vector<CollectionType>* collections, - repl::OpTime* opTime) override; - - Status dropCollection(OperationContext* txn, const NamespaceString& ns) override; - - Status getDatabasesForShard(OperationContext* txn, - const std::string& shardName, - std::vector<std::string>* dbs) override; - - Status getChunks(OperationContext* txn, - const BSONObj& query, - const BSONObj& sort, - boost::optional<int> limit, - std::vector<ChunkType>* chunks, - repl::OpTime* opTime) override; - - Status getTagsForCollection(OperationContext* txn, - const std::string& collectionNs, - std::vector<TagsType>* tags) override; - - StatusWith<std::string> getTagForChunk(OperationContext* txn, - const std::string& collectionNs, - const ChunkType& chunk) override; - - StatusWith<OpTimePair<std::vector<ShardType>>> getAllShards(OperationContext* txn) override; - - bool runUserManagementWriteCommand(OperationContext* txn, - const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; - - bool runUserManagementReadCommand(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; - - Status applyChunkOpsDeprecated(OperationContext* txn, - const BSONArray& updateOps, - const BSONArray& preCondition, - const std::string& nss, - const ChunkVersion& lastChunkVersion) override; - - Status logAction(OperationContext* txn, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; - - Status logChange(OperationContext* txn, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; - - StatusWith<SettingsType> getGlobalSettings(OperationContext* txn, - const std::string& key) override; - - void writeConfigServerDirect(OperationContext* txn, - const BatchedCommandRequest& request, - BatchedCommandResponse* response) override; - - Status insertConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& doc) override; - - StatusWith<bool> updateConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert) override; - - Status removeConfigDocuments(OperationContext* txn, - const std::string& ns, - const BSONObj& query) override; - - Status createDatabase(OperationContext* txn, const std::string& dbName) override; - - Status initConfigVersion(OperationContext* txn) override; - - template <typename Callable> - auto retry(OperationContext* txn, Callable&& c) -> decltype(std::forward<Callable>(c)()); - - void _replaceCatalogManager(const executor::TaskExecutor::CallbackArgs& args); - - void _unlockOldDistLocks(std::string processID); - - ServiceContext* _service; - ShardRegistry* _shardRegistry; - HostAndPort _thisHost; - - RWLock _operationLock; - stdx::mutex _observerMutex; // If going to hold both _operationLock and _observerMutex, get - // _operationLock first. - - // The actual catalog manager implementation. - // - // Must hold _operationLock or _observerMutex in any mode to read. Must hold both in exclusive - // mode to write. - std::unique_ptr<CatalogManager> _actual; - - ConnectionString _nextConfigConnectionString; // Guarded by _observerMutex. - executor::TaskExecutor::EventHandle _nextConfigChangeComplete; // Guarded by _observerMutex. - bool _configChangeComplete{false}; -}; - -class ForwardingCatalogManager::ScopedDistLock { - MONGO_DISALLOW_COPYING(ScopedDistLock); - -public: - ScopedDistLock(OperationContext* txn, - ForwardingCatalogManager* fcm, - DistLockManager::ScopedDistLock theLock); - ScopedDistLock(ScopedDistLock&& other); - ~ScopedDistLock(); - - ScopedDistLock& operator=(ScopedDistLock&& other); - - /** - * Checks to see if we are currently waiting to swap the catalog manager. If so, holding on to - * this ScopedDistLock will block the swap from happening, so it is important that if this - * returns a non-OK status the caller must release the lock (most likely by failing the current - * operation). - */ - Status checkForPendingCatalogChange(); - - /** - * Queries the config server to make sure the lock is still present, as well as checking - * if we need to swap the catalog manager - */ - Status checkStatus(); - -private: - OperationContext* _txn; - ForwardingCatalogManager* _fcm; - DistLockManager::ScopedDistLock _lock; -}; - -} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/SConscript b/src/mongo/s/catalog/legacy/SConscript deleted file mode 100644 index dfca5d3b84f..00000000000 --- a/src/mongo/s/catalog/legacy/SConscript +++ /dev/null @@ -1,26 +0,0 @@ -# -*- mode: python -*- - -Import("env") - -# TODO: config upgrade tests are currently in dbtests -env.Library( - target='catalog_manager_legacy', - source=[ - 'catalog_manager_legacy.cpp', - 'cluster_client_internal.cpp', - 'config_coordinator.cpp', - 'config_upgrade.cpp', - 'distlock.cpp', - 'legacy_dist_lock_manager.cpp', - 'legacy_dist_lock_pinger.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/executor/network_interface', - '$BUILD_DIR/mongo/s/catalog/catalog_manager', - '$BUILD_DIR/mongo/s/catalog/dist_lock_manager', - ], - LIBDEPS_TAGS=[ - # Depends on inShutdown - 'incomplete', - ], -) diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp deleted file mode 100644 index 0b9b8c034d5..00000000000 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ /dev/null @@ -1,1425 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/legacy/catalog_manager_legacy.h" - -#include <pcrecpp.h> - -#include "mongo/base/status.h" -#include "mongo/base/status_with.h" -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/client/connpool.h" -#include "mongo/db/audit.h" -#include "mongo/db/client.h" -#include "mongo/db/commands.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/server_options.h" -#include "mongo/executor/network_interface.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog/config_server_version.h" -#include "mongo/s/catalog/legacy/cluster_client_internal.h" -#include "mongo/s/catalog/legacy/config_coordinator.h" -#include "mongo/s/catalog/legacy/config_upgrade.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_config_version.h" -#include "mongo/s/catalog/type_database.h" -#include "mongo/s/catalog/type_settings.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog/type_tags.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/client/dbclient_multi_command.h" -#include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_connection.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/config.h" -#include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h" -#include "mongo/s/catalog/type_config_version.h" -#include "mongo/s/grid.h" -#include "mongo/s/set_shard_version_request.h" -#include "mongo/s/shard_key_pattern.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/log.h" -#include "mongo/util/stringutils.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -MONGO_FP_DECLARE(setSCCCDropCollDistLockWait); - -using std::set; -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using std::vector; -using str::stream; - -namespace { - -bool validConfigWC(const BSONObj& writeConcern) { - BSONElement elem(writeConcern["w"]); - if (elem.eoo()) { - return true; - } - - if (elem.isNumber() && elem.numberInt() <= 1) { - return true; - } - - if (elem.type() == String && elem.str() == "majority") { - return true; - } - - return false; -} - -void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); - - dassert(response->isValid(NULL)); -} - -} // namespace - - -CatalogManagerLegacy::CatalogManagerLegacy() = default; - -CatalogManagerLegacy::~CatalogManagerLegacy() = default; - -Status CatalogManagerLegacy::init(const ConnectionString& configDBCS, - const std::string& distLockProcessId) { - // Initialization should not happen more than once - invariant(!_configServerConnectionString.isValid()); - invariant(_configServers.empty()); - invariant(configDBCS.isValid()); - - // Extract the hosts in HOST:PORT format - set<HostAndPort> configHostsAndPortsSet; - set<string> configHostsOnly; - std::vector<HostAndPort> configHostAndPorts = configDBCS.getServers(); - for (size_t i = 0; i < configHostAndPorts.size(); i++) { - // Append the default port, if not specified - HostAndPort configHost = configHostAndPorts[i]; - if (!configHost.hasPort()) { - configHost = HostAndPort(configHost.host(), ServerGlobalParams::ConfigServerPort); - } - - // Make sure there are no duplicates - if (!configHostsAndPortsSet.insert(configHost).second) { - StringBuilder sb; - sb << "Host " << configHost.toString() - << " exists twice in the config servers listing."; - - return Status(ErrorCodes::InvalidOptions, sb.str()); - } - - configHostsOnly.insert(configHost.host()); - } - - // Make sure the hosts are reachable - for (set<string>::const_iterator i = configHostsOnly.begin(); i != configHostsOnly.end(); i++) { - const string host = *i; - - // If this is a CUSTOM connection string (for testing) don't do DNS resolution - if (uassertStatusOK(ConnectionString::parse(host)).type() == ConnectionString::CUSTOM) { - continue; - } - - bool ok = false; - - for (int x = 10; x > 0; x--) { - if (!hostbyname(host.c_str()).empty()) { - ok = true; - break; - } - - log() << "can't resolve DNS for [" << host << "] sleeping and trying " << x - << " more times"; - sleepsecs(10); - } - - if (!ok) { - return Status(ErrorCodes::HostNotFound, - stream() << "unable to resolve DNS for host " << host); - } - } - - LOG(1) << " config string : " << configDBCS.toString(); - - // Now that the config hosts are verified, initialize the catalog manager. The code below - // should never fail. - - _configServerConnectionString = configDBCS; - - if (_configServerConnectionString.type() == ConnectionString::MASTER) { - _configServers.push_back(_configServerConnectionString); - } else if (_configServerConnectionString.type() == ConnectionString::SYNC) { - const vector<HostAndPort> configHPs = _configServerConnectionString.getServers(); - for (vector<HostAndPort>::const_iterator it = configHPs.begin(); it != configHPs.end(); - ++it) { - _configServers.push_back(ConnectionString(*it)); - } - } else { - // This is only for tests. - invariant(_configServerConnectionString.type() == ConnectionString::CUSTOM); - _configServers.push_back(_configServerConnectionString); - } - - _distLockManager = - stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString, distLockProcessId); - - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = false; - _consistentFromLastCheck = true; - } - - return Status::OK(); -} - -Status CatalogManagerLegacy::startup(OperationContext* txn, bool allowNetworking) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_started) { - return Status::OK(); - } - - if (allowNetworking) { - Status status = _startConfigServerChecker(); - if (!status.isOK()) { - return status; - } - } - - _distLockManager->startUp(); - - _started = true; - return Status::OK(); -} - -Status CatalogManagerLegacy::initConfigVersion(OperationContext* txn) { - return checkAndInitConfigVersion(txn, this, getDistLockManager()); -} - -Status CatalogManagerLegacy::_startConfigServerChecker() { - const auto status = _checkConfigServersConsistent(); - if (!status.isOK()) { - return status; - } - - stdx::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this)); - _consistencyCheckerThread.swap(t); - - return Status::OK(); -} - -void CatalogManagerLegacy::shutDown(OperationContext* txn, bool allowNetworking) { - LOG(1) << "CatalogManagerLegacy::shutDown() called."; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = true; - _consistencyCheckerCV.notify_one(); - } - - // Only try to join the thread if we actually started it. - if (_consistencyCheckerThread.joinable()) - _consistencyCheckerThread.join(); - - invariant(_distLockManager); - _distLockManager->shutDown(txn, allowNetworking); -} - -Status CatalogManagerLegacy::shardCollection(OperationContext* txn, - const string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - const vector<BSONObj>& initPoints, - const set<ShardId>& initShardIds) { - // Lock the collection globally so that no other mongos can try to shard or drop the collection - // at the same time. - auto scopedDistLock = getDistLockManager()->lock(txn, ns, "shardCollection"); - if (!scopedDistLock.isOK()) { - return scopedDistLock.getStatus(); - } - - auto status = getDatabase(txn, nsToDatabase(ns)); - if (!status.isOK()) { - return status.getStatus(); - } - - ShardId dbPrimaryShardId = status.getValue().value.getPrimary(); - const auto primaryShard = grid.shardRegistry()->getShard(txn, dbPrimaryShardId); - - // This is an extra safety check that the collection is not getting sharded concurrently by - // two different mongos instances. It is not 100%-proof, but it reduces the chance that two - // invocations of shard collection will step on each other's toes. - { - ScopedDbConnection conn(_configServerConnectionString, 30); - unsigned long long existingChunks = - conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(ns))); - if (existingChunks > 0) { - conn.done(); - return Status(ErrorCodes::AlreadyInitialized, - str::stream() << "collection " << ns << " already sharded with " - << existingChunks << " chunks."); - } - - conn.done(); - } - - log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder; - - // Record start in changelog - { - BSONObjBuilder collectionDetail; - collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); - collectionDetail.append("collection", ns); - collectionDetail.append("primary", primaryShard->toString()); - - { - BSONArrayBuilder initialShards(collectionDetail.subarrayStart("initShards")); - for (const ShardId& shardId : initShardIds) { - initialShards.append(shardId); - } - } - - collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); - - logChange(txn, "shardCollection.start", ns, collectionDetail.obj()); - } - - shared_ptr<ChunkManager> manager(new ChunkManager(ns, fieldsAndOrder, unique)); - Status createFirstChunksStatus = - manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds); - if (!createFirstChunksStatus.isOK()) { - return createFirstChunksStatus; - } - manager->loadExistingRanges(txn, nullptr); - - CollectionInfo collInfo; - collInfo.useChunkManager(manager); - collInfo.save(txn, ns); - - // Tell the primary mongod to refresh its data - // TODO: Think the real fix here is for mongos to just - // assume that all collections are sharded, when we get there - SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( - grid.shardRegistry()->getConfigServerConnectionString(), - dbPrimaryShardId, - primaryShard->getConnString(), - NamespaceString(ns), - manager->getVersion(), - true); - - auto ssvStatus = grid.shardRegistry()->runCommandWithNotMasterRetries( - txn, dbPrimaryShardId, "admin", ssv.toBSON()); - if (!ssvStatus.isOK()) { - warning() << "could not update initial version of " << ns << " on shard primary " - << dbPrimaryShardId << ssvStatus.getStatus(); - } - - logChange(txn, "shardCollection.end", ns, BSON("version" << manager->getVersion().toString())); - - return Status::OK(); -} - -StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn, - const std::string& name) { - ScopedDbConnection conn(_configServerConnectionString, 30); - - if (conn->count(ShardType::ConfigNS, - BSON(ShardType::name() << NE << name << ShardType::draining(true)))) { - conn.done(); - return Status(ErrorCodes::ConflictingOperationInProgress, - "Can't have more than one draining shard at a time"); - } - - if (conn->count(ShardType::ConfigNS, BSON(ShardType::name() << NE << name)) == 0) { - conn.done(); - return Status(ErrorCodes::IllegalOperation, "Can't remove last shard"); - } - - // Case 1: start draining chunks - BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, - BSON(ShardType::name() << name << ShardType::draining(true))); - if (shardDoc.isEmpty()) { - log() << "going to start draining shard: " << name; - - auto updateStatus = updateConfigDocument(txn, - ShardType::ConfigNS, - BSON(ShardType::name() << name), - BSON("$set" << BSON(ShardType::draining(true))), - false); - if (!updateStatus.isOK()) { - log() << "error starting removeShard: " << name << causedBy(updateStatus.getStatus()); - return updateStatus.getStatus(); - } - - grid.shardRegistry()->reload(txn); - conn.done(); - - // Record start in changelog - logChange(txn, "removeShard.start", "", BSON("shard" << name)); - return ShardDrainingStatus::STARTED; - } - - // Case 2: all chunks drained - BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str())); - long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc); - long long dbCount = - conn->count(DatabaseType::ConfigNS, - BSON(DatabaseType::name.ne("local") << DatabaseType::primary(name))); - if (chunkCount == 0 && dbCount == 0) { - log() << "going to remove shard: " << name; - audit::logRemoveShard(txn->getClient(), name); - - Status status = - removeConfigDocuments(txn, ShardType::ConfigNS, BSON(ShardType::name() << name)); - if (!status.isOK()) { - log() << "Error concluding removeShard operation on: " << name - << "; err: " << status.reason(); - return status; - } - - grid.shardRegistry()->remove(name); - grid.shardRegistry()->reload(txn); - conn.done(); - - // Record finish in changelog - logChange(txn, "removeShard", "", BSON("shard" << name)); - return ShardDrainingStatus::COMPLETED; - } - - // case 3: draining ongoing - return ShardDrainingStatus::ONGOING; -} - -StatusWith<OpTimePair<DatabaseType>> CatalogManagerLegacy::getDatabase(OperationContext* txn, - const std::string& dbName) { - if (!NamespaceString::validDBName(dbName)) { - return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"}; - } - - // The two databases that are hosted on the config server are config and admin - if (dbName == "config" || dbName == "admin") { - DatabaseType dbt; - dbt.setName(dbName); - dbt.setSharded(false); - dbt.setPrimary("config"); - - return OpTimePair<DatabaseType>(dbt); - } - - ScopedDbConnection conn(_configServerConnectionString, 30.0); - - BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, BSON(DatabaseType::name(dbName))); - if (dbObj.isEmpty()) { - conn.done(); - return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"}; - } - - conn.done(); - - auto parseStatus = DatabaseType::fromBSON(dbObj); - - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); - } - - return OpTimePair<DatabaseType>(parseStatus.getValue()); -} - -StatusWith<OpTimePair<CollectionType>> CatalogManagerLegacy::getCollection( - OperationContext* txn, const std::string& collNs) { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - - BSONObj collObj = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::fullNs(collNs))); - if (collObj.isEmpty()) { - conn.done(); - return Status(ErrorCodes::NamespaceNotFound, - stream() << "collection " << collNs << " not found"); - } - - conn.done(); - - auto parseStatus = CollectionType::fromBSON(collObj); - - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); - } - - return OpTimePair<CollectionType>(parseStatus.getValue()); -} - -Status CatalogManagerLegacy::getCollections(OperationContext* txn, - const std::string* dbName, - std::vector<CollectionType>* collections, - repl::OpTime* optime) { - BSONObjBuilder b; - if (dbName) { - invariant(!dbName->empty()); - b.appendRegex(CollectionType::fullNs(), - (string) "^" + pcrecpp::RE::QuoteMeta(*dbName) + "\\."); - } - - ScopedDbConnection conn(_configServerConnectionString, 30.0); - - std::unique_ptr<DBClientCursor> cursor( - _safeCursor(conn->query(CollectionType::ConfigNS, b.obj()))); - - while (cursor->more()) { - const BSONObj collObj = cursor->next(); - - const auto collectionResult = CollectionType::fromBSON(collObj); - if (!collectionResult.isOK()) { - conn.done(); - collections->clear(); - return Status(ErrorCodes::FailedToParse, - str::stream() << "error while parsing " << CollectionType::ConfigNS - << " document: " << collObj << " : " - << collectionResult.getStatus().toString()); - } - - collections->push_back(collectionResult.getValue()); - } - - conn.done(); - return Status::OK(); -} - -Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const NamespaceString& ns) { - logChange(txn, "dropCollection.start", ns.ns(), BSONObj()); - - auto shardsStatus = getAllShards(txn); - if (!shardsStatus.isOK()) { - return shardsStatus.getStatus(); - } - vector<ShardType> allShards = std::move(shardsStatus.getValue().value); - - LOG(1) << "dropCollection " << ns << " started"; - - // Lock the collection globally so that split/migrate cannot run - stdx::chrono::seconds waitFor(DistLockManager::kDefaultLockTimeout); - MONGO_FAIL_POINT_BLOCK(setSCCCDropCollDistLockWait, customWait) { - const BSONObj& data = customWait.getData(); - waitFor = stdx::chrono::seconds(data["waitForSecs"].numberInt()); - } - - auto scopedDistLock = getDistLockManager()->lock(txn, ns.ns(), "drop", waitFor); - if (!scopedDistLock.isOK()) { - return scopedDistLock.getStatus(); - } - - LOG(1) << "dropCollection " << ns << " locked"; - - std::map<string, BSONObj> errors; - auto* shardRegistry = grid.shardRegistry(); - - for (const auto& shardEntry : allShards) { - auto dropResult = shardRegistry->runCommandWithNotMasterRetries( - txn, shardEntry.getName(), ns.db().toString(), BSON("drop" << ns.coll())); - - if (!dropResult.isOK()) { - return dropResult.getStatus(); - } - - auto dropStatus = getStatusFromCommandResult(dropResult.getValue()); - if (!dropStatus.isOK()) { - if (dropStatus.code() == ErrorCodes::NamespaceNotFound) { - continue; - } - - errors.emplace(shardEntry.getHost(), dropResult.getValue()); - } - } - - if (!errors.empty()) { - StringBuilder sb; - sb << "Dropping collection failed on the following hosts: "; - - for (auto it = errors.cbegin(); it != errors.cend(); ++it) { - if (it != errors.cbegin()) { - sb << ", "; - } - - sb << it->first << ": " << it->second; - } - - return {ErrorCodes::OperationFailed, sb.str()}; - } - - LOG(1) << "dropCollection " << ns << " shard data deleted"; - - // Remove chunk data - Status result = removeConfigDocuments(txn, ChunkType::ConfigNS, BSON(ChunkType::ns(ns.ns()))); - if (!result.isOK()) { - return result; - } - - LOG(1) << "dropCollection " << ns << " chunk data deleted"; - - // Mark the collection as dropped - CollectionType coll; - coll.setNs(ns); - coll.setDropped(true); - coll.setEpoch(ChunkVersion::DROPPED().epoch()); - coll.setUpdatedAt(grid.shardRegistry()->getNetwork()->now()); - - result = updateCollection(txn, ns.ns(), coll); - if (!result.isOK()) { - return result; - } - - LOG(1) << "dropCollection " << ns << " collection marked as dropped"; - - for (const auto& shardEntry : allShards) { - SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( - grid.shardRegistry()->getConfigServerConnectionString(), - shardEntry.getName(), - fassertStatusOK(28753, ConnectionString::parse(shardEntry.getHost())), - ns, - ChunkVersion::DROPPED(), - true); - - auto ssvResult = shardRegistry->runCommandWithNotMasterRetries( - txn, shardEntry.getName(), "admin", ssv.toBSON()); - - if (!ssvResult.isOK()) { - return ssvResult.getStatus(); - } - - auto ssvStatus = getStatusFromCommandResult(ssvResult.getValue()); - if (!ssvStatus.isOK()) { - return ssvStatus; - } - - auto unsetShardingStatus = shardRegistry->runCommandWithNotMasterRetries( - txn, shardEntry.getName(), "admin", BSON("unsetSharding" << 1)); - - if (!unsetShardingStatus.isOK()) { - return unsetShardingStatus.getStatus(); - } - - auto unsetShardingResult = getStatusFromCommandResult(unsetShardingStatus.getValue()); - if (!unsetShardingResult.isOK()) { - return unsetShardingResult; - } - } - - LOG(1) << "dropCollection " << ns << " completed"; - - logChange(txn, "dropCollection", ns.ns(), BSONObj()); - - return Status::OK(); -} - -StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(OperationContext* txn, - const string& key) { - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - BSONObj settingsDoc = conn->findOne(SettingsType::ConfigNS, BSON(SettingsType::key(key))); - conn.done(); - - if (settingsDoc.isEmpty()) { - return Status(ErrorCodes::NoMatchingDocument, - str::stream() << "can't find settings document with key: " << key); - } - - StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc); - if (!settingsResult.isOK()) { - return Status(ErrorCodes::FailedToParse, - str::stream() << "error while parsing settings document: " << settingsDoc - << " : " << settingsResult.getStatus().toString()); - } - - const SettingsType& settings = settingsResult.getValue(); - - Status validationStatus = settings.validate(); - if (!validationStatus.isOK()) { - return validationStatus; - } - - return settingsResult; - } catch (const DBException& ex) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "unable to successfully obtain " - << "config.settings document: " << causedBy(ex)); - } -} - -Status CatalogManagerLegacy::getDatabasesForShard(OperationContext* txn, - const string& shardName, - vector<string>* dbs) { - dbs->clear(); - - try { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - std::unique_ptr<DBClientCursor> cursor(_safeCursor( - conn->query(DatabaseType::ConfigNS, Query(BSON(DatabaseType::primary(shardName)))))); - if (!cursor.get()) { - conn.done(); - return Status(ErrorCodes::HostUnreachable, - str::stream() << "unable to open cursor for " << DatabaseType::ConfigNS); - } - - while (cursor->more()) { - BSONObj dbObj = cursor->nextSafe(); - - string dbName; - Status status = bsonExtractStringField(dbObj, DatabaseType::name(), &dbName); - if (!status.isOK()) { - dbs->clear(); - return status; - } - - dbs->push_back(dbName); - } - - conn.done(); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - return Status::OK(); -} - -Status CatalogManagerLegacy::getChunks(OperationContext* txn, - const BSONObj& query, - const BSONObj& sort, - boost::optional<int> limit, - vector<ChunkType>* chunks, - repl::OpTime* opTime) { - chunks->clear(); - - try { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - - const Query queryWithSort(Query(query).sort(sort)); - - std::unique_ptr<DBClientCursor> cursor( - _safeCursor(conn->query(ChunkType::ConfigNS, queryWithSort, limit.get_value_or(0)))); - if (!cursor.get()) { - conn.done(); - return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor"); - } - - while (cursor->more()) { - BSONObj chunkObj = cursor->nextSafe(); - - StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj); - if (!chunkRes.isOK()) { - conn.done(); - chunks->clear(); - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse chunk with id (" - << chunkObj[ChunkType::name()].toString() - << "): " << chunkRes.getStatus().toString()}; - } - - chunks->push_back(chunkRes.getValue()); - } - - conn.done(); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - return Status::OK(); -} - -Status CatalogManagerLegacy::getTagsForCollection(OperationContext* txn, - const std::string& collectionNs, - std::vector<TagsType>* tags) { - tags->clear(); - - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query( - TagsType::ConfigNS, Query(BSON(TagsType::ns(collectionNs))).sort(TagsType::min())))); - if (!cursor.get()) { - conn.done(); - return Status(ErrorCodes::HostUnreachable, "unable to open tags cursor"); - } - - while (cursor->more()) { - BSONObj tagObj = cursor->nextSafe(); - - StatusWith<TagsType> tagRes = TagsType::fromBSON(tagObj); - if (!tagRes.isOK()) { - tags->clear(); - conn.done(); - return Status(ErrorCodes::FailedToParse, - str::stream() - << "Failed to parse tag: " << tagRes.getStatus().toString()); - } - - tags->push_back(tagRes.getValue()); - } - - conn.done(); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - return Status::OK(); -} - -StatusWith<string> CatalogManagerLegacy::getTagForChunk(OperationContext* txn, - const std::string& collectionNs, - const ChunkType& chunk) { - BSONObj tagDoc; - - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - - Query query(BSON(TagsType::ns(collectionNs) - << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() - << BSON("$gte" << chunk.getMax()))); - - tagDoc = conn->findOne(TagsType::ConfigNS, query); - conn.done(); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - if (tagDoc.isEmpty()) { - return std::string(""); - } - - auto status = TagsType::fromBSON(tagDoc); - if (status.isOK()) { - return status.getValue().getTag(); - } - - return status.getStatus(); -} - -StatusWith<OpTimePair<std::vector<ShardType>>> CatalogManagerLegacy::getAllShards( - OperationContext* txn) { - std::vector<ShardType> shards; - ScopedDbConnection conn(_configServerConnectionString, 30.0); - std::unique_ptr<DBClientCursor> cursor( - _safeCursor(conn->query(ShardType::ConfigNS, BSONObj()))); - while (cursor->more()) { - BSONObj shardObj = cursor->nextSafe(); - - StatusWith<ShardType> shardRes = ShardType::fromBSON(shardObj); - if (!shardRes.isOK()) { - shards.clear(); - conn.done(); - return Status(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse shard with id (" - << shardObj[ShardType::name()].toString() - << "): " << shardRes.getStatus().toString()); - } - - shards.push_back(shardRes.getValue()); - } - conn.done(); - - return OpTimePair<std::vector<ShardType>>{std::move(shards)}; -} - -bool CatalogManagerLegacy::runUserManagementWriteCommand(OperationContext* txn, - const string& commandName, - const string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - DBClientMultiCommand dispatcher(true); - for (const ConnectionString& configServer : _configServers) { - dispatcher.addCommand(configServer, dbname, cmdObj); - } - - auto scopedDistLock = getDistLockManager()->lock(txn, "authorizationData", commandName); - if (!scopedDistLock.isOK()) { - return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); - } - - dispatcher.sendAll(); - - BSONObj responseObj; - - Status prevStatus{Status::OK()}; - Status currStatus{Status::OK()}; - - BSONObjBuilder responses; - unsigned failedCount = 0; - bool sameError = true; - while (dispatcher.numPending() > 0) { - ConnectionString host; - RawBSONSerializable responseCmdSerial; - - Status dispatchStatus = dispatcher.recvAny(&host, &responseCmdSerial); - - if (!dispatchStatus.isOK()) { - return Command::appendCommandStatus(*result, dispatchStatus); - } - - responseObj = responseCmdSerial.toBSON(); - responses.append(host.toString(), responseObj); - - currStatus = Command::getStatusFromCommandResult(responseObj); - if (!currStatus.isOK()) { - // same error <=> adjacent error statuses are the same - if (failedCount > 0 && prevStatus != currStatus) { - sameError = false; - } - failedCount++; - prevStatus = currStatus; - } - } - - if (failedCount == 0) { - result->appendElements(responseObj); - return true; - } - - // if the command succeeds on at least one config server and fails on at least one, - // manual intervention is required - if (failedCount < _configServers.size()) { - Status status(ErrorCodes::ManualInterventionRequired, - str::stream() << "Config write was not consistent - " - << "user management command failed on at least one " - << "config server but passed on at least one other. " - << "Manual intervention may be required. " - << "Config responses: " << responses.obj().toString()); - return Command::appendCommandStatus(*result, status); - } - - if (sameError) { - result->appendElements(responseObj); - return false; - } - - Status status(ErrorCodes::ManualInterventionRequired, - str::stream() << "Config write was not consistent - " - << "user management command produced inconsistent results. " - << "Manual intervention may be required. " - << "Config responses: " << responses.obj().toString()); - return Command::appendCommandStatus(*result, status); -} - -bool CatalogManagerLegacy::runUserManagementReadCommand(OperationContext* txn, - const string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - return _runReadCommand(txn, dbname, cmdObj, result); -} - -bool CatalogManagerLegacy::_runReadCommand(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - try { - // let SyncClusterConnection handle connecting to the first config server - // that is reachable and returns data - ScopedDbConnection conn(_configServerConnectionString, 30); - - BSONObj cmdResult; - const bool ok = conn->runCommand(dbname, cmdObj, cmdResult); - result->appendElements(cmdResult); - conn.done(); - return ok; - } catch (const DBException& ex) { - return Command::appendCommandStatus(*result, ex.toStatus()); - } -} - -Status CatalogManagerLegacy::applyChunkOpsDeprecated(OperationContext* txn, - const BSONArray& updateOps, - const BSONArray& preCondition, - const std::string& nss, - const ChunkVersion& lastChunkVersion) { - BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition); - BSONObj cmdResult; - try { - ScopedDbConnection conn(_configServerConnectionString, 30); - conn->runCommand("config", cmd, cmdResult); - conn.done(); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - Status status = Command::getStatusFromCommandResult(cmdResult); - - if (MONGO_FAIL_POINT(failApplyChunkOps)) { - status = Status(ErrorCodes::InternalError, "Failpoint 'failApplyChunkOps' generated error"); - } - - if (!status.isOK()) { - string errMsg; - - // This could be a blip in the network connectivity. Check if the commit request made it. - // - // If all the updates were successfully written to the chunks collection, the last - // document in the list of updates should be returned from a query to the chunks - // collection. The last chunk can be identified by namespace and version number. - - warning() << "chunk operation commit failed and metadata will be revalidated" - << causedBy(status); - - std::vector<ChunkType> newestChunk; - BSONObjBuilder query; - lastChunkVersion.addToBSON(query, ChunkType::DEPRECATED_lastmod()); - query.append(ChunkType::ns(), nss); - Status chunkStatus = getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr); - - if (!chunkStatus.isOK()) { - warning() << "getChunks function failed, unable to validate chunk operation metadata" - << causedBy(chunkStatus); - errMsg = str::stream() << "getChunks function failed, unable to validate chunk " - << "operation metadata: " << causedBy(chunkStatus) - << ". applyChunkOpsDeprecated failed to get confirmation " - << "of commit. Unable to save chunk ops. Command: " << cmd - << ". Result: " << cmdResult; - } else if (!newestChunk.empty()) { - invariant(newestChunk.size() == 1); - log() << "chunk operation commit confirmed"; - return Status::OK(); - } else { - errMsg = str::stream() << "chunk operation commit failed: version " - << lastChunkVersion.toString() << " doesn't exist in namespace" - << nss << ". Unable to save chunk ops. Command: " << cmd - << ". Result: " << cmdResult; - } - - return Status(status.code(), errMsg); - } - - return Status::OK(); -} - -void CatalogManagerLegacy::writeConfigServerDirect(OperationContext* txn, - const BatchedCommandRequest& request, - BatchedCommandResponse* response) { - // check if config servers are consistent - if (!_isConsistentFromLastCheck()) { - toBatchError(Status(ErrorCodes::ConfigServersInconsistent, - "Data inconsistency detected amongst config servers"), - response); - return; - } - - // We only support batch sizes of one for config writes - if (request.sizeWriteOps() != 1) { - toBatchError(Status(ErrorCodes::InvalidOptions, - str::stream() << "Writes to config servers must have batch size of 1, " - << "found " << request.sizeWriteOps()), - response); - - return; - } - - // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes - if (request.isWriteConcernSet() && !validConfigWC(request.getWriteConcern())) { - toBatchError(Status(ErrorCodes::InvalidOptions, - str::stream() << "Invalid write concern for write to " - << "config servers: " << request.getWriteConcern()), - response); - - return; - } - - DBClientMultiCommand dispatcher(true); - if (_configServers.size() > 1) { - // We can't support no-_id upserts to multiple config servers - the _ids will differ - if (BatchedCommandRequest::containsNoIDUpsert(request)) { - toBatchError( - Status(ErrorCodes::InvalidOptions, - str::stream() << "upserts to multiple config servers must include _id"), - response); - return; - } - } - - ConfigCoordinator exec(&dispatcher, _configServerConnectionString); - exec.executeBatch(request, response); -} - -Status CatalogManagerLegacy::insertConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& doc) { - const NamespaceString nss(ns); - invariant(nss.db() == "config"); - invariant(doc.hasField("_id")); - - auto insert(stdx::make_unique<BatchedInsertRequest>()); - insert->addToDocuments(doc); - - BatchedCommandRequest request(insert.release()); - request.setNS(nss); - - BatchedCommandResponse response; - writeConfigServerDirect(txn, request, &response); - - return response.toStatus(); -} - -StatusWith<bool> CatalogManagerLegacy::updateConfigDocument(OperationContext* txn, - const string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert) { - const NamespaceString nss(ns); - invariant(nss.db() == "config"); - - const BSONElement idField = query.getField("_id"); - invariant(!idField.eoo()); - - unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); - updateDoc->setQuery(query); - updateDoc->setUpdateExpr(update); - updateDoc->setUpsert(upsert); - updateDoc->setMulti(false); - - unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); - updateRequest->addToUpdates(updateDoc.release()); - - BatchedCommandRequest request(updateRequest.release()); - request.setNS(nss); - - BatchedCommandResponse response; - writeConfigServerDirect(txn, request, &response); - - Status status = response.toStatus(); - if (!status.isOK()) { - return status; - } - - const auto nSelected = response.getN(); - invariant(nSelected == 0 || nSelected == 1); - return (nSelected == 1); -} - -Status CatalogManagerLegacy::removeConfigDocuments(OperationContext* txn, - const string& ns, - const BSONObj& query) { - const NamespaceString nss(ns); - invariant(nss.db() == "config"); - - auto deleteDoc(stdx::make_unique<BatchedDeleteDocument>()); - deleteDoc->setQuery(query); - deleteDoc->setLimit(0); - - auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>()); - deleteRequest->addToDeletes(deleteDoc.release()); - - BatchedCommandRequest request(deleteRequest.release()); - request.setNS(nss); - - BatchedCommandResponse response; - writeConfigServerDirect(txn, request, &response); - - return response.toStatus(); -} - -Status CatalogManagerLegacy::_checkDbDoesNotExist(OperationContext* txn, - const std::string& dbName, - DatabaseType* db) { - ScopedDbConnection conn(_configServerConnectionString, 30); - - BSONObjBuilder b; - b.appendRegex(DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i"); - - BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, b.obj()); - conn.done(); - - // If our name is exactly the same as the name we want, try loading - // the database again. - if (!dbObj.isEmpty() && dbObj[DatabaseType::name()].String() == dbName) { - if (db) { - auto parseDBStatus = DatabaseType::fromBSON(dbObj); - if (!parseDBStatus.isOK()) { - return parseDBStatus.getStatus(); - } - - *db = parseDBStatus.getValue(); - } - - return Status(ErrorCodes::NamespaceExists, - str::stream() << "database " << dbName << " already exists"); - } - - if (!dbObj.isEmpty()) { - return Status(ErrorCodes::DatabaseDifferCase, - str::stream() << "can't have 2 databases that just differ on case " - << " have: " << dbObj[DatabaseType::name()].String() - << " want to add: " << dbName); - } - - return Status::OK(); -} - -StatusWith<string> CatalogManagerLegacy::_generateNewShardName(OperationContext* txn) { - BSONObj o; - { - ScopedDbConnection conn(_configServerConnectionString, 30); - o = conn->findOne(ShardType::ConfigNS, - Query(fromjson("{" + ShardType::name() + ": /^shard/}")) - .sort(BSON(ShardType::name() << -1))); - conn.done(); - } - - int count = 0; - if (!o.isEmpty()) { - string last = o[ShardType::name()].String(); - std::istringstream is(last.substr(5)); - is >> count; - count++; - } - - // TODO fix so that we can have more than 10000 automatically generated shard names - if (count < 9999) { - std::stringstream ss; - ss << "shard" << std::setfill('0') << std::setw(4) << count; - return ss.str(); - } - - return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); -} - -Status CatalogManagerLegacy::_createCappedConfigCollection(OperationContext* txn, - StringData collName, - int cappedSize) { - try { - const NamespaceString nss("config", collName); - ScopedDbConnection conn(_configServerConnectionString, 30.0); - - BSONObj result; - const int maxNumDocuments = 0; - conn->createCollection(nss.ns(), cappedSize, true, maxNumDocuments, &result); - conn.done(); - - return getStatusFromCommandResult(result); - } catch (const DBException& ex) { - return ex.toStatus(); - } -} - -size_t CatalogManagerLegacy::_getShardCount(const BSONObj& query) const { - ScopedDbConnection conn(_configServerConnectionString, 30.0); - long long shardCount = conn->count(ShardType::ConfigNS, query); - conn.done(); - - return shardCount; -} - -DistLockManager* CatalogManagerLegacy::getDistLockManager() { - invariant(_distLockManager); - return _distLockManager.get(); -} - -Status CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const { - if (tries <= 0) { - return {ErrorCodes::ConfigServersInconsistent, - "too many retries after unsuccessful checks"}; - } - - unsigned firstGood = 0; - int up = 0; - // becomes false if we are able to get any response (even a failed one) from any of the config - // servers - bool networkError = true; - vector<BSONObj> res; - - // The last error we saw on a config server - string errMsg; - - for (unsigned i = 0; i < _configServers.size(); i++) { - BSONObj result; - std::unique_ptr<ScopedDbConnection> conn; - - try { - conn.reset(new ScopedDbConnection(_configServers[i], 30.0)); - - if (!conn->get()->runCommand( - "config", - BSON("dbhash" << 1 << "collections" << BSON_ARRAY("chunks" - << "databases" - << "collections" - << "shards" - << "version")), - result)) { - errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String(); - if (!result["assertion"].eoo()) - errMsg = result["assertion"].String(); - - warning() << "couldn't check dbhash on config server " << _configServers[i] - << causedBy(result.toString()); - - result = BSONObj(); - } else { - result = result.getOwned(); - if (up == 0) - firstGood = i; - up++; - } - // Network errors throw, so if we got this far there wasn't a network error - networkError = false; - conn->done(); - } catch (const DBException& excep) { - if (conn) { - conn->kill(); - } - - if (excep.getCode() == ErrorCodes::IncompatibleCatalogManager) { - return excep.toStatus(); - } - - // We need to catch DBExceptions b/c sometimes we throw them - // instead of socket exceptions when findN fails - - errMsg = excep.toString(); - warning() << " couldn't check dbhash on config server " << _configServers[i] - << causedBy(excep); - } - res.push_back(result); - } - - if (_configServers.size() == 1) { - return Status::OK(); - } - - if (up == 0) { - return {networkError ? ErrorCodes::HostUnreachable : ErrorCodes::UnknownError, - str::stream() << "no config servers successfully contacted" << causedBy(errMsg)}; - } else if (up == 1) { - warning() << "only 1 config server reachable, continuing"; - return Status::OK(); - } - - BSONObj base = res[firstGood]; - for (unsigned i = firstGood + 1; i < res.size(); i++) { - if (res[i].isEmpty()) - continue; - - string chunksHash1 = base.getFieldDotted("collections.chunks"); - string chunksHash2 = res[i].getFieldDotted("collections.chunks"); - - string databaseHash1 = base.getFieldDotted("collections.databases"); - string databaseHash2 = res[i].getFieldDotted("collections.databases"); - - string collectionsHash1 = base.getFieldDotted("collections.collections"); - string collectionsHash2 = res[i].getFieldDotted("collections.collections"); - - string shardHash1 = base.getFieldDotted("collections.shards"); - string shardHash2 = res[i].getFieldDotted("collections.shards"); - - string versionHash1 = base.getFieldDotted("collections.version"); - string versionHash2 = res[i].getFieldDotted("collections.version"); - - if (chunksHash1 == chunksHash2 && databaseHash1 == databaseHash2 && - collectionsHash1 == collectionsHash2 && shardHash1 == shardHash2 && - versionHash1 == versionHash2) { - continue; - } - - warning() << "config servers " << _configServers[firstGood].toString() << " and " - << _configServers[i].toString() << " differ"; - - if (tries <= 1) { - return {ErrorCodes::ConfigServersInconsistent, - str::stream() << "hash from " << _configServers[firstGood].toString() << ": " - << base["collections"].Obj() << " vs hash from " - << _configServers[i].toString() << ": " - << res[i]["collections"].Obj()}; - } - - return _checkConfigServersConsistent(tries - 1); - } - - return Status::OK(); -} - -void CatalogManagerLegacy::_consistencyChecker() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_inShutdown) { - lk.unlock(); - const auto status = _checkConfigServersConsistent(); - - lk.lock(); - _consistentFromLastCheck = status.isOK(); - if (_inShutdown) - break; - _consistencyCheckerCV.wait_for(lk, Seconds(60)); - } - LOG(1) << "Consistency checker thread shutting down"; -} - -bool CatalogManagerLegacy::_isConsistentFromLastCheck() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - return _consistentFromLastCheck; -} - -Status CatalogManagerLegacy::appendInfoForConfigServerDatabases(OperationContext* txn, - BSONArrayBuilder* builder) { - BSONObjBuilder resultBuilder; - if (!_runReadCommand(txn, "admin", BSON("listDatabases" << 1), &resultBuilder)) { - return getStatusFromCommandResult(resultBuilder.obj()); - } - - auto listDBResponse = resultBuilder.done(); - BSONElement dbListArray; - auto dbListStatus = bsonExtractTypedField(listDBResponse, "databases", Array, &dbListArray); - if (!dbListStatus.isOK()) { - return dbListStatus; - } - - BSONObjIterator iter(dbListArray.Obj()); - - while (iter.more()) { - auto dbEntry = iter.next().Obj(); - string name; - auto parseStatus = bsonExtractStringField(dbEntry, "name", &name); - - if (!parseStatus.isOK()) { - return parseStatus; - } - - if (name == "config" || name == "admin") { - builder->append(dbEntry); - } - } - - return Status::OK(); -} - -} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h deleted file mode 100644 index 3c99ec4c978..00000000000 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/client/connection_string.h" -#include "mongo/s/catalog/catalog_manager_common.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" - -namespace mongo { - -/** - * Implements the catalog manager using the legacy 3-config server protocol. - */ -class CatalogManagerLegacy final : public CatalogManagerCommon { -public: - CatalogManagerLegacy(); - ~CatalogManagerLegacy(); - - ConfigServerMode getMode() override { - return ConfigServerMode::SCCC; - } - - /** - * Initializes the catalog manager with the hosts, which will be used as a configuration - * server. Can only be called once for the lifetime. - */ - Status init(const ConnectionString& configCS, const std::string& distLockProcessId); - - /** - * Can terminate the server if called more than once. - */ - Status startup(OperationContext* txn, bool allowNetworking) override; - - void shutDown(OperationContext* txn, bool allowNetworking) override; - - Status shardCollection(OperationContext* txn, - const std::string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - const std::vector<BSONObj>& initPoints, - const std::set<ShardId>& initShardIds) override; - - StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, - const std::string& name) override; - - StatusWith<OpTimePair<DatabaseType>> getDatabase(OperationContext* txn, - const std::string& dbName) override; - - StatusWith<OpTimePair<CollectionType>> getCollection(OperationContext* txn, - const std::string& collNs) override; - - Status getCollections(OperationContext* txn, - const std::string* dbName, - std::vector<CollectionType>* collections, - repl::OpTime* optime); - - Status dropCollection(OperationContext* txn, const NamespaceString& ns) override; - - Status getDatabasesForShard(OperationContext* txn, - const std::string& shardName, - std::vector<std::string>* dbs) override; - - Status getChunks(OperationContext* txn, - const BSONObj& query, - const BSONObj& sort, - boost::optional<int> limit, - std::vector<ChunkType>* chunks, - repl::OpTime* opTime) override; - - Status getTagsForCollection(OperationContext* txn, - const std::string& collectionNs, - std::vector<TagsType>* tags) override; - - StatusWith<std::string> getTagForChunk(OperationContext* txn, - const std::string& collectionNs, - const ChunkType& chunk) override; - - StatusWith<OpTimePair<std::vector<ShardType>>> getAllShards(OperationContext* txn) override; - - /** - * Grabs a distributed lock and runs the command on all config servers. - */ - bool runUserManagementWriteCommand(OperationContext* txn, - const std::string& commandName, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; - - bool runUserManagementReadCommand(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; - - Status applyChunkOpsDeprecated(OperationContext* txn, - const BSONArray& updateOps, - const BSONArray& preCondition, - const std::string& nss, - const ChunkVersion& lastChunkVersion) override; - - StatusWith<SettingsType> getGlobalSettings(OperationContext* txn, - const std::string& key) override; - - void writeConfigServerDirect(OperationContext* txn, - const BatchedCommandRequest& request, - BatchedCommandResponse* response) override; - - Status insertConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& doc) override; - - StatusWith<bool> updateConfigDocument(OperationContext* txn, - const std::string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert) override; - - Status removeConfigDocuments(OperationContext* txn, - const std::string& ns, - const BSONObj& query) override; - - DistLockManager* getDistLockManager() override; - - Status initConfigVersion(OperationContext* txn) override; - - Status appendInfoForConfigServerDatabases(OperationContext* txn, - BSONArrayBuilder* builder) override; - -private: - Status _checkDbDoesNotExist(OperationContext* txn, - const std::string& dbName, - DatabaseType* db) override; - - StatusWith<std::string> _generateNewShardName(OperationContext* txn) override; - - Status _createCappedConfigCollection(OperationContext* txn, - StringData collName, - int cappedSize) override; - - /** - * Starts the thread that periodically checks data consistency amongst the config servers. - * Note: this is not thread safe and can only be called once for the lifetime. - */ - Status _startConfigServerChecker(); - - /** - * Returns the number of shards recognized by the config servers - * in this sharded cluster. - * Optional: use query parameter to filter shard count. - */ - size_t _getShardCount(const BSONObj& query) const; - - /** - * Returns OK if all config servers that were contacted have the same state. - * If inconsistency detected on first attempt, checks at most 3 more times. - */ - Status _checkConfigServersConsistent(const unsigned tries = 4) const; - - /** - * Checks data consistency amongst config servers every 60 seconds. - */ - void _consistencyChecker(); - - /** - * Returns true if the config servers have the same contents since the last - * check was performed. - */ - bool _isConsistentFromLastCheck(); - - /** - * Sends a read only command to the config server. - */ - bool _runReadCommand(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result); - - // Parsed config server hosts, as specified on the command line. - ConnectionString _configServerConnectionString; - std::vector<ConnectionString> _configServers; - - // Distributed lock manager singleton. - std::unique_ptr<DistLockManager> _distLockManager; - - // protects _inShutdown, _consistentFromLastCheck; used by _consistencyCheckerCV - stdx::mutex _mutex; - - // True if CatalogManagerLegacy::shutDown has been called. False, otherwise. - bool _inShutdown = false; - - // Set to true once startup() has been called and returned an OK status. Allows startup() to be - // called multiple times with any time after the first successful call being a no-op. - bool _started = false; - - // used by consistency checker thread to check if config - // servers are consistent - bool _consistentFromLastCheck = false; - - // Thread that runs dbHash on config servers for checking data consistency. - stdx::thread _consistencyCheckerThread; - - // condition variable used by the consistency checker thread to wait - // for <= 60s, on every iteration, until shutDown is called - stdx::condition_variable _consistencyCheckerCV; -}; - -} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp deleted file mode 100644 index cc03875aa87..00000000000 --- a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (C) 2012 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/legacy/cluster_client_internal.h" - -#include "mongo/client/connpool.h" -#include "mongo/util/stringutils.h" - -namespace mongo { - -using std::unique_ptr; -using mongoutils::str::stream; - -// Helper function for safe cursors -DBClientCursor* _safeCursor(unique_ptr<DBClientCursor> cursor) { - // TODO: Make error handling more consistent, it's annoying that cursors error out by - // throwing exceptions *and* being empty - uassert(16625, str::stream() << "cursor not found, transport error", cursor.get()); - return cursor.release(); -} -} diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.h b/src/mongo/s/catalog/legacy/cluster_client_internal.h deleted file mode 100644 index af4d9f423ef..00000000000 --- a/src/mongo/s/catalog/legacy/cluster_client_internal.h +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (C) 2012 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/stdx/memory.h" - -namespace mongo { - -class DBClientCursor; - -// -// Needed to normalize exception behavior of connections and cursors -// TODO: Remove when we refactor the client connection interface to something more consistent. -// - -// Helper function which throws for invalid cursor initialization. -// Note: cursor ownership will be passed to this function. -DBClientCursor* _safeCursor(std::unique_ptr<DBClientCursor> cursor); -} diff --git a/src/mongo/s/catalog/legacy/config_coordinator.cpp b/src/mongo/s/catalog/legacy/config_coordinator.cpp deleted file mode 100644 index cdfd7e93b62..00000000000 --- a/src/mongo/s/catalog/legacy/config_coordinator.cpp +++ /dev/null @@ -1,430 +0,0 @@ -/** - * Copyright (C) 2013 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/legacy/config_coordinator.h" - -#include "mongo/base/owned_pointer_vector.h" -#include "mongo/db/field_parser.h" -#include "mongo/db/lasterror.h" -#include "mongo/db/namespace_string.h" -#include "mongo/s/client/multi_command_dispatch.h" -#include "mongo/s/set_shard_version_request.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/util/log.h" - -namespace mongo { - -using std::string; -using std::vector; - -namespace { - -/** - * A BSON serializable object representing a setShardVersion command response. - */ -class SSVResponse : public BSONSerializable { - MONGO_DISALLOW_COPYING(SSVResponse); - -public: - static const BSONField<int> ok; - static const BSONField<int> errCode; - static const BSONField<string> errMessage; - - - SSVResponse() { - clear(); - } - - bool isValid(std::string* errMsg) const { - return _isOkSet; - } - - BSONObj toBSON() const { - BSONObjBuilder builder; - - if (_isOkSet) - builder << ok(_ok); - if (_isErrCodeSet) - builder << errCode(_errCode); - if (_isErrMessageSet) - builder << errMessage(_errMessage); - - return builder.obj(); - } - - bool parseBSON(const BSONObj& source, std::string* errMsg) { - FieldParser::FieldState result; - - result = FieldParser::extractNumber(source, ok, &_ok, errMsg); - if (result == FieldParser::FIELD_INVALID) { - return false; - } - _isOkSet = result != FieldParser::FIELD_NONE; - - result = FieldParser::extract(source, errCode, &_errCode, errMsg); - if (result == FieldParser::FIELD_INVALID) { - return false; - } - _isErrCodeSet = result != FieldParser::FIELD_NONE; - - result = FieldParser::extract(source, errMessage, &_errMessage, errMsg); - if (result == FieldParser::FIELD_INVALID) { - return false; - } - _isErrMessageSet = result != FieldParser::FIELD_NONE; - - return true; - } - - void clear() { - _ok = false; - _isOkSet = false; - - _errCode = 0; - _isErrCodeSet = false; - - _errMessage = ""; - _isErrMessageSet = false; - } - - string toString() const { - return toBSON().toString(); - } - - int getOk() { - dassert(_isOkSet); - return _ok; - } - - void setOk(int ok) { - _ok = ok; - _isOkSet = true; - } - - int getErrCode() { - if (_isErrCodeSet) { - return _errCode; - } else { - return errCode.getDefault(); - } - } - - void setErrCode(int errCode) { - _errCode = errCode; - _isErrCodeSet = true; - } - - bool isErrCodeSet() const { - return _isErrCodeSet; - } - - const string& getErrMessage() { - dassert(_isErrMessageSet); - return _errMessage; - } - - void setErrMessage(StringData errMsg) { - _errMessage = errMsg.toString(); - _isErrMessageSet = true; - } - -private: - int _ok; - bool _isOkSet; - - int _errCode; - bool _isErrCodeSet; - - string _errMessage; - bool _isErrMessageSet; -}; - -const BSONField<int> SSVResponse::ok("ok"); -const BSONField<int> SSVResponse::errCode("code"); -const BSONField<string> SSVResponse::errMessage("errmsg"); - - -struct ConfigResponse { - ConnectionString configHost; - BatchedCommandResponse response; -}; - -void buildErrorFrom(const Status& status, BatchedCommandResponse* response) { - response->setOk(false); - response->setErrCode(static_cast<int>(status.code())); - response->setErrMessage(status.reason()); - - dassert(response->isValid(NULL)); -} - -bool areResponsesEqual(const BatchedCommandResponse& responseA, - const BatchedCommandResponse& responseB) { - // Note: This needs to also take into account comparing responses from legacy writes - // and write commands. - - // TODO: Better reporting of why not equal - if (responseA.getOk() != responseB.getOk()) { - return false; - } - - if (responseA.getN() != responseB.getN()) { - return false; - } - - if (responseA.isUpsertDetailsSet()) { - // TODO: - } - - if (responseA.getOk()) { - return true; - } - - // TODO: Compare errors here - - return true; -} - -bool areAllResponsesEqual(const vector<ConfigResponse*>& responses) { - BatchedCommandResponse* lastResponse = NULL; - - for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end(); - ++it) { - BatchedCommandResponse* response = &(*it)->response; - - if (lastResponse != NULL) { - if (!areResponsesEqual(*lastResponse, *response)) { - return false; - } - } - - lastResponse = response; - } - - return true; -} - -void combineResponses(const vector<ConfigResponse*>& responses, - BatchedCommandResponse* clientResponse) { - if (areAllResponsesEqual(responses)) { - responses.front()->response.cloneTo(clientResponse); - return; - } - - BSONObjBuilder builder; - for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end(); - ++it) { - builder.append((*it)->configHost.toString(), (*it)->response.toBSON()); - } - - clientResponse->setOk(false); - clientResponse->setErrCode(ErrorCodes::ManualInterventionRequired); - clientResponse->setErrMessage( - "config write was not consistent, " - "manual intervention may be required. " - "config responses: " + - builder.obj().toString()); -} - -} // namespace - - -ConfigCoordinator::ConfigCoordinator(MultiCommandDispatch* dispatcher, - const ConnectionString& configServerConnectionString) - : _dispatcher(dispatcher), _configServerConnectionString(configServerConnectionString) {} - -bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) { - // - // Send side - // - - for (const HostAndPort& server : _configServerConnectionString.getServers()) { - SetShardVersionRequest ssv = SetShardVersionRequest::makeForInit( - _configServerConnectionString, "config", _configServerConnectionString); - _dispatcher->addCommand(ConnectionString(server), "admin", ssv.toBSON()); - } - - _dispatcher->sendAll(); - - // - // Recv side - // - - bool ssvError = false; - while (_dispatcher->numPending() > 0) { - ConnectionString configHost; - SSVResponse response; - - // We've got to recv everything, no matter what - even if some failed. - Status dispatchStatus = _dispatcher->recvAny(&configHost, &response); - - if (ssvError) { - // record only the first failure. - continue; - } - - if (!dispatchStatus.isOK()) { - ssvError = true; - clientResponse->setOk(false); - clientResponse->setErrCode(static_cast<int>(dispatchStatus.code())); - clientResponse->setErrMessage(dispatchStatus.reason()); - } else if (!response.getOk()) { - ssvError = true; - clientResponse->setOk(false); - clientResponse->setErrMessage(response.getErrMessage()); - - if (response.isErrCodeSet()) { - clientResponse->setErrCode(response.getErrCode()); - } - } - } - - return !ssvError; -} - -/** - * The core config write functionality. - * - * Config writes run in two passes - the first is a quick check to ensure the config servers - * are all reachable, the second runs the actual write. - * - * TODO: Upgrade and move this logic to the config servers, a state machine implementation - * is probably the next step. - */ -void ConfigCoordinator::executeBatch(const BatchedCommandRequest& clientRequest, - BatchedCommandResponse* clientResponse) { - const NamespaceString nss(clientRequest.getNS()); - - // Should never use it for anything other than DBs residing on the config server - dassert(nss.db() == "config" || nss.db() == "admin"); - dassert(clientRequest.sizeWriteOps() == 1u); - - // This is an opportunistic check that all config servers look healthy by calling - // getLastError on each one of them. If there was some form of write/journaling error, get - // last error would fail. - { - for (const HostAndPort& server : _configServerConnectionString.getServers()) { - _dispatcher->addCommand( - ConnectionString(server), "admin", BSON("getLastError" << true << "fsync" << true)); - } - - _dispatcher->sendAll(); - - bool error = false; - while (_dispatcher->numPending()) { - ConnectionString host; - RawBSONSerializable response; - - Status status = _dispatcher->recvAny(&host, &response); - if (status.isOK()) { - BSONObj obj = response.toBSON(); - - LOG(3) << "Response " << obj.toString(); - - // If the ok field is anything other than 1, count it as error - if (!obj["ok"].trueValue()) { - error = true; - log() << "Config server check for host " << host - << " returned error: " << response; - } - } else if (status == ErrorCodes::IncompatibleCatalogManager) { - uassertStatusOK(status); - } else { - error = true; - log() << "Config server check for host " << host - << " failed with status: " << status; - } - } - - // All responses should have been gathered by this point - if (error) { - clientResponse->setOk(false); - clientResponse->setErrCode(ErrorCodes::RemoteValidationError); - clientResponse->setErrMessage( - "Could not verify that config servers were active" - " and reachable before write"); - return; - } - } - - if (!_checkConfigString(clientResponse)) { - return; - } - - // - // Do the actual writes - // - - BatchedCommandRequest configRequest(clientRequest.getBatchType()); - clientRequest.cloneTo(&configRequest); - configRequest.setNS(nss); - - OwnedPointerVector<ConfigResponse> responsesOwned; - vector<ConfigResponse*>& responses = responsesOwned.mutableVector(); - - // - // Send the actual config writes - // - - // Get as many batches as we can at once - for (const HostAndPort& server : _configServerConnectionString.getServers()) { - _dispatcher->addCommand(ConnectionString(server), nss.db(), configRequest.toBSON()); - } - - // Send them all out - _dispatcher->sendAll(); - - // - // Recv side - // - - while (_dispatcher->numPending() > 0) { - // Get the response - responses.push_back(new ConfigResponse()); - - ConfigResponse& configResponse = *responses.back(); - Status dispatchStatus = - _dispatcher->recvAny(&configResponse.configHost, &configResponse.response); - - if (!dispatchStatus.isOK()) { - if (dispatchStatus == ErrorCodes::IncompatibleCatalogManager) { - uassertStatusOK(dispatchStatus); - } - - buildErrorFrom(dispatchStatus, &configResponse.response); - } - } - - combineResponses(responses, clientResponse); -} - -} // namespace mongo diff --git a/src/mongo/s/catalog/legacy/config_coordinator.h b/src/mongo/s/catalog/legacy/config_coordinator.h deleted file mode 100644 index b1a3e18ca88..00000000000 --- a/src/mongo/s/catalog/legacy/config_coordinator.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2013 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/client/dbclientinterface.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" - -namespace mongo { - -class MultiCommandDispatch; - -class ConfigCoordinator { -public: - ConfigCoordinator(MultiCommandDispatch* dispatcher, - const ConnectionString& configServerConnectionString); - - void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response); - -private: - /** - * Initialize configDB string in config server or if already initialized, - * check that it matches. Returns false if an error occured. - */ - bool _checkConfigString(BatchedCommandResponse* clientResponse); - - - // Not owned here - MultiCommandDispatch* const _dispatcher; - - const ConnectionString _configServerConnectionString; -}; -} diff --git a/src/mongo/s/catalog/legacy/distlock.cpp b/src/mongo/s/catalog/legacy/distlock.cpp deleted file mode 100644 index c3cbe9e7158..00000000000 --- a/src/mongo/s/catalog/legacy/distlock.cpp +++ /dev/null @@ -1,783 +0,0 @@ -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/legacy/distlock.h" - -#include "mongo/client/dbclientcursor.h" -#include "mongo/client/connpool.h" -#include "mongo/s/catalog/type_locks.h" -#include "mongo/s/catalog/type_lockpings.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" -#include "mongo/util/timer.h" - -namespace mongo { - -MONGO_FP_DECLARE(setSCCCDistLockTimeout); - -using std::endl; -using std::list; -using std::set; -using std::string; -using std::stringstream; -using std::unique_ptr; -using std::vector; - -LabeledLevel DistributedLock::logLvl(1); -DistributedLock::LastPings DistributedLock::lastPings; - -LockException::LockException(StringData msg, int code) : LockException(msg, code, OID()) {} - -LockException::LockException(StringData msg, int code, DistLockHandle lockID) - : DBException(msg.toString(), code), _mustUnlockID(lockID) {} - -DistLockHandle LockException::getMustUnlockID() const { - return _mustUnlockID; -} - -/** - * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom - * sleep time is specified (time between pings) - */ -DistributedLock::DistributedLock(const ConnectionString& conn, - const string& name, - const std::string& processId, - unsigned long long lockTimeout) - : _conn(conn), - _name(name), - _processId(processId), - _lockId(str::stream() << _processId << ":" << getThreadName() << ":" << rand()), - _lockTimeout(lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout), - _maxClockSkew(_lockTimeout / LOCK_SKEW_FACTOR), - _maxNetSkew(_maxClockSkew), - _lockPing(_maxClockSkew) { - LOG(logLvl) << "created new distributed lock for " << name << " on " << conn - << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing - << ", processId: " << _processId << ", lockId: " << _lockId << " )"; -} - -DistLockPingInfo DistributedLock::LastPings::getLastPing(const ConnectionString& conn, - const string& lockName) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _lastPings[std::make_pair(conn.toString(), lockName)]; -} - -void DistributedLock::LastPings::setLastPing(const ConnectionString& conn, - const string& lockName, - const DistLockPingInfo& pd) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _lastPings[std::make_pair(conn.toString(), lockName)] = pd; -} - -bool DistributedLock::isRemoteTimeSkewed() const { - return !DistributedLock::checkSkew(_conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew); -} - -const ConnectionString& DistributedLock::getRemoteConnection() const { - return _conn; -} - -const string& DistributedLock::getProcessId() const { - return _processId; -} - -const string& DistributedLock::getDistLockId() const { - return _lockId; -} - -/** - * Returns the remote time as reported by the cluster or server. The maximum difference between the - * reported time and the actual time on the remote server (at the completion of the function) is the - * maxNetSkew - */ -Date_t DistributedLock::remoteTime(const ConnectionString& cluster, unsigned long long maxNetSkew) { - ConnectionString server(*cluster.getServers().begin()); - - // Get result and delay if successful, errMsg if not - bool success = false; - BSONObj result; - string errMsg; - Milliseconds delay{0}; - - unique_ptr<ScopedDbConnection> connPtr; - try { - connPtr.reset(new ScopedDbConnection(server.toString())); - ScopedDbConnection& conn = *connPtr; - - Date_t then = jsTime(); - success = conn->runCommand(string("admin"), BSON("serverStatus" << 1), result); - delay = jsTime() - then; - - if (!success) - errMsg = result.toString(); - conn.done(); - } catch (const DBException& ex) { - if (connPtr && connPtr->get()->isFailed()) { - // Return to the pool so the pool knows about the failure - connPtr->done(); - } - - success = false; - errMsg = ex.toString(); - } - - if (!success) { - throw TimeNotFoundException(str::stream() << "could not get status from server " - << server.toString() << " in cluster " - << cluster.toString() << " to check time" - << causedBy(errMsg), - 13647); - } - - // Make sure that our delay is not more than 2x our maximum network skew, since this is the max - // our remote time value can be off by if we assume a response in the middle of the delay. - if (delay > Milliseconds(maxNetSkew * 2)) { - throw TimeNotFoundException( - str::stream() << "server " << server.toString() << " in cluster " << cluster.toString() - << " did not respond within max network delay of " << maxNetSkew << "ms", - 13648); - } - - return result["localTime"].Date() - (delay / 2); -} - -bool DistributedLock::checkSkew(const ConnectionString& cluster, - unsigned skewChecks, - unsigned long long maxClockSkew, - unsigned long long maxNetSkew) { - vector<HostAndPort> servers = cluster.getServers(); - - if (servers.size() < 1) - return true; - - vector<long long> avgSkews; - - for (unsigned i = 0; i < skewChecks; i++) { - // Find the average skew for each server - unsigned s = 0; - for (vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si, s++) { - if (i == 0) - avgSkews.push_back(0); - - // Could check if this is self, but shouldn't matter since local network connection - // should be fast. - ConnectionString server(*si); - - vector<long long> skew; - - BSONObj result; - - Date_t remote = remoteTime(server, maxNetSkew); - Date_t local = jsTime(); - - // Remote time can be delayed by at most MAX_NET_SKEW - - // Skew is how much time we'd have to add to local to get to remote - avgSkews[s] += durationCount<Milliseconds>(remote - local); - - LOG(logLvl + 1) << "skew from remote server " << server - << " found: " << (remote - local); - } - } - - // Analyze skews - - long long serverMaxSkew = 0; - long long serverMinSkew = 0; - - for (unsigned s = 0; s < avgSkews.size(); s++) { - long long avgSkew = (avgSkews[s] /= skewChecks); - - // Keep track of max and min skews - if (s == 0) { - serverMaxSkew = avgSkew; - serverMinSkew = avgSkew; - } else { - if (avgSkew > serverMaxSkew) - serverMaxSkew = avgSkew; - if (avgSkew < serverMinSkew) - serverMinSkew = avgSkew; - } - } - - long long totalSkew = serverMaxSkew - serverMinSkew; - - // Make sure our max skew is not more than our pre-set limit - if (totalSkew > (long long)maxClockSkew) { - LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster - << " is out of " << maxClockSkew << "ms bounds." << endl; - return false; - } - - LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster - << " is in " << maxClockSkew << "ms bounds." << endl; - return true; -} - -Status DistributedLock::checkStatus(double timeout) { - BSONObj lockObj; - try { - ScopedDbConnection conn(_conn.toString(), timeout); - lockObj = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned(); - conn.done(); - } catch (DBException& e) { - return e.toStatus(); - } - - if (lockObj.isEmpty()) { - return Status(ErrorCodes::LockFailed, - str::stream() << "no lock for " << _name - << " exists in the locks collection"); - } - - if (lockObj[LocksType::state()].numberInt() < LocksType::LOCKED) { - return Status(ErrorCodes::LockFailed, - str::stream() << "lock " << _name << " current state is not held (" - << lockObj[LocksType::state()].numberInt() << ")"); - } - - if (lockObj[LocksType::process()].String() != _processId) { - return Status(ErrorCodes::LockFailed, - str::stream() << "lock " << _name << " is currently being held by " - << "another process (" << lockObj[LocksType::process()].String() - << ")"); - } - - return Status::OK(); -} - -static void logErrMsgOrWarn(StringData messagePrefix, - StringData lockName, - StringData errMsg, - StringData altErrMsg) { - if (errMsg.empty()) { - LOG(DistributedLock::logLvl - 1) << messagePrefix << " '" << lockName << "' " << altErrMsg - << std::endl; - } else { - warning() << messagePrefix << " '" << lockName << "' " << causedBy(errMsg.toString()); - } -} - -// Semantics of this method are basically that if the lock cannot be acquired, returns false, -// can be retried. If the lock should not be tried again (some unexpected error), -// a LockException is thrown. -bool DistributedLock::lock_try(const OID& lockID, - const string& why, - BSONObj* other, - double timeout) { - // This should always be true, if not, we are using the lock incorrectly. - verify(_name != ""); - - auto lockTimeout = _lockTimeout; - MONGO_FAIL_POINT_BLOCK(setSCCCDistLockTimeout, customTimeout) { - const BSONObj& data = customTimeout.getData(); - lockTimeout = data["timeoutMs"].numberInt(); - } - LOG(logLvl) << "trying to acquire new distributed lock for " << _name << " on " << _conn - << " ( lock timeout : " << lockTimeout << ", ping interval : " << _lockPing - << ", process : " << _processId << " )" << endl; - - // write to dummy if 'other' is null - BSONObj dummyOther; - if (other == NULL) - other = &dummyOther; - - ScopedDbConnection conn(_conn.toString(), timeout); - - BSONObjBuilder queryBuilder; - queryBuilder.append(LocksType::name(), _name); - queryBuilder.append(LocksType::state(), LocksType::UNLOCKED); - - { - // make sure its there so we can use simple update logic below - BSONObj o = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned(); - - // Case 1: No locks - if (o.isEmpty()) { - try { - LOG(logLvl) << "inserting initial doc in " << LocksType::ConfigNS << " for lock " - << _name << endl; - conn->insert(LocksType::ConfigNS, - BSON(LocksType::name(_name) << LocksType::state(LocksType::UNLOCKED) - << LocksType::who("") - << LocksType::lockID(OID()))); - } catch (UserException& e) { - warning() << "could not insert initial doc for distributed lock " << _name - << causedBy(e) << endl; - } - } - - // Case 2: A set lock that we might be able to force - else if (o[LocksType::state()].numberInt() > LocksType::UNLOCKED) { - string lockName = - o[LocksType::name()].String() + string("/") + o[LocksType::process()].String(); - - BSONObj lastPing = conn->findOne( - LockpingsType::ConfigNS, o[LocksType::process()].wrap(LockpingsType::process())); - if (lastPing.isEmpty()) { - LOG(logLvl) << "empty ping found for process in lock '" << lockName << "'" << endl; - // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, - // so will a lot. - lastPing = BSON(LockpingsType::process(o[LocksType::process()].String()) - << LockpingsType::ping(Date_t())); - } - - unsigned long long elapsed = 0; - unsigned long long takeover = lockTimeout; - - DistLockPingInfo lastPingEntry = getLastPing(); - - LOG(logLvl) << "checking last ping for lock '" << lockName << "' against process " - << lastPingEntry.processId << " and ping " << lastPingEntry.lastPing; - - try { - Date_t remote = remoteTime(_conn); - - auto pingDocProcessId = lastPing[LockpingsType::process()].String(); - auto pingDocPingValue = lastPing[LockpingsType::ping()].Date(); - - // Timeout the elapsed time using comparisons of remote clock - // For non-finalized locks, timeout 15 minutes since last seen (ts) - // For finalized locks, timeout 15 minutes since last ping - bool recPingChange = o[LocksType::state()].numberInt() == LocksType::LOCKED && - (lastPingEntry.processId != pingDocProcessId || - lastPingEntry.lastPing != pingDocPingValue); - bool recTSChange = lastPingEntry.lockSessionId != o[LocksType::lockID()].OID(); - - if (recPingChange || recTSChange) { - // If the ping has changed since we last checked, mark the current date and time - setLastPing(DistLockPingInfo(pingDocProcessId, - pingDocPingValue, - remote, - o[LocksType::lockID()].OID(), - OID())); - } else { - // GOTCHA! Due to network issues, it is possible that the current time - // is less than the remote time. We *have* to check this here, otherwise - // we overflow and our lock breaks. - if (lastPingEntry.configLocalTime >= remote) - elapsed = 0; - else - elapsed = - durationCount<Milliseconds>(remote - lastPingEntry.configLocalTime); - } - } catch (LockException& e) { - // Remote server cannot be found / is not responsive - warning() << "Could not get remote time from " << _conn << causedBy(e); - // If our config server is having issues, forget all the pings until we can see it - // again - resetLastPing(); - } - - if (elapsed <= takeover) { - LOG(1) << "could not force lock '" << lockName << "' because elapsed time " - << elapsed << " <= takeover time " << takeover; - *other = o; - other->getOwned(); - conn.done(); - return false; - } - - LOG(0) << "forcing lock '" << lockName << "' because elapsed time " << elapsed - << " > takeover time " << takeover; - - if (elapsed > takeover) { - // Lock may forced, reset our timer if succeeds or fails - // Ensures that another timeout must happen if something borks up here, and resets - // our pristine ping state if acquired. - resetLastPing(); - - try { - // Check the clock skew again. If we check this before we get a lock - // and after the lock times out, we can be pretty sure the time is - // increasing at the same rate on all servers and therefore our - // timeout is accurate - if (isRemoteTimeSkewed()) { - string msg(str::stream() << "remote time in cluster " << _conn.toString() - << " is now skewed, cannot force lock."); - throw LockException(msg, ErrorCodes::DistributedClockSkewed); - } - - // Make sure we break the lock with the correct "ts" (OID) value, otherwise - // we can overwrite a new lock inserted in the meantime. - conn->update(LocksType::ConfigNS, - BSON(LocksType::name(_name) - << LocksType::state() << o[LocksType::state()].numberInt() - << LocksType::lockID(o[LocksType::lockID()].OID())), - BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); - - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - // TODO: Clean up all the extra code to exit this method, probably with a - // refactor - if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { - logErrMsgOrWarn( - "Could not force lock", lockName, errMsg, "(another force won"); - *other = o; - other->getOwned(); - conn.done(); - return false; - } - - } catch (UpdateNotTheSame&) { - // Ok to continue since we know we forced at least one lock document, and all - // lock docs are required for a lock to be held. - warning() << "lock forcing " << lockName << " inconsistent" << endl; - } catch (const LockException&) { - // Let the exception go up and don't repackage the exception. - throw; - } catch (std::exception& e) { - conn.done(); - string msg(str::stream() << "exception forcing distributed lock " << lockName - << causedBy(e)); - throw LockException(msg, 13660); - } - - } else { - // Not strictly necessary, but helpful for small timeouts where thread - // scheduling is significant. This ensures that two attempts are still - // required for a force if not acquired, and resets our state if we - // are acquired. - resetLastPing(); - - // Test that the lock is held by trying to update the finalized state of the lock to - // the same state if it does not update or does not update on all servers, we can't - // re-enter. - try { - // Test the lock with the correct "ts" (OID) value - conn->update(LocksType::ConfigNS, - BSON(LocksType::name(_name) - << LocksType::state(LocksType::LOCKED) - << LocksType::lockID(o[LocksType::lockID()].OID())), - BSON("$set" << BSON(LocksType::state(LocksType::LOCKED)))); - - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - // TODO: Clean up all the extra code to exit this method, probably with a - // refactor - if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { - logErrMsgOrWarn( - "Could not re-enter lock", lockName, errMsg, "(not sure lock is held"); - *other = o; - other->getOwned(); - conn.done(); - return false; - } - - } catch (UpdateNotTheSame&) { - // NOT ok to continue since our lock isn't held by all servers, so isn't valid. - warning() << "inconsistent state re-entering lock, lock " << lockName - << " not held" << endl; - *other = o; - other->getOwned(); - conn.done(); - return false; - } catch (std::exception& e) { - conn.done(); - string msg(str::stream() << "exception re-entering distributed lock " - << lockName << causedBy(e)); - throw LockException(msg, 13660); - } - - LOG(logLvl - 1) << "re-entered distributed lock '" << lockName << "'" << endl; - *other = o.getOwned(); - conn.done(); - return true; - } - - LOG(logLvl - 1) << "lock '" << lockName << "' successfully forced" << endl; - - // We don't need the ts value in the query, since we will only ever replace locks with - // state=0. - } - // Case 3: We have an expired lock - else if (o[LocksType::lockID()].type()) { - queryBuilder.append(o[LocksType::lockID()]); - } - } - - // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock - // state is open and no locks need to be forced. If anything goes wrong, we don't want to - // remember an old lock. - resetLastPing(); - - bool gotLock = false; - BSONObj currLock; - - BSONObj lockDetails = - BSON(LocksType::state(LocksType::LOCK_PREP) - << LocksType::who(getDistLockId()) << LocksType::process(_processId) - << LocksType::when(jsTime()) << LocksType::why(why) << LocksType::lockID(lockID)); - BSONObj whatIWant = BSON("$set" << lockDetails); - - BSONObj query = queryBuilder.obj(); - - string lockName = _name + string("/") + _processId; - - try { - // Main codepath to acquire lock - - LOG(logLvl) << "about to acquire distributed lock '" << lockName << "'"; - - LOG(logLvl + 1) << "trying to acquire lock " << query.toString(false, true) - << " with details " << lockDetails.toString(false, true) << endl; - - conn->update(LocksType::ConfigNS, query, whatIWant); - - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); - - if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { - logErrMsgOrWarn("could not acquire lock", lockName, errMsg, "(another update won)"); - *other = currLock; - other->getOwned(); - gotLock = false; - } else { - gotLock = true; - } - - } catch (UpdateNotTheSame& up) { - // this means our update got through on some, but not others - warning() << "distributed lock '" << lockName << " did not propagate properly." - << causedBy(up) << endl; - - // Overall protection derives from: - // All unlocking updates use the ts value when setting state to 0 - // This ensures that during locking, we can override all smaller ts locks with - // our own safe ts value and not be unlocked afterward. - for (unsigned i = 0; i < up.size(); i++) { - ScopedDbConnection indDB(up[i].first); - BSONObj indUpdate; - - try { - indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); - const auto currentLockID = indUpdate[LocksType::lockID()].OID(); - // If we override this lock in any way, grab and protect it. - // We assume/ensure that if a process does not have all lock documents, it is no - // longer holding the lock. - // Note - finalized locks may compete too, but we know they've won already if - // competing in this round. Cleanup of crashes during finalizing may take a few - // tries. - if (currentLockID < lockID || - indUpdate[LocksType::state()].numberInt() == LocksType::UNLOCKED) { - BSONObj grabQuery = - BSON(LocksType::name(_name) << LocksType::lockID(currentLockID)); - - // Change ts so we won't be forced, state so we won't be relocked - BSONObj grabChanges = - BSON(LocksType::lockID(lockID) << LocksType::state(LocksType::LOCK_PREP)); - - // Either our update will succeed, and we'll grab the lock, or it will fail b/c - // some other process grabbed the lock (which will change the ts), but the lock - // will be set until forcing - indDB->update(LocksType::ConfigNS, grabQuery, BSON("$set" << grabChanges)); - - indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); - - // The tournament was interfered and it is not safe to proceed further. - // One case this could happen is when the LockPinger processes old - // entries from addUnlockOID. See SERVER-10688 for more detailed - // description of race. - if (indUpdate[LocksType::state()].numberInt() <= LocksType::UNLOCKED) { - LOG(logLvl - 1) << "lock tournament interrupted, " - << "so no lock was taken; " - << "new state of lock: " << indUpdate << endl; - - // We now break and set our currLock lockID value to zero, so that - // we know that we did not acquire the lock below. Later code will - // cleanup failed entries. - currLock = BSON(LocksType::lockID(OID())); - indDB.done(); - break; - } - } - // else our lock is the same, in which case we're safe, or it's a bigger lock, - // in which case we won't need to protect anything since we won't have the lock. - - } catch (std::exception& e) { - conn.done(); - string msg(str::stream() << "distributed lock " << lockName - << " had errors communicating with individual server " - << up[1].first << causedBy(e)); - throw LockException(msg, 13661, lockID); - } - - verify(!indUpdate.isEmpty()); - - // Find max TS value - if (currLock.isEmpty() || - currLock[LocksType::lockID()] < indUpdate[LocksType::lockID()]) { - currLock = indUpdate.getOwned(); - } - - indDB.done(); - } - - // Locks on all servers are now set and safe until forcing - - if (currLock[LocksType::lockID()].OID() == lockID) { - LOG(logLvl - 1) << "lock update won, completing lock propagation for '" << lockName - << "'" << endl; - gotLock = true; - } else { - LOG(logLvl - 1) << "lock update lost, lock '" << lockName << "' not propagated." - << endl; - gotLock = false; - } - } catch (std::exception& e) { - conn.done(); - string msg(str::stream() << "exception creating distributed lock " << lockName - << causedBy(e)); - throw LockException(msg, 13663, lockID); - } - - // Complete lock propagation - if (gotLock) { - // This is now safe, since we know that no new locks will be placed on top of the ones we've - // checked for at least 15 minutes. Sets the state = 2, so that future clients can - // determine that the lock is truly set. The invariant for rollbacks is that we will never - // force locks with state = 2 and active pings, since that indicates the lock is active, but - // this means the process creating/destroying them must explicitly poll when something goes - // wrong. - try { - BSONObjBuilder finalLockDetails; - BSONObjIterator bi(lockDetails); - while (bi.more()) { - BSONElement el = bi.next(); - if ((string)(el.fieldName()) == LocksType::state()) - finalLockDetails.append(LocksType::state(), LocksType::LOCKED); - else - finalLockDetails.append(el); - } - - conn->update(LocksType::ConfigNS, - BSON(LocksType::name(_name) << LocksType::state() - << BSON("$eq" << LocksType::LOCK_PREP)), - BSON("$set" << finalLockDetails.obj())); - - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))); - - if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 || - currLock[LocksType::lockID()].OID() != lockID) { - warning() << "could not finalize winning lock " << lockName - << (!errMsg.empty() ? causedBy(errMsg) : " (did not update lock) ") - << ", winning lock: " << currLock; - gotLock = false; - } else { - // SUCCESS! - gotLock = true; - } - - } catch (std::exception& e) { - conn.done(); - string msg(str::stream() << "exception finalizing winning lock" << causedBy(e)); - // Inform caller about the potential orphan lock. - throw LockException(msg, 13662, lockID); - } - } - - *other = currLock; - other->getOwned(); - - // Log our lock results - if (gotLock) - LOG(logLvl - 1) << "distributed lock '" << lockName << "' acquired for '" << why - << "', ts : " << currLock[LocksType::lockID()].OID(); - else - LOG(logLvl - 1) << "distributed lock '" << lockName << "' was not acquired."; - - conn.done(); - - return gotLock; -} - -// This function *must not* throw exceptions, since it can be used in destructors - failure -// results in queuing and trying again later. -bool DistributedLock::unlock(const DistLockHandle& lockID) { - verify(_name != ""); - string lockName = _name + string("/") + _processId; - - const int maxAttempts = 3; - int attempted = 0; - - while (++attempted <= maxAttempts) { - // Awkward, but necessary since the constructor itself throws exceptions - unique_ptr<ScopedDbConnection> connPtr; - - try { - connPtr.reset(new ScopedDbConnection(_conn.toString())); - ScopedDbConnection& conn = *connPtr; - - // Use ts when updating lock, so that new locks can be sure they won't get trampled. - conn->update(LocksType::ConfigNS, - BSON(LocksType::name(_name) << LocksType::lockID(lockID)), - BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); - - // Check that the lock was actually unlocked... if not, try again - BSONObj err = conn->getLastErrorDetailed(); - string errMsg = DBClientWithCommands::getLastErrorString(err); - - if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) { - warning() << "distributed lock unlock update failed, retrying " - << (errMsg.empty() ? causedBy("( update not registered )") - : causedBy(errMsg)) << endl; - conn.done(); - continue; - } - - LOG(0) << "distributed lock '" << lockName << "' unlocked. "; - conn.done(); - return true; - } catch (const UpdateNotTheSame&) { - LOG(0) << "distributed lock '" << lockName << "' unlocked (messily). "; - // This isn't a connection problem, so don't throw away the conn - connPtr->done(); - return true; - } catch (std::exception& e) { - warning() << "distributed lock '" << lockName << "' failed unlock attempt." - << causedBy(e) << endl; - - // TODO: If our lock timeout is small, sleeping this long may be unsafe. - if (attempted != maxAttempts) - sleepsecs(1 << attempted); - } - } - - return false; -} -} diff --git a/src/mongo/s/catalog/legacy/distlock.h b/src/mongo/s/catalog/legacy/distlock.h deleted file mode 100644 index 4247d057d2a..00000000000 --- a/src/mongo/s/catalog/legacy/distlock.h +++ /dev/null @@ -1,240 +0,0 @@ -// distlock.h - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/client/connpool.h" -#include "mongo/client/syncclusterconnection.h" -#include "mongo/logger/labeled_level.h" -#include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/s/catalog/dist_lock_ping_info.h" - -namespace mongo { - -namespace { - -enum TimeConstants { - LOCK_TIMEOUT = 15 * 60 * 1000, - LOCK_SKEW_FACTOR = 30, - LOCK_PING = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, - MAX_LOCK_NET_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, - MAX_LOCK_CLOCK_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR, - NUM_LOCK_SKEW_CHECKS = 3, -}; - -// The maximum clock skew we need to handle between config servers is -// 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW. - -// Net effect of *this* clock being slow is effectively a multiplier on the max net skew -// and a linear increase or decrease of the max clock skew. -} - -/** - * Exception class to encapsulate exceptions while managing distributed locks - */ -class LockException : public DBException { -public: - LockException(StringData msg, int code); - - /** - * Use this to signal that a lock with lockID needs to be unlocked. For example, in cases - * where the final lock acquisition was not propagated properly to all config servers. - */ - LockException(StringData msg, int code, DistLockHandle lockID); - - /** - * Returns the OID of the lock that needs to be unlocked. - */ - DistLockHandle getMustUnlockID() const; - - virtual ~LockException() = default; - -private: - // The identifier of a lock that needs to be unlocked. - DistLockHandle _mustUnlockID; -}; - -/** - * Indicates an error in retrieving time values from remote servers. - */ -class TimeNotFoundException : public LockException { -public: - TimeNotFoundException(const char* msg, int code) : LockException(msg, code) {} - TimeNotFoundException(const std::string& msg, int code) : LockException(msg, code) {} - virtual ~TimeNotFoundException() = default; -}; - -/** - * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task - * must be identified by a unique name across the system (e.g., "balancer"). A lock is taken - * by writing a document in the configdb's locks collection with that name. - * - * To be maintained, each taken lock needs to be revalidated ("pinged") within a - * pre-established amount of time. This class does this maintenance automatically once a - * DistributedLock object was constructed. The ping procedure records the local time to - * the ping document, but that time is untrusted and is only used as a point of reference - * of whether the ping was refreshed or not. Ultimately, the clock a configdb is the source - * of truth when determining whether a ping is still fresh or not. This is achieved by - * (1) remembering the ping document time along with config server time when unable to - * take a lock, and (2) ensuring all config servers report similar times and have similar - * time rates (the difference in times must start and stay small). - * - * Lock states include: - * 0: unlocked - * 1: about to be locked - * 2: locked - * - * Valid state transitions: - * 0 -> 1 - * 1 -> 2 - * 2 -> 0 - * - * Note that at any point in time, a lock can be force unlocked if the ping for the lock - * becomes too stale. - */ -class DistributedLock { -public: - static logger::LabeledLevel logLvl; - - class LastPings { - public: - DistLockPingInfo getLastPing(const ConnectionString& conn, const std::string& lockName); - void setLastPing(const ConnectionString& conn, - const std::string& lockName, - const DistLockPingInfo& pd); - - stdx::mutex _mutex; - std::map<std::pair<std::string, std::string>, DistLockPingInfo> _lastPings; - }; - - static LastPings lastPings; - - /** - * The constructor does not connect to the configdb yet and constructing does not mean the lock - * was acquired. Construction does trigger a lock "pinging" mechanism, though. - * - * @param conn address of config(s) server(s) - * @param name identifier for the lock - * @param processId this process's distributed lock process ID - * @param lockTimeout how long can the log go "unpinged" before a new attempt to lock steals it - * (in minutes). - * - */ - DistributedLock(const ConnectionString& conn, - const std::string& name, - const std::string& processId, - unsigned long long lockTimeout = 0); - ~DistributedLock(){}; - - /** - * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous - * holder. Please consider using the dist_lock_try construct to acquire this lock in an - * exception safe way. - * - * @param lockID the lockID to use for acquiring the lock. - * @param why human readable description of why the lock is being taken (used to log) - * @param other configdb's lock document that is currently holding the lock, if lock is taken, - * or our own lock details if not - * @return true if it managed to grab the lock - */ - bool lock_try(const OID& lockID, - const std::string& why, - BSONObj* other = 0, - double timeout = 0.0); - - /** - * Returns OK if this lock is held (but does not guarantee that this owns it) and - * it was possible to confirm that, within 'timeout' seconds, if provided, with the - * config servers. - */ - Status checkStatus(double timeout); - - /** - * Releases a previously taken lock. Returns true on success. - */ - bool unlock(const OID& lockID); - - bool isRemoteTimeSkewed() const; - - const std::string& getProcessId() const; - - const std::string& getDistLockId() const; - - const ConnectionString& getRemoteConnection() const; - - /** - * Checks the skew among a cluster of servers and returns true if the min and max clock - * times among the servers are within maxClockSkew. - */ - static bool checkSkew(const ConnectionString& cluster, - unsigned skewChecks = NUM_LOCK_SKEW_CHECKS, - unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW, - unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW); - - /** - * Get the remote time from a server or cluster - */ - static Date_t remoteTime(const ConnectionString& cluster, - unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW); - - /** - * Namespace for lock pings - */ - static const std::string lockPingNS; - - /** - * Namespace for locks - */ - static const std::string locksNS; - - const ConnectionString _conn; - const std::string _name; - const std::string _processId; - const std::string _lockId; - - // Timeout for lock, usually LOCK_TIMEOUT - const unsigned long long _lockTimeout; - const unsigned long long _maxClockSkew; - const unsigned long long _maxNetSkew; - const unsigned long long _lockPing; - -private: - void resetLastPing() { - lastPings.setLastPing(_conn, _name, DistLockPingInfo()); - } - - void setLastPing(const DistLockPingInfo& pd) { - lastPings.setLastPing(_conn, _name, pd); - } - - DistLockPingInfo getLastPing() { - return lastPings.getLastPing(_conn, _name); - } -}; -} diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp deleted file mode 100644 index 4b3eb1134fa..00000000000 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h" - -#include "mongo/s/catalog/type_locks.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/log.h" -#include "mongo/util/timer.h" - -namespace mongo { - -using std::string; -using std::unique_ptr; -using stdx::chrono::milliseconds; - - -namespace { -const stdx::chrono::seconds kDefaultSocketTimeout(30); -const milliseconds kDefaultPingInterval(30 * 1000); -} // unnamed namespace - -bool LegacyDistLockManager::_pingerEnabled = true; - -LegacyDistLockManager::LegacyDistLockManager(ConnectionString configServer, - const std::string& processId) - : _configServer(std::move(configServer)), _processId(processId), _isStopped(false) {} - -void LegacyDistLockManager::startUp() { - stdx::lock_guard<stdx::mutex> sl(_mutex); - invariant(!_pinger); - _pinger = stdx::make_unique<LegacyDistLockPinger>(); - - if (_pingerEnabled) { - uassertStatusOK(_pinger->startup(_configServer, _processId, kDefaultPingInterval)); - } -} - -void LegacyDistLockManager::shutDown(OperationContext* txn, bool allowNetworking) { - stdx::unique_lock<stdx::mutex> sl(_mutex); - _isStopped = true; - - while (!_lockMap.empty()) { - _noLocksCV.wait(sl); - } - - if (_pinger) { - _pinger->shutdown(allowNetworking); - } -} - -std::string LegacyDistLockManager::getProcessID() { - return _processId; -} - -StatusWith<DistLockManager::ScopedDistLock> LegacyDistLockManager::lock( - OperationContext* txn, - StringData name, - StringData whyMessage, - milliseconds waitFor, - milliseconds lockTryInterval) { - auto distLock = stdx::make_unique<DistributedLock>(_configServer, name.toString(), _processId); - - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - - if (_isStopped) { - return Status(ErrorCodes::LockBusy, "legacy distlock manager is stopped"); - } - } - - auto lastStatus = - Status(ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name); - - Timer timer; - Timer msgTimer; - while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) { - bool acquired = false; - BSONObj lockDoc; - OID lockID(OID::gen()); - - try { - acquired = distLock->lock_try(lockID, - whyMessage.toString(), - &lockDoc, - durationCount<Seconds>(kDefaultSocketTimeout)); - - if (!acquired) { - lastStatus = Status(ErrorCodes::LockBusy, - str::stream() << "Lock for " << whyMessage << " is taken."); - // cleanup failed attempt to acquire lock. - _pinger->addUnlockOID(lockID); - } - } catch (const LockException& lockExcep) { - OID needUnlockID(lockExcep.getMustUnlockID()); - if (needUnlockID.isSet()) { - _pinger->addUnlockOID(needUnlockID); - } - - lastStatus = lockExcep.toStatus(); - } catch (...) { - lastStatus = exceptionToStatus(); - _pinger->addUnlockOID(lockID); - } - - if (acquired) { - verify(!lockDoc.isEmpty()); - - auto locksTypeResult = LocksType::fromBSON(lockDoc); - if (!locksTypeResult.isOK()) { - return StatusWith<ScopedDistLock>( - ErrorCodes::UnsupportedFormat, - str::stream() << "error while parsing lock document: " << lockDoc << " : " - << locksTypeResult.getStatus().toString()); - } - const LocksType& lock = locksTypeResult.getValue(); - dassert(lock.isLockIDSet()); - - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _lockMap.insert(std::make_pair(lock.getLockID(), std::move(distLock))); - } - - return ScopedDistLock(txn, lock.getLockID(), this); - } - - if (waitFor == milliseconds::zero()) - break; - - if (lastStatus != ErrorCodes::LockBusy) { - return lastStatus; - } - - // Periodically message for debugging reasons - if (msgTimer.seconds() > 10) { - log() << "waited " << timer.seconds() << "s for distributed lock " << name << " for " - << whyMessage << ": " << lastStatus.toString(); - - msgTimer.reset(); - } - - milliseconds timeRemaining = - std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis())); - sleepFor(std::min(lockTryInterval, timeRemaining)); - } - - return lastStatus; -} - -void LegacyDistLockManager::unlock(OperationContext* txn, - const DistLockHandle& lockHandle) BOOST_NOEXCEPT { - unique_ptr<DistributedLock> distLock; - - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - auto iter = _lockMap.find(lockHandle); - invariant(iter != _lockMap.end()); - - distLock = std::move(iter->second); - _lockMap.erase(iter); - } - - if (!distLock->unlock(lockHandle)) { - _pinger->addUnlockOID(lockHandle); - } - - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (_lockMap.empty()) { - _noLocksCV.notify_all(); - } - } -} - -Status LegacyDistLockManager::checkStatus(OperationContext* txn, const DistLockHandle& lockHandle) { - // Note: this should not happen when locks are managed through ScopedDistLock. - if (_pinger->willUnlockOID(lockHandle)) { - return Status(ErrorCodes::LockFailed, - str::stream() << "lock " << lockHandle << " is not held and " - << "is currently being scheduled for lazy unlock"); - } - - DistributedLock* distLock = nullptr; - - { - // Assumption: lockHandles are never shared across threads. - stdx::lock_guard<stdx::mutex> sl(_mutex); - auto iter = _lockMap.find(lockHandle); - invariant(iter != _lockMap.end()); - - distLock = iter->second.get(); - } - - return distLock->checkStatus(durationCount<Seconds>(kDefaultSocketTimeout)); -} - -void LegacyDistLockManager::unlockAll(OperationContext* txn, const std::string& processID) { - fassertFailed(34367); // Only supported for CSRS -} -} diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h deleted file mode 100644 index 3f0d00db115..00000000000 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <map> -#include <memory> - -#include "mongo/bson/oid.h" -#include "mongo/client/connection_string.h" -#include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/s/catalog/legacy/distlock.h" -#include "mongo/s/catalog/legacy/legacy_dist_lock_pinger.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" - -namespace mongo { - -class DistributedLock; - -class LegacyDistLockManager : public DistLockManager { -public: - explicit LegacyDistLockManager(ConnectionString configServer, const std::string& processId); - - virtual ~LegacyDistLockManager() = default; - - virtual void startUp() override; - virtual void shutDown(OperationContext* txn, bool allowNetworking) override; - - virtual std::string getProcessID() override; - - virtual StatusWith<DistLockManager::ScopedDistLock> lock( - OperationContext* txn, - StringData name, - StringData whyMessage, - stdx::chrono::milliseconds waitFor, - stdx::chrono::milliseconds lockTryInterval) override; - - virtual void unlockAll(OperationContext* txn, const std::string& processID) override; - - // For testing only. Must be called before any calls to startUp(). - static void disablePinger() { - _pingerEnabled = false; - } - -protected: - virtual void unlock(OperationContext* txn, - const DistLockHandle& lockHandle) BOOST_NOEXCEPT override; - - virtual Status checkStatus(OperationContext* txn, const DistLockHandle& lockHandle) override; - -private: - const ConnectionString _configServer; - - // Identifier of this process for determining ownership of distributed locks. - std::string _processId; - - stdx::mutex _mutex; - stdx::condition_variable _noLocksCV; - std::map<DistLockHandle, std::unique_ptr<DistributedLock>> _lockMap; - - std::unique_ptr<LegacyDistLockPinger> _pinger; - - bool _isStopped; - static bool _pingerEnabled; -}; -} diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp deleted file mode 100644 index 951c48d67cf..00000000000 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/legacy/legacy_dist_lock_pinger.h" - -#include "mongo/client/connpool.h" -#include "mongo/s/catalog/legacy/distlock.h" -#include "mongo/s/catalog/type_lockpings.h" -#include "mongo/s/catalog/type_locks.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" - -namespace mongo { - -using std::string; - -namespace { -string pingThreadId(const ConnectionString& conn, const string& processId) { - return conn.toString() + "/" + processId; -} -} - -void LegacyDistLockPinger::_distLockPingThread(ConnectionString addr, - const string& process, - Milliseconds sleepTime) { - setThreadName("LockPinger"); - - string pingId = pingThreadId(addr, process); - - LOG(0) << "creating distributed lock ping thread for " << addr << " and process " << process - << " (sleeping for " << sleepTime << ")"; - - static int loops = 0; - Date_t lastPingTime = jsTime(); - while (!shouldStopPinging(addr, process)) { - LOG(3) << "distributed lock pinger '" << pingId << "' about to ping."; - - Date_t pingTime; - - try { - ScopedDbConnection conn(addr.toString(), 30.0); - - pingTime = jsTime(); - const auto elapsed = pingTime - lastPingTime; - if (elapsed > 10 * sleepTime) { - warning() << "Lock pinger for addr: " << addr << ", proc: " << process - << " was inactive for " << elapsed; - } - - lastPingTime = pingTime; - - // Refresh the entry corresponding to this process in the lockpings collection. - conn->update(LockpingsType::ConfigNS, - BSON(LockpingsType::process(process)), - BSON("$set" << BSON(LockpingsType::ping(pingTime))), - true); - - string err = conn->getLastError(); - if (!err.empty()) { - warning() << "pinging failed for distributed lock pinger '" << pingId << "'." - << causedBy(err); - conn.done(); - - if (!shouldStopPinging(addr, process)) { - waitTillNextPingTime(sleepTime); - } - continue; - } - - // Remove really old entries from the lockpings collection if they're not - // holding a lock. This may happen if an instance of a process was taken down - // and no new instance came up to replace it for a quite a while. - // NOTE this is NOT the same as the standard take-over mechanism, which forces - // the lock entry. - BSONObj fieldsToReturn = BSON(LocksType::state() << 1 << LocksType::process() << 1); - auto activeLocks = conn->query(LocksType::ConfigNS, - BSON(LocksType::state() << NE << LocksType::UNLOCKED)); - - uassert(16060, - str::stream() << "cannot query locks collection on config server " - << conn.getHost(), - activeLocks.get()); - - std::set<string> pids; - while (activeLocks->more()) { - BSONObj lock = activeLocks->nextSafe(); - - if (!lock[LocksType::process()].eoo()) { - pids.insert(lock[LocksType::process()].str()); - } else { - warning() << "found incorrect lock document during lock ping cleanup: " - << lock.toString(); - } - } - - // This can potentially delete ping entries that are actually active (if the clock - // of another pinger is too skewed). This is still fine as the lock logic only - // checks if there is a change in the ping document and the document going away - // is a valid change. - Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24}; - conn->remove(LockpingsType::ConfigNS, - BSON(LockpingsType::process() << NIN << pids << LockpingsType::ping() << LT - << fourDays)); - err = conn->getLastError(); - - if (!err.empty()) { - warning() << "ping cleanup for distributed lock pinger '" << pingId << " failed." - << causedBy(err); - conn.done(); - - if (!shouldStopPinging(addr, process)) { - waitTillNextPingTime(sleepTime); - } - continue; - } - - LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr << " pinged successfully at " - << pingTime << " by distributed lock pinger '" - << pingId << "', sleeping for " << sleepTime; - - // Remove old locks, if possible - // Make sure no one else is adding to this list at the same time - stdx::lock_guard<stdx::mutex> lk(_mutex); - - int numOldLocks = _unlockList.size(); - if (numOldLocks > 0) { - LOG(0) << "trying to delete " << _unlockList.size() - << " old lock entries for process " << process; - } - - bool removed = false; - for (auto iter = _unlockList.begin(); iter != _unlockList.end(); - iter = (removed ? _unlockList.erase(iter) : ++iter)) { - removed = false; - try { - // Got DistLockHandle from lock, so we don't need to specify _id again - conn->update(LocksType::ConfigNS, - BSON(LocksType::lockID(*iter)), - BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); - - // Either the update went through or it didn't, - // either way we're done trying to unlock. - LOG(0) << "handled late remove of old distributed lock with ts " << *iter; - removed = true; - } catch (UpdateNotTheSame&) { - LOG(0) << "partially removed old distributed lock with ts " << *iter; - removed = true; - } catch (std::exception& e) { - warning() << "could not remove old distributed lock with ts " << *iter - << causedBy(e); - } - } - - if (numOldLocks > 0 && _unlockList.size() > 0) { - LOG(0) << "not all old lock entries could be removed for process " << process; - } - - conn.done(); - - } catch (std::exception& e) { - warning() << "distributed lock pinger '" << pingId - << "' detected an exception while pinging." << causedBy(e); - } - - if (!shouldStopPinging(addr, process)) { - waitTillNextPingTime(sleepTime); - } - } - - warning() << "removing distributed lock ping thread '" << pingId << "'"; - - if (shouldStopPinging(addr, process)) { - acknowledgeStopPing(addr, process); - } -} - -void LegacyDistLockPinger::distLockPingThread(ConnectionString addr, - long long clockSkew, - const std::string& processId, - stdx::chrono::milliseconds sleepTime) { - try { - jsTimeVirtualThreadSkew(clockSkew); - _distLockPingThread(addr, processId, sleepTime); - } catch (std::exception& e) { - error() << "unexpected error while running distributed lock pinger for " << addr - << ", process " << processId << causedBy(e); - } catch (...) { - error() << "unknown error while running distributed lock pinger for " << addr - << ", process " << processId; - } -} - -Status LegacyDistLockPinger::startup(const ConnectionString& configServerConnectionString, - const std::string& processID, - Milliseconds sleepTime) { - string pingID = pingThreadId(configServerConnectionString, processID); - - { - // Make sure we don't start multiple threads for a process id. - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_inShutdown) { - return Status(ErrorCodes::ShutdownInProgress, "shutting down, will not start ping"); - } - - // Ignore if we already have a pinging thread for this process. - if (_seen.count(pingID) > 0) { - return Status::OK(); - } - - stdx::thread thread(stdx::bind(&LegacyDistLockPinger::distLockPingThread, - this, - configServerConnectionString, - getJSTimeVirtualThreadSkew(), - processID, - sleepTime)); - _pingThreads.insert(std::make_pair(pingID, std::move(thread))); - - _seen.insert(pingID); - } - - return Status::OK(); -} - -void LegacyDistLockPinger::addUnlockOID(const DistLockHandle& lockID) { - // Modifying the lock from some other thread - stdx::lock_guard<stdx::mutex> lk(_mutex); - _unlockList.push_back(lockID); -} - -bool LegacyDistLockPinger::willUnlockOID(const DistLockHandle& lockID) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return find(_unlockList.begin(), _unlockList.end(), lockID) != _unlockList.end(); -} - -void LegacyDistLockPinger::stopPing(const ConnectionString& conn, const string& processId) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - string pingId = pingThreadId(conn, processId); - - verify(_seen.count(pingId) > 0); - _kill.insert(pingId); - _pingStoppedCV.notify_all(); - } -} - -void LegacyDistLockPinger::shutdown(bool allowNetworking) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = true; - _allowNetworkingInShutdown = allowNetworking; - _pingStoppedCV.notify_all(); - } - - // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read - // _pingThreads since it cannot be modified once _shutdown is true. - for (auto& idToThread : _pingThreads) { - if (idToThread.second.joinable()) { - idToThread.second.join(); - } - } -} - -bool LegacyDistLockPinger::shouldStopPinging(const ConnectionString& conn, - const string& processId) { - if (inShutdown()) { - return true; - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_inShutdown) { - return true; - } - - return _kill.count(pingThreadId(conn, processId)) > 0; -} - -void LegacyDistLockPinger::acknowledgeStopPing(const ConnectionString& addr, - const string& processId) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - string pingId = pingThreadId(addr, processId); - - _kill.erase(pingId); - _seen.erase(pingId); - - if (!_allowNetworkingInShutdown) { - return; - } - } - - try { - ScopedDbConnection conn(addr.toString(), 30.0); - conn->remove(LockpingsType::ConfigNS, BSON(LockpingsType::process(processId))); - } catch (const DBException& ex) { - warning() << "Error encountered while stopping ping on " << processId << causedBy(ex); - } -} - -void LegacyDistLockPinger::waitTillNextPingTime(stdx::chrono::milliseconds duration) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _pingStoppedCV.wait_for(lk, duration); -} -} diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h deleted file mode 100644 index 1155ca544ca..00000000000 --- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <list> -#include <set> -#include <string> - -#include "mongo/base/status.h" -#include "mongo/client/dbclientinterface.h" -#include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/mutex.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class DistributedLock; - -class LegacyDistLockPinger { -public: - LegacyDistLockPinger() = default; - - /** - * Starts the pinger thread for a given processID. - * Note: this pinger does not support being started up after it was stopped, either by a call - * to stopPing or shutdown. - */ - Status startup(const ConnectionString& configServerConnectionString, - const std::string& processID, - Milliseconds sleepTime); - - /** - * Adds a distributed lock that has the given id to the unlock list. The unlock list - * contains the list of locks that this pinger will repeatedly attempt to unlock until - * it succeeds. - */ - void addUnlockOID(const DistLockHandle& lockID); - - /** - * Returns true if the given lock id is currently in the unlock queue. - */ - bool willUnlockOID(const DistLockHandle& lockID); - - /** - * For testing only: non-blocking call to stop pinging the given process id. - */ - void stopPing(const ConnectionString& conn, const std::string& processId); - - /** - * Kills all ping threads and wait for them to cleanup. - */ - void shutdown(bool allowNetworking); - -private: - /** - * Helper method for calling _distLockPingThread. - */ - void distLockPingThread(ConnectionString addr, - long long clockSkew, - const std::string& processId, - Milliseconds sleepTime); - - /** - * Function for repeatedly pinging the process id. Also attempts to unlock all the - * locks in the unlock list. - */ - void _distLockPingThread(ConnectionString addr, - const std::string& process, - Milliseconds sleepTime); - - /** - * Returns true if a request has been made to stop pinging the give process id. - */ - bool shouldStopPinging(const ConnectionString& conn, const std::string& processId); - - /** - * Acknowledge the stop ping request and performs the necessary cleanup. - */ - void acknowledgeStopPing(const ConnectionString& conn, const std::string& processId); - - /** - * Blocks until duration has elapsed or if the ping thread is interrupted. - */ - void waitTillNextPingTime(Milliseconds duration); - - // - // All member variables are labeled with one of the following codes indicating the - // synchronization rules for accessing them. - // - // (M) Must hold _mutex for access. - - stdx::mutex _mutex; - - // Triggered everytime a pinger thread is stopped. - stdx::condition_variable _pingStoppedCV; // (M) - - // pingID -> thread - // This can contain multiple elements in tests, but in tne normal case, this will - // contain only a single element. - // Note: can be safely read when _inShutdown is true. - std::map<std::string, stdx::thread> _pingThreads; // (M*) - - // Contains the list of process id to stopPing. - std::set<std::string> _kill; // (M) - - // Contains all of the process id to ping. - std::set<std::string> _seen; // (M) - - // Contains all lock ids to keeping on retrying to unlock until success. - std::list<DistLockHandle> _unlockList; // (M) - - bool _inShutdown = false; // (M) - bool _allowNetworkingInShutdown = true; // (M) -}; -} diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 52feade2277..53a30764490 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -48,10 +48,6 @@ public: explicit CatalogManagerReplicaSet(std::unique_ptr<DistLockManager> distLockManager); virtual ~CatalogManagerReplicaSet(); - ConfigServerMode getMode() override { - return ConfigServerMode::CSRS; - } - /** * Safe to call multiple times as long as they */ diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp index 05bf53722a6..7f325611db9 100644 --- a/src/mongo/s/client/sharding_connection_hook.cpp +++ b/src/mongo/s/client/sharding_connection_hook.cpp @@ -136,14 +136,13 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { return _shardingRequestMetadataWriter(_shardedConnections, metadataBob, hostStringData); }); - // For every SCC created, add a hook that will allow fastest-config-first config reads if - // the appropriate server options are set. if (conn->type() == ConnectionString::SYNC) { - SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn); - if (scc) { - scc->attachQueryHandler(new SCCFastQueryHandler); - } - } else if (conn->type() == ConnectionString::MASTER) { + throw UserException(ErrorCodes::UnsupportedFormat, + str::stream() << "Unrecognized connection string type: " << conn->type() + << "."); + } + + if (conn->type() == ConnectionString::MASTER) { BSONObj isMasterResponse; if (!conn->runCommand("admin", BSON("ismaster" << 1), isMasterResponse)) { uassertStatusOK(getStatusFromCommandResult(isMasterResponse)); @@ -163,12 +162,6 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { << ". Expected either 0 or 1", configServerModeNumber == 0 || configServerModeNumber == 1); - BSONElement setName = isMasterResponse["setName"]; - status = grid.forwardingCatalogManager()->scheduleReplaceCatalogManagerIfNeeded( - configServerModeNumber == 0 ? CatalogManager::ConfigServerMode::SCCC - : CatalogManager::ConfigServerMode::CSRS, - setName.type() == String ? setName.valueStringData() : StringData(), - static_cast<DBClientConnection*>(conn)->getServerHostAndPort()); uassertStatusOK(status); } } diff --git a/src/mongo/s/client/sharding_network_connection_hook.cpp b/src/mongo/s/client/sharding_network_connection_hook.cpp index 6652f6a19e1..0b03b2cdd82 100644 --- a/src/mongo/s/client/sharding_network_connection_hook.cpp +++ b/src/mongo/s/client/sharding_network_connection_hook.cpp @@ -58,6 +58,7 @@ Status ShardingNetworkConnectionHook::validateHostImpl( long long configServerModeNumber; auto status = bsonExtractIntegerField(isMasterReply.data, "configsvr", &configServerModeNumber); + // TODO SERVER-22320 fix should collapse the switch to only NoSuchKey handling switch (status.code()) { case ErrorCodes::OK: { @@ -67,12 +68,7 @@ Status ShardingNetworkConnectionHook::validateHostImpl( str::stream() << "Surprised to discover that " << remoteHost.toString() << " believes it is a config server"}; } - using ConfigServerMode = CatalogManager::ConfigServerMode; - const BSONElement setName = isMasterReply.data["setName"]; - return grid.forwardingCatalogManager()->scheduleReplaceCatalogManagerIfNeeded( - (configServerModeNumber == 0 ? ConfigServerMode::SCCC : ConfigServerMode::CSRS), - (setName.type() == String ? setName.valueStringData() : StringData()), - remoteHost); + return Status::OK(); } case ErrorCodes::NoSuchKey: { // The ismaster response indicates that remoteHost is not a config server, or that diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp index 14a697808be..50f70f01352 100644 --- a/src/mongo/s/commands/cluster_is_master_cmd.cpp +++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp @@ -30,7 +30,6 @@ #include "mongo/db/commands.h" #include "mongo/db/wire_version.h" -#include "mongo/s/catalog/forwarding_catalog_manager.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batched_command_request.h" diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index 116bfba0139..0efd5353366 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -462,7 +462,7 @@ public: map<BSONObj, int> chunkSizes; { // Take distributed lock to prevent split / migration. - auto scopedDistLock = grid.forwardingCatalogManager()->distLock( + auto scopedDistLock = grid.catalogManager(txn)->distLock( txn, finalColLong, "mr-post-process", kNoDistLockTimeout); if (!scopedDistLock.isOK()) { diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index 2f0735c4d1f..9cfc87c7c58 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -147,8 +147,8 @@ public: << " to: " << toShard->toString(); string whyMessage(str::stream() << "Moving primary shard of " << dbname); - auto scopedDistLock = - grid.forwardingCatalogManager()->distLock(txn, dbname + "-movePrimary", whyMessage); + auto scopedDistLock = grid.catalogManager(txn)->distLock( + txn, dbname + "-movePrimary", whyMessage, DistLockManager::kSingleLockAttemptTimeout); if (!scopedDistLock.isOK()) { return appendCommandStatus(result, scopedDistLock.getStatus()); @@ -161,8 +161,6 @@ public: BSONObj moveStartDetails = _buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls); - uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange()); - auto catalogManager = grid.catalogManager(txn); catalogManager->logChange(txn, "movePrimary.start", dbname, moveStartDetails); @@ -202,8 +200,6 @@ public: return false; } - uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange()); - const string oldPrimary = fromShard->getConnString().toString(); ScopedDbConnection fromconn(fromShard->getConnString()); @@ -241,7 +237,6 @@ public: try { log() << "movePrimary dropping cloned collection " << el.String() << " on " << oldPrimary; - uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange()); fromconn->dropCollection(el.String()); } catch (DBException& e) { e.addContext(str::stream() diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp index 07aabc79a46..6d3e4235cf2 100644 --- a/src/mongo/s/d_merge.cpp +++ b/src/mongo/s/d_merge.cpp @@ -70,7 +70,8 @@ bool mergeChunks(OperationContext* txn, // Get the distributed lock string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to " << maxKey; - auto scopedDistLock = grid.forwardingCatalogManager()->distLock(txn, nss.ns(), whyMessage); + auto scopedDistLock = grid.catalogManager(txn)->distLock( + txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); if (!scopedDistLock.isOK()) { *errMsg = stream() << "could not acquire collection lock for " << nss.ns() diff --git a/src/mongo/s/d_sharding_server_status.cpp b/src/mongo/s/d_sharding_server_status.cpp index e8e9a2a60f3..dc78d2defa4 100644 --- a/src/mongo/s/d_sharding_server_status.cpp +++ b/src/mongo/s/d_sharding_server_status.cpp @@ -67,10 +67,7 @@ BSONObj ShardingServerStatus::generateSection(OperationContext* txn, BSONObjBuilder result; result.append("configsvrConnectionString", shardingState->getConfigServer(txn).toString()); - auto catalogManager = grid.catalogManager(txn); - if (catalogManager->getMode() == CatalogManager::ConfigServerMode::CSRS) { - grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime"); - } + grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime"); return result.obj(); } diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index e2d06e61853..5fd9af4ce63 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -652,7 +652,8 @@ public: const string whyMessage(str::stream() << "splitting chunk [" << min << ", " << max << ") in " << nss.toString()); - auto scopedDistLock = grid.forwardingCatalogManager()->distLock(txn, nss.ns(), whyMessage); + auto scopedDistLock = grid.catalogManager(txn)->distLock( + txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); if (!scopedDistLock.isOK()) { errmsg = str::stream() << "could not acquire collection lock for " << nss.toString() << " to split chunk [" << min << "," << max << ")" diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 1407c0edde9..94108ecdb37 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -34,7 +34,7 @@ #include "mongo/base/status_with.h" #include "mongo/s/catalog/catalog_cache.h" -#include "mongo/s/catalog/forwarding_catalog_manager.h" +#include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_settings.h" #include "mongo/s/client/shard_registry.h" #include "mongo/util/log.h" @@ -46,7 +46,7 @@ Grid grid; Grid::Grid() : _allowLocalShard(true) {} -void Grid::init(std::unique_ptr<ForwardingCatalogManager> catalogManager, +void Grid::init(std::unique_ptr<CatalogManager> catalogManager, std::unique_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager) { invariant(!_catalogManager); @@ -136,12 +136,4 @@ void Grid::clearForUnitTests() { _cursorManager.reset(); } -ForwardingCatalogManager* Grid::forwardingCatalogManager() { - return _catalogManager.get(); -} - -CatalogManager* Grid::catalogManager(OperationContext* txn) { - return _catalogManager->getCatalogManagerToUse(txn); -} - } // namespace mongo diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 6e09dbdc0e1..63e55687392 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -31,7 +31,7 @@ #include <string> #include <vector> -#include "mongo/s/catalog/forwarding_catalog_manager.h" +#include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/stdx/memory.h" @@ -62,7 +62,7 @@ public: * NOTE: Unit-tests are allowed to call it more than once, provided they reset the object's * state using clearForUnitTests. */ - void init(std::unique_ptr<ForwardingCatalogManager> catalogManager, + void init(std::unique_ptr<CatalogManager> catalogManager, std::unique_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager); @@ -97,19 +97,14 @@ public: * Returns a pointer to a CatalogManager to use for accessing catalog data stored on the config * servers. */ - CatalogManager* catalogManager(OperationContext* txn); - - /** - * Returns a direct pointer to the ForwardingCatalogManager. This should only be used for - * calling methods that are specific to the ForwardingCatalogManager and not part of the generic - * CatalogManager interface, such as for taking the distributed lock and scheduling replacement - * of the underlying CatalogManager that the ForwardingCatalogManager is delegating to. - */ - ForwardingCatalogManager* forwardingCatalogManager(); + CatalogManager* catalogManager(OperationContext* txn) { + return _catalogManager.get(); + } CatalogCache* catalogCache() { return _catalogCache.get(); } + ShardRegistry* shardRegistry() { return _shardRegistry.get(); } @@ -128,7 +123,7 @@ public: void clearForUnitTests(); private: - std::unique_ptr<ForwardingCatalogManager> _catalogManager; + std::unique_ptr<CatalogManager> _catalogManager; std::unique_ptr<CatalogCache> _catalogCache; std::unique_ptr<ShardRegistry> _shardRegistry; std::unique_ptr<ClusterCursorManager> _cursorManager; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index a3b2c365de4..948e12b2af6 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -132,67 +132,6 @@ std::unique_ptr<LiteParsedQuery> transformQueryForShards(const LiteParsedQuery& lpq.isAllowPartialResults()); } -/** - * Runs a find command against the "config" shard in SyncClusterConnection (SCCC) mode. Special - * handling is required for SCCC since the config shard's NS targeter is only available if the - * config servers are in CSRS mode. - * - * 'query' is the query to run against the config shard. 'shard' must represent the config shard. - * - * On success, fills out 'results' with the documents returned from the config shard and returns the - * cursor id which should be handed back to the client. - * - * TODO: This should not be required for 3.4, since the config server mode must be config server - * replica set (CSRS) in order to upgrade. - */ -StatusWith<CursorId> runConfigServerQuerySCCC(const CanonicalQuery& query, - const Shard& shard, - std::vector<BSONObj>* results) { - BSONObj findCommand = query.getParsed().asFindCommand(); - - // XXX: This is a temporary hack. We use ScopedDbConnection and query the $cmd namespace - // explicitly because this gives us the particular host that the command ran on via - // originalHost(). We need to know the host that the remote cursor was established on in order - // to issue getMore or killCursors operations against this remote cursor. - ScopedDbConnection conn(shard.getConnString()); - auto cursor = conn->query(str::stream() << query.nss().db() << ".$cmd", - findCommand, - -1, // nToReturn - 0, // nToSkip - nullptr, // fieldsToReturn - 0); // options - if (!cursor || !cursor->more()) { - return {ErrorCodes::OperationFailed, "failed to run find command against config shard"}; - } - BSONObj result = cursor->nextSafe().getOwned(); - conn.done(); - - auto status = Command::getStatusFromCommandResult(result); - if (ErrorCodes::SendStaleConfig == status || ErrorCodes::RecvStaleConfig == status) { - throw RecvStaleConfigException("find command failed because of stale config", result); - } - - auto executorPool = grid.shardRegistry()->getExecutorPool(); - auto transformedResult = storePossibleCursor(HostAndPort(cursor->originalHost()), - result, - executorPool->getArbitraryExecutor(), - grid.getCursorManager()); - if (!transformedResult.isOK()) { - return transformedResult.getStatus(); - } - - auto outgoingCursorResponse = CursorResponse::parseFromBSON(transformedResult.getValue()); - if (!outgoingCursorResponse.isOK()) { - return outgoingCursorResponse.getStatus(); - } - - for (const auto& doc : outgoingCursorResponse.getValue().getBatch()) { - results->push_back(doc.getOwned()); - } - - return outgoingCursorResponse.getValue().getCursorId(); -} - StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, @@ -251,21 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, // Use read pref to target a particular host from each shard. Also construct the find command // that we will forward to each shard. for (const auto& shard : shards) { - // The unified targeting logic only works for config server replica sets, so we need special - // handling for querying config server content with legacy 3-host config servers. - if (shard->isConfig() && shard->getConnString().type() == ConnectionString::SYNC) { - invariant(shards.size() == 1U); - try { - return runConfigServerQuerySCCC(query, *shard, results); - } catch (const DBException& e) { - if (e.getCode() != ErrorCodes::IncompatibleCatalogManager) { - throw; - } - grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn); - // Fall through to normal code path now that the catalog manager mode has been - // swapped and the config servers are a normal replica set. - } - } + invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::SYNC); // Build the find command, and attach shard version if necessary. BSONObjBuilder cmdBuilder; diff --git a/src/mongo/s/s_sharding_server_status.cpp b/src/mongo/s/s_sharding_server_status.cpp index 809521c6fa2..0ef408cef96 100644 --- a/src/mongo/s/s_sharding_server_status.cpp +++ b/src/mongo/s/s_sharding_server_status.cpp @@ -65,10 +65,7 @@ BSONObj ShardingServerStatus::generateSection(OperationContext* txn, result.append("configsvrConnectionString", grid.shardRegistry()->getConfigServerConnectionString().toString()); - auto catalogManager = grid.catalogManager(txn); - if (catalogManager->getMode() == CatalogManager::ConfigServerMode::CSRS) { - grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime"); - } + grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime"); return result.obj(); } diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 6cb3aff4857..8803ae0e235 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -59,7 +59,7 @@ #include "mongo/db/wire_version.h" #include "mongo/platform/process_id.h" #include "mongo/s/balance.h" -#include "mongo/s/catalog/forwarding_catalog_manager.h" +#include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_connection_hook.h" diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 7d5da250958..0ad8aa554f5 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -48,11 +48,13 @@ #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/rpc/metadata/config_server_metadata.h" -#include "mongo/s/catalog/forwarding_catalog_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" +#include "mongo/s/catalog/replset/catalog_manager_replica_set.h" +#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" +#include "mongo/s/catalog/replset/replset_dist_lock_manager.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -68,6 +70,28 @@ using executor::NetworkInterfaceThreadPool; using executor::TaskExecutorPool; using executor::ThreadPoolTaskExecutor; + +std::unique_ptr<CatalogManager> makeCatalogManager(ServiceContext* service, + ShardRegistry* shardRegistry, + const HostAndPort& thisHost) { + std::unique_ptr<SecureRandom> rng(SecureRandom::create()); + std::string distLockProcessId = str::stream() + << thisHost.toString() << ':' + << durationCount<Seconds>(service->getClockSource()->now().toDurationSinceEpoch()) << ':' + << static_cast<int32_t>(rng->nextInt64()); + + auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry); + auto distLockManager = + stdx::make_unique<ReplSetDistLockManager>(service, + distLockProcessId, + std::move(distLockCatalog), + ReplSetDistLockManager::kDistLockPingInterval, + ReplSetDistLockManager::kDistLockExpirationTime); + + return stdx::make_unique<CatalogManagerReplicaSet>(std::move(distLockManager)); +} + + // Same logic as sharding_connection_hook.cpp. class ShardingEgressMetadataHook final : public rpc::EgressMetadataHook { public: @@ -153,6 +177,11 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn Status initializeGlobalShardingState(OperationContext* txn, const ConnectionString& configCS, bool allowNetworking) { + if (configCS.type() == ConnectionString::SYNC) { + return {ErrorCodes::UnsupportedFormat, + "SYNC config server connection string is not allowed."}; + } + SyncClusterConnection::setConnectionValidationHook( [](const HostAndPort& target, const executor::RemoteCommandResponse& isMasterReply) { return ShardingNetworkConnectionHook::validateHostImpl(target, isMasterReply); @@ -170,12 +199,9 @@ Status initializeGlobalShardingState(OperationContext* txn, "NetworkInterfaceASIO-ShardRegistry-TaskExecutor")), configCS)); - std::unique_ptr<ForwardingCatalogManager> catalogManager = - stdx::make_unique<ForwardingCatalogManager>( - getGlobalServiceContext(), - configCS, - shardRegistry.get(), - HostAndPort(getHostName(), serverGlobalParams.port)); + auto catalogManager = makeCatalogManager(getGlobalServiceContext(), + shardRegistry.get(), + HostAndPort(getHostName(), serverGlobalParams.port)); shardRegistry->startup(); grid.init(std::move(catalogManager), @@ -193,12 +219,6 @@ Status initializeGlobalShardingState(OperationContext* txn, return Status::OK(); } catch (const DBException& ex) { Status status = ex.toStatus(); - if (status == ErrorCodes::ConfigServersInconsistent) { - // Legacy catalog manager can return ConfigServersInconsistent. When that happens - // we should immediately fail initialization. For all other failures we should - // retry. - return status; - } if (status == ErrorCodes::ReplicaSetNotFound) { // ReplicaSetNotFound most likely means we've been waiting for the config replica // set to come up for so long that the ReplicaSetMonitor stopped monitoring the set. diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index e51df67a8a5..c5bc4158f7d 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -131,9 +131,7 @@ void ShardingTestFixture::setUp() { // For now initialize the global grid object. All sharding objects will be accessible // from there until we get rid of it. - auto shardRegistryPtr = shardRegistry.get(); - grid.init(stdx::make_unique<ForwardingCatalogManager>( - _service.get(), std::move(cm), shardRegistryPtr, HostAndPort("somehost")), + grid.init(std::move(cm), std::move(shardRegistry), stdx::make_unique<ClusterCursorManager>(_service->getClockSource())); } diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 7f1a9f48c9d..5914fc7b460 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -200,7 +200,6 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { return; int loops = 5; - bool cmChangeAttempted = false; while (true) { try { @@ -254,20 +253,13 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) { if (loops < 4) versionManager.forceRemoteCheckShardVersionCB(txn, staleNS); } catch (const DBException& e) { - if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) { - fassert(28791, !cmChangeAttempted); - cmChangeAttempted = true; - - grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn); - } else { - OpQueryReplyBuilder reply; - { - BSONObjBuilder builder(reply.bufBuilderForResults()); - Command::appendCommandStatus(builder, e.toStatus()); - } - reply.sendCommandReply(request.p(), request.m()); - return; + OpQueryReplyBuilder reply; + { + BSONObjBuilder builder(reply.bufBuilderForResults()); + Command::appendCommandStatus(builder, e.toStatus()); } + reply.sendCommandReply(request.p(), request.m()); + return; } } } |