diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-01-19 18:14:53 -0500 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-01-21 16:22:38 -0500 |
commit | 38740c56f64ba791a681885737f975eb79640a0a (patch) | |
tree | 8c59283c4ec9e473513a58c33c83ba6788abbfe2 | |
parent | f4a4d6944b47185230fe06dfcf06bc7cf6c5ac66 (diff) | |
download | mongo-38740c56f64ba791a681885737f975eb79640a0a.tar.gz |
SERVER-20036 Interrupt mapReduce and movePrimary commands on catalog manager change
-rw-r--r-- | src/mongo/db/cloner.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/cloner.h | 11 | ||||
-rw-r--r-- | src/mongo/db/commands/clone.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/balance.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/forwarding_catalog_manager.cpp | 24 | ||||
-rw-r--r-- | src/mongo/s/catalog/forwarding_catalog_manager.h | 11 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_move_primary_cmd.cpp | 31 |
9 files changed, 152 insertions, 29 deletions
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index feb93876365..8fda1b0b17f 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -60,6 +60,7 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage/storage_options.h" +#include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -111,6 +112,31 @@ BSONObj fixindex(const string& newDbName, BSONObj o) { return res; } +namespace { +Status _checkForCatalogManagerChangeIfNeeded(const CloneOptions& opts) { + if (!opts.checkForCatalogChange) { + return Status::OK(); + } + auto catalogManager = grid.forwardingCatalogManager(); + invariant(catalogManager); + + Status status = catalogManager->checkForPendingCatalogChange(); + if (!status.isOK()) { + return status; + } + + auto currentConfigServerMode = catalogManager->getMode(); + if (currentConfigServerMode != opts.initialCatalogMode) { + invariant(opts.initialCatalogMode == CatalogManager::ConfigServerMode::SCCC && + currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS); + return Status(ErrorCodes::IncompatibleCatalogManager, + "CatalogManager was swapped from SCCC to CSRS mode during movePrimary." + "Aborting movePrimary to unblock mongos."); + } + return Status::OK(); +} +} // namespace + Cloner::Cloner() {} struct Cloner::Fun { @@ -118,6 +144,7 @@ struct Cloner::Fun { void operator()(DBClientCursorBatchIterator& i) { invariant(from_collection.coll() != "system.indexes"); + uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(_opts)); // XXX: can probably take dblock instead unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_X)); @@ -241,6 +268,7 @@ struct Cloner::Fun { BSONObj from_options; NamespaceString to_collection; time_t saveLast; + CloneOptions _opts; }; /* copy the specified collection @@ -251,7 +279,7 @@ void Cloner::copy(OperationContext* txn, const BSONObj& from_opts, const NamespaceString& to_collection, bool masterSameProcess, - bool slaveOk, + const CloneOptions& opts, Query query) { LOG(2) << "\t\tcloning collection " << from_collection << " to " << to_collection << " on " << _conn->getServerAddress() << " with filter " << query.toString() << endl; @@ -262,8 +290,9 @@ void Cloner::copy(OperationContext* txn, f.from_options = from_opts; f.to_collection = to_collection; f.saveLast = time(0); + f._opts = opts; - int options = QueryOption_NoCursorTimeout | (slaveOk ? QueryOption_SlaveOk : 0); + int options = QueryOption_NoCursorTimeout | (opts.slaveOk ? QueryOption_SlaveOk : 0); { Lock::TempRelease tempRelease(txn->lockState()); _conn->query(stdx::function<void(DBClientCursorBatchIterator&)>(f), @@ -407,7 +436,9 @@ bool Cloner::copyCollection(OperationContext* txn, } // main data - copy(txn, dbname, nss, options, nss, false, true, Query(query).snapshot()); + CloneOptions opts; + opts.slaveOk = true; + copy(txn, dbname, nss, options, nss, false, opts, Query(query).snapshot()); /* TODO : copyIndexes bool does not seem to be implemented! */ if (!shouldCopyIndexes) { @@ -483,6 +514,8 @@ Status Cloner::copyDb(OperationContext* txn, clonedColls->clear(); } + uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts)); + { // getCollectionInfos may make a remote call, which may block indefinitely, so release // the global lock that we are entering with. @@ -580,7 +613,8 @@ Status Cloner::copyDb(OperationContext* txn, if (opts.snapshot) q.snapshot(); - copy(txn, toDBName, from_name, options, to_name, masterSameProcess, opts.slaveOk, q); + uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts)); + copy(txn, toDBName, from_name, options, to_name, masterSameProcess, opts, q); // Copy releases the lock, so we need to re-load the database. This should // probably throw if the database has changed in between, but for now preserve @@ -637,6 +671,8 @@ Status Cloner::copyDb(OperationContext* txn, NamespaceString from_name(opts.fromDB, collectionName); NamespaceString to_name(toDBName, collectionName); + uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts)); + copyIndexes(txn, toDBName, from_name, diff --git a/src/mongo/db/cloner.h b/src/mongo/db/cloner.h index 5a89f191dd1..e0e9ba306c4 100644 --- a/src/mongo/db/cloner.h +++ b/src/mongo/db/cloner.h @@ -30,8 +30,9 @@ #pragma once -#include "mongo/client/dbclientinterface.h" #include "mongo/base/disallow_copying.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/s/catalog/catalog_manager.h" namespace mongo { @@ -73,7 +74,7 @@ private: const BSONObj& from_opts, const NamespaceString& to_ns, bool masterSameProcess, - bool slaveOk, + const CloneOptions& opts, Query q); void copyIndexes(OperationContext* txn, @@ -94,6 +95,10 @@ private: * snapshot - use snapshot mode for copying collections. note this should not be used * when it isn't required, as it will be slower. for example, * repairDatabase need not use it. + * checkForCatalogChange - Internal option set for clone commands initiated by a mongos that are + * holding a distributed lock (such as movePrimary). Indicates that we need to + * be periodically checking to see if the catalog manager has swapped and fail + * if it has so that we don't block the mongos that initiated the command. */ struct CloneOptions { std::string fromDB; @@ -105,6 +110,8 @@ struct CloneOptions { bool syncData = true; bool syncIndexes = true; + bool checkForCatalogChange = false; + CatalogManager::ConfigServerMode initialCatalogMode = CatalogManager::ConfigServerMode::NONE; }; } // namespace mongo diff --git a/src/mongo/db/commands/clone.cpp b/src/mongo/db/commands/clone.cpp index 0e6c7fbf1e7..b5632dd44d7 100644 --- a/src/mongo/db/commands/clone.cpp +++ b/src/mongo/db/commands/clone.cpp @@ -36,6 +36,7 @@ #include "mongo/db/cloner.h" #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" +#include "mongo/s/grid.h" namespace { @@ -102,6 +103,18 @@ public: CloneOptions opts; opts.fromDB = dbname; opts.slaveOk = cmdObj["slaveOk"].trueValue(); + opts.checkForCatalogChange = cmdObj["_checkForCatalogChange"].trueValue(); + + if (opts.checkForCatalogChange) { + auto catalogManager = grid.catalogManager(txn); + if (!catalogManager) { + return appendCommandStatus( + result, + Status(ErrorCodes::NotYetInitialized, + "Cannot run clone command for use by sharding movePrimary command on a " + "node that isn't yet sharding aware")); + } + } // See if there's any collections we should ignore if (cmdObj["collsToIgnore"].type() == Array) { diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 468f52cd13c..ee7ece9ac9a 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1559,6 +1559,26 @@ public: } mapReduceCommand; +namespace { +Status _checkForCatalogManagerChange(ForwardingCatalogManager* catalogManager, + CatalogManager::ConfigServerMode initialConfigServerMode) { + Status status = catalogManager->checkForPendingCatalogChange(); + if (!status.isOK()) { + return status; + } + + auto currentConfigServerMode = catalogManager->getMode(); + if (currentConfigServerMode != initialConfigServerMode) { + invariant(initialConfigServerMode == CatalogManager::ConfigServerMode::SCCC && + currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS); + return Status(ErrorCodes::IncompatibleCatalogManager, + "CatalogManager was swapped from SCCC to CSRS mode during mapreduce." + "Aborting mapreduce to unblock mongos."); + } + return Status::OK(); +} +} // namespace + /** * This class represents a map/reduce command executed on the output server of a sharded env */ @@ -1599,6 +1619,11 @@ public: << dbname)); } + // Store the initial catalog manager mode so we can check if it changes at any point. + CatalogManager::ConfigServerMode initialConfigServerMode = + grid.catalogManager(txn)->getMode(); + + boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmdObj)) maybeDisableValidation.emplace(txn); @@ -1696,6 +1721,12 @@ public: BSONObj query; BSONArrayBuilder chunkSizes; while (true) { + Status status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(), + initialConfigServerMode); + if (!status.isOK()) { + return appendCommandStatus(result, status); + } + ChunkPtr chunk; if (chunks.size() > 0) { chunk = chunks[index]; @@ -1714,6 +1745,12 @@ public: int chunkSize = 0; while (cursor.more() || !values.empty()) { + status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(), + initialConfigServerMode); + if (!status.isOK()) { + return appendCommandStatus(result, status); + } + BSONObj t; if (cursor.more()) { t = cursor.next().getOwned(); diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 02043813e6f..12e93857473 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -237,7 +237,7 @@ public: timing.done(2); - Status distLockStatus = distLock->checkForPendingCatalogSwap(); + Status distLockStatus = distLock->checkForPendingCatalogChange(); if (!distLockStatus.isOK()) { warning() << "Aborting migration due to need to swap current catalog manager" << causedBy(distLockStatus); @@ -313,7 +313,7 @@ public: timing.done(3); - distLockStatus = distLock->checkForPendingCatalogSwap(); + distLockStatus = distLock->checkForPendingCatalogChange(); if (!distLockStatus.isOK()) { warning() << "Aborting migration due to need to swap current catalog manager" << causedBy(distLockStatus); @@ -417,7 +417,7 @@ public: txn->checkForInterrupt(); - distLockStatus = distLock->checkForPendingCatalogSwap(); + distLockStatus = distLock->checkForPendingCatalogChange(); if (!distLockStatus.isOK()) { warning() << "Aborting migration due to need to swap current catalog manager" << causedBy(distLockStatus); @@ -427,7 +427,7 @@ public: timing.done(4); - distLockStatus = distLock->checkForPendingCatalogSwap(); + distLockStatus = distLock->checkForPendingCatalogChange(); if (!distLockStatus.isOK()) { warning() << "Aborting migration due to need to swap current catalog manager" << causedBy(distLockStatus); diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 1f5715837de..a4fa019772c 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -382,7 +382,7 @@ void Balancer::_doBalanceRound(OperationContext* txn, // For each collection, check if the balancing policy recommends moving anything around. for (const auto& coll : collections) { - uassertStatusOK(distLock->checkForPendingCatalogSwap()); + uassertStatusOK(distLock->checkForPendingCatalogChange()); // Skip collections for which balancing is disabled const NamespaceString& nss = coll.getNs(); diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp index ecf7d289a58..cdc956fef32 100644 --- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp +++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp @@ -202,19 +202,12 @@ ForwardingCatalogManager::ScopedDistLock& ForwardingCatalogManager::ScopedDistLo return *this; } -Status ForwardingCatalogManager::ScopedDistLock::checkForPendingCatalogSwap() { - stdx::lock_guard<stdx::mutex> lk(_fcm->_observerMutex); - if (!_fcm->_nextConfigChangeComplete.isValid() || _fcm->_configChangeComplete) { - return Status::OK(); - } - return Status(ErrorCodes::IncompatibleCatalogManager, - "Need to swap sharding catalog manager. Config server " - "reports that it is in replica set mode, but we are still using the " - "legacy SCCC protocol for config server communication"); +Status ForwardingCatalogManager::ScopedDistLock::checkForPendingCatalogChange() { + return _fcm->checkForPendingCatalogChange(); } Status ForwardingCatalogManager::ScopedDistLock::checkStatus() { - Status status = checkForPendingCatalogSwap(); + Status status = checkForPendingCatalogChange(); if (!status.isOK()) { return status; } @@ -268,6 +261,17 @@ void ForwardingCatalogManager::waitForCatalogManagerChange(OperationContext* txn _shardRegistry->getExecutor()->waitForEvent(configChangeComplete); } +Status ForwardingCatalogManager::checkForPendingCatalogChange() { + stdx::lock_guard<stdx::mutex> lk(_observerMutex); + if (!_nextConfigChangeComplete.isValid() || _configChangeComplete) { + return Status::OK(); + } + return Status(ErrorCodes::IncompatibleCatalogManager, + "Need to swap sharding catalog manager. Config server " + "reports that it is in replica set mode, but we are still using the " + "legacy SCCC protocol for config server communication"); +} + namespace { template <typename T> diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h index 33aba11bc9c..23a450a9752 100644 --- a/src/mongo/s/catalog/forwarding_catalog_manager.h +++ b/src/mongo/s/catalog/forwarding_catalog_manager.h @@ -76,6 +76,8 @@ public: // Only public because of unit tests DistLockManager* getDistLockManager() override; + ConfigServerMode getMode() override; + /** * If desiredMode doesn't equal _actual->getMode(), schedules work to swap the actual catalog * manager to one of the type specified by desiredMode. @@ -92,6 +94,11 @@ public: void waitForCatalogManagerChange(OperationContext* txn); /** + * Checks to see if we are currently waiting to swap the catalog manager. + */ + Status checkForPendingCatalogChange(); + + /** * Returns a ScopedDistLock which is the RAII type for holding a distributed lock. * ScopedDistLock prevents the underlying CatalogManager from being swapped as long as it is * in scope. @@ -115,8 +122,6 @@ public: BSONArrayBuilder* builder) override; private: - ConfigServerMode getMode() override; - Status startup(OperationContext* txn, bool allowNetworking) override; void shutDown(OperationContext* txn, bool allowNetworking = true) override; @@ -274,7 +279,7 @@ public: * returns a non-OK status the caller must release the lock (most likely by failing the current * operation). */ - Status checkForPendingCatalogSwap(); + Status checkForPendingCatalogChange(); /** * Queries the config server to make sure the lock is still present, as well as checking diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index 145c982de48..2f0735c4d1f 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -47,6 +47,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" +#include "mongo/s/set_shard_version_request.h" #include "mongo/util/log.h" namespace mongo { @@ -160,6 +161,8 @@ public: BSONObj moveStartDetails = _buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls); + uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange()); + auto catalogManager = grid.catalogManager(txn); catalogManager->logChange(txn, "movePrimary.start", dbname, moveStartDetails); @@ -168,14 +171,29 @@ public: ScopedDbConnection toconn(toShard->getConnString()); + { + // Make sure the target node is sharding aware so that it can detect catalog manager + // swaps. + auto ssvRequest = SetShardVersionRequest::makeForInitNoPersist( + grid.shardRegistry()->getConfigServerConnectionString(), + toShard->getId(), + toShard->getConnString()); + BSONObj res; + bool ok = toconn->runCommand("admin", ssvRequest.toBSON(), res); + if (!ok) { + return appendCommandStatus(result, getStatusFromCommandResult(res)); + } + } + // TODO ERH - we need a clone command which replays operations from clone start to now // can just use local.oplog.$main BSONObj cloneRes; - bool worked = toconn->runCommand( - dbname.c_str(), - BSON("clone" << fromShard->getConnString().toString() << "collsToIgnore" << barr.arr() - << bypassDocumentValidationCommandOption() << true), - cloneRes); + bool worked = toconn->runCommand(dbname.c_str(), + BSON("clone" << fromShard->getConnString().toString() + << "collsToIgnore" << barr.arr() + << bypassDocumentValidationCommandOption() + << true << "_checkForCatalogChange" << true), + cloneRes); toconn.done(); if (!worked) { @@ -184,6 +202,8 @@ public: return false; } + uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange()); + const string oldPrimary = fromShard->getConnString().toString(); ScopedDbConnection fromconn(fromShard->getConnString()); @@ -221,6 +241,7 @@ public: try { log() << "movePrimary dropping cloned collection " << el.String() << " on " << oldPrimary; + uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange()); fromconn->dropCollection(el.String()); } catch (DBException& e) { e.addContext(str::stream() |