summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Spencer T Brody <spencer@mongodb.com> 2016-01-19 18:14:53 -0500
committer Spencer T Brody <spencer@mongodb.com> 2016-01-21 16:22:38 -0500
commit 38740c56f64ba791a681885737f975eb79640a0a (patch)
tree 8c59283c4ec9e473513a58c33c83ba6788abbfe2
parent f4a4d6944b47185230fe06dfcf06bc7cf6c5ac66 (diff)
download mongo-38740c56f64ba791a681885737f975eb79640a0a.tar.gz
SERVER-20036 Interrupt mapReduce and movePrimary commands on catalog manager change
-rw-r--r-- src/mongo/db/cloner.cpp 44
-rw-r--r-- src/mongo/db/cloner.h 11
-rw-r--r-- src/mongo/db/commands/clone.cpp 13
-rw-r--r-- src/mongo/db/commands/mr.cpp 37
-rw-r--r-- src/mongo/db/s/move_chunk_command.cpp 8
-rw-r--r-- src/mongo/s/balance.cpp 2
-rw-r--r-- src/mongo/s/catalog/forwarding_catalog_manager.cpp 24
-rw-r--r-- src/mongo/s/catalog/forwarding_catalog_manager.h 11
-rw-r--r-- src/mongo/s/commands/cluster_move_primary_cmd.cpp 31
9 files changed, 152 insertions, 29 deletions
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index feb93876365..8fda1b0b17f 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
+#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -111,6 +112,31 @@ BSONObj fixindex(const string& newDbName, BSONObj o) {
return res;
}
+namespace {
+/**
+ * Decides whether an in-progress clone must be aborted because of a catalog manager change.
+ *
+ * Returns OK immediately when opts.checkForCatalogChange is false. Otherwise returns:
+ *  - the non-OK status from ForwardingCatalogManager::checkForPendingCatalogChange(), if any;
+ *  - IncompatibleCatalogManager if the current config server mode differs from
+ *    opts.initialCatalogMode (the only transition permitted by the invariant is SCCC -> CSRS);
+ *  - OK in every other case.
+ */
+Status _checkForCatalogManagerChangeIfNeeded(const CloneOptions& opts) {
+    if (!opts.checkForCatalogChange) {
+        return Status::OK();
+    }
+    auto catalogManager = grid.forwardingCatalogManager();
+    invariant(catalogManager);
+
+    Status status = catalogManager->checkForPendingCatalogChange();
+    if (!status.isOK()) {
+        return status;
+    }
+
+    auto currentConfigServerMode = catalogManager->getMode();
+    if (currentConfigServerMode != opts.initialCatalogMode) {
+        invariant(opts.initialCatalogMode == CatalogManager::ConfigServerMode::SCCC &&
+                  currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS);
+        // Adjacent string literals are concatenated verbatim, so the first literal must end
+        // with a space to keep the two sentences separated in the resulting message.
+        return Status(ErrorCodes::IncompatibleCatalogManager,
+                      "CatalogManager was swapped from SCCC to CSRS mode during movePrimary. "
+                      "Aborting movePrimary to unblock mongos.");
+    }
+    return Status::OK();
+}
+}  // namespace
+
Cloner::Cloner() {}
struct Cloner::Fun {
@@ -118,6 +144,7 @@ struct Cloner::Fun {
void operator()(DBClientCursorBatchIterator& i) {
invariant(from_collection.coll() != "system.indexes");
+ uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(_opts));
// XXX: can probably take dblock instead
unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_X));
@@ -241,6 +268,7 @@ struct Cloner::Fun {
BSONObj from_options;
NamespaceString to_collection;
time_t saveLast;
+ CloneOptions _opts;
};
/* copy the specified collection
@@ -251,7 +279,7 @@ void Cloner::copy(OperationContext* txn,
const BSONObj& from_opts,
const NamespaceString& to_collection,
bool masterSameProcess,
- bool slaveOk,
+ const CloneOptions& opts,
Query query) {
LOG(2) << "\t\tcloning collection " << from_collection << " to " << to_collection << " on "
<< _conn->getServerAddress() << " with filter " << query.toString() << endl;
@@ -262,8 +290,9 @@ void Cloner::copy(OperationContext* txn,
f.from_options = from_opts;
f.to_collection = to_collection;
f.saveLast = time(0);
+ f._opts = opts;
- int options = QueryOption_NoCursorTimeout | (slaveOk ? QueryOption_SlaveOk : 0);
+ int options = QueryOption_NoCursorTimeout | (opts.slaveOk ? QueryOption_SlaveOk : 0);
{
Lock::TempRelease tempRelease(txn->lockState());
_conn->query(stdx::function<void(DBClientCursorBatchIterator&)>(f),
@@ -407,7 +436,9 @@ bool Cloner::copyCollection(OperationContext* txn,
}
// main data
- copy(txn, dbname, nss, options, nss, false, true, Query(query).snapshot());
+ CloneOptions opts;
+ opts.slaveOk = true;
+ copy(txn, dbname, nss, options, nss, false, opts, Query(query).snapshot());
/* TODO : copyIndexes bool does not seem to be implemented! */
if (!shouldCopyIndexes) {
@@ -483,6 +514,8 @@ Status Cloner::copyDb(OperationContext* txn,
clonedColls->clear();
}
+ uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts));
+
{
// getCollectionInfos may make a remote call, which may block indefinitely, so release
// the global lock that we are entering with.
@@ -580,7 +613,8 @@ Status Cloner::copyDb(OperationContext* txn,
if (opts.snapshot)
q.snapshot();
- copy(txn, toDBName, from_name, options, to_name, masterSameProcess, opts.slaveOk, q);
+ uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts));
+ copy(txn, toDBName, from_name, options, to_name, masterSameProcess, opts, q);
// Copy releases the lock, so we need to re-load the database. This should
// probably throw if the database has changed in between, but for now preserve
@@ -637,6 +671,8 @@ Status Cloner::copyDb(OperationContext* txn,
NamespaceString from_name(opts.fromDB, collectionName);
NamespaceString to_name(toDBName, collectionName);
+ uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts));
+
copyIndexes(txn,
toDBName,
from_name,
diff --git a/src/mongo/db/cloner.h b/src/mongo/db/cloner.h
index 5a89f191dd1..e0e9ba306c4 100644
--- a/src/mongo/db/cloner.h
+++ b/src/mongo/db/cloner.h
@@ -30,8 +30,9 @@
#pragma once
-#include "mongo/client/dbclientinterface.h"
#include "mongo/base/disallow_copying.h"
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/s/catalog/catalog_manager.h"
namespace mongo {
@@ -73,7 +74,7 @@ private:
const BSONObj& from_opts,
const NamespaceString& to_ns,
bool masterSameProcess,
- bool slaveOk,
+ const CloneOptions& opts,
Query q);
void copyIndexes(OperationContext* txn,
@@ -94,6 +95,10 @@ private:
* snapshot - use snapshot mode for copying collections. note this should not be used
* when it isn't required, as it will be slower. for example,
* repairDatabase need not use it.
+ * checkForCatalogChange - Internal option set for clone commands initiated by a mongos that are
+ * holding a distributed lock (such as movePrimary). Indicates that we need to
+ * be periodically checking to see if the catalog manager has swapped and fail
+ * if it has so that we don't block the mongos that initiated the command.
*/
struct CloneOptions {
std::string fromDB;
@@ -105,6 +110,8 @@ struct CloneOptions {
bool syncData = true;
bool syncIndexes = true;
+ bool checkForCatalogChange = false;
+ CatalogManager::ConfigServerMode initialCatalogMode = CatalogManager::ConfigServerMode::NONE;
};
} // namespace mongo
diff --git a/src/mongo/db/commands/clone.cpp b/src/mongo/db/commands/clone.cpp
index 0e6c7fbf1e7..b5632dd44d7 100644
--- a/src/mongo/db/commands/clone.cpp
+++ b/src/mongo/db/commands/clone.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/cloner.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
+#include "mongo/s/grid.h"
namespace {
@@ -102,6 +103,18 @@ public:
CloneOptions opts;
opts.fromDB = dbname;
opts.slaveOk = cmdObj["slaveOk"].trueValue();
+ opts.checkForCatalogChange = cmdObj["_checkForCatalogChange"].trueValue();
+
+ if (opts.checkForCatalogChange) {
+ auto catalogManager = grid.catalogManager(txn);
+ if (!catalogManager) {
+ return appendCommandStatus(
+ result,
+ Status(ErrorCodes::NotYetInitialized,
+ "Cannot run clone command for use by sharding movePrimary command on a "
+ "node that isn't yet sharding aware"));
+ }
+ }
// See if there's any collections we should ignore
if (cmdObj["collsToIgnore"].type() == Array) {
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 468f52cd13c..ee7ece9ac9a 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1559,6 +1559,26 @@ public:
} mapReduceCommand;
+namespace {
+/**
+ * Decides whether an in-progress mapReduce must be aborted because of a catalog manager change.
+ *
+ * Returns the non-OK status from catalogManager->checkForPendingCatalogChange() if there is
+ * one; returns IncompatibleCatalogManager if catalogManager->getMode() no longer equals
+ * initialConfigServerMode (the only transition permitted by the invariant is SCCC -> CSRS);
+ * returns OK otherwise. 'catalogManager' is dereferenced unconditionally, so it must not be
+ * null.
+ */
+Status _checkForCatalogManagerChange(ForwardingCatalogManager* catalogManager,
+                                     CatalogManager::ConfigServerMode initialConfigServerMode) {
+    Status status = catalogManager->checkForPendingCatalogChange();
+    if (!status.isOK()) {
+        return status;
+    }
+
+    auto currentConfigServerMode = catalogManager->getMode();
+    if (currentConfigServerMode != initialConfigServerMode) {
+        invariant(initialConfigServerMode == CatalogManager::ConfigServerMode::SCCC &&
+                  currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS);
+        // Adjacent string literals are concatenated verbatim, so the first literal must end
+        // with a space to keep the two sentences separated in the resulting message.
+        return Status(ErrorCodes::IncompatibleCatalogManager,
+                      "CatalogManager was swapped from SCCC to CSRS mode during mapreduce. "
+                      "Aborting mapreduce to unblock mongos.");
+    }
+    return Status::OK();
+}
+}  // namespace
+
/**
* This class represents a map/reduce command executed on the output server of a sharded env
*/
@@ -1599,6 +1619,11 @@ public:
<< dbname));
}
+ // Store the initial catalog manager mode so we can check if it changes at any point.
+ CatalogManager::ConfigServerMode initialConfigServerMode =
+ grid.catalogManager(txn)->getMode();
+
+
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmdObj))
maybeDisableValidation.emplace(txn);
@@ -1696,6 +1721,12 @@ public:
BSONObj query;
BSONArrayBuilder chunkSizes;
while (true) {
+ Status status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(),
+ initialConfigServerMode);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+
ChunkPtr chunk;
if (chunks.size() > 0) {
chunk = chunks[index];
@@ -1714,6 +1745,12 @@ public:
int chunkSize = 0;
while (cursor.more() || !values.empty()) {
+ status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(),
+ initialConfigServerMode);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+
BSONObj t;
if (cursor.more()) {
t = cursor.next().getOwned();
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 02043813e6f..12e93857473 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -237,7 +237,7 @@ public:
timing.done(2);
- Status distLockStatus = distLock->checkForPendingCatalogSwap();
+ Status distLockStatus = distLock->checkForPendingCatalogChange();
if (!distLockStatus.isOK()) {
warning() << "Aborting migration due to need to swap current catalog manager"
<< causedBy(distLockStatus);
@@ -313,7 +313,7 @@ public:
timing.done(3);
- distLockStatus = distLock->checkForPendingCatalogSwap();
+ distLockStatus = distLock->checkForPendingCatalogChange();
if (!distLockStatus.isOK()) {
warning() << "Aborting migration due to need to swap current catalog manager"
<< causedBy(distLockStatus);
@@ -417,7 +417,7 @@ public:
txn->checkForInterrupt();
- distLockStatus = distLock->checkForPendingCatalogSwap();
+ distLockStatus = distLock->checkForPendingCatalogChange();
if (!distLockStatus.isOK()) {
warning() << "Aborting migration due to need to swap current catalog manager"
<< causedBy(distLockStatus);
@@ -427,7 +427,7 @@ public:
timing.done(4);
- distLockStatus = distLock->checkForPendingCatalogSwap();
+ distLockStatus = distLock->checkForPendingCatalogChange();
if (!distLockStatus.isOK()) {
warning() << "Aborting migration due to need to swap current catalog manager"
<< causedBy(distLockStatus);
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 1f5715837de..a4fa019772c 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -382,7 +382,7 @@ void Balancer::_doBalanceRound(OperationContext* txn,
// For each collection, check if the balancing policy recommends moving anything around.
for (const auto& coll : collections) {
- uassertStatusOK(distLock->checkForPendingCatalogSwap());
+ uassertStatusOK(distLock->checkForPendingCatalogChange());
// Skip collections for which balancing is disabled
const NamespaceString& nss = coll.getNs();
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
index ecf7d289a58..cdc956fef32 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
@@ -202,19 +202,12 @@ ForwardingCatalogManager::ScopedDistLock& ForwardingCatalogManager::ScopedDistLo
return *this;
}
-Status ForwardingCatalogManager::ScopedDistLock::checkForPendingCatalogSwap() {
- stdx::lock_guard<stdx::mutex> lk(_fcm->_observerMutex);
- if (!_fcm->_nextConfigChangeComplete.isValid() || _fcm->_configChangeComplete) {
- return Status::OK();
- }
- return Status(ErrorCodes::IncompatibleCatalogManager,
- "Need to swap sharding catalog manager. Config server "
- "reports that it is in replica set mode, but we are still using the "
- "legacy SCCC protocol for config server communication");
+// Forwards to the owning ForwardingCatalogManager (_fcm) and returns its
+// checkForPendingCatalogChange() result unchanged.
+Status ForwardingCatalogManager::ScopedDistLock::checkForPendingCatalogChange() {
+ return _fcm->checkForPendingCatalogChange();
 }
Status ForwardingCatalogManager::ScopedDistLock::checkStatus() {
- Status status = checkForPendingCatalogSwap();
+ Status status = checkForPendingCatalogChange();
if (!status.isOK()) {
return status;
}
@@ -268,6 +261,17 @@ void ForwardingCatalogManager::waitForCatalogManagerChange(OperationContext* txn
_shardRegistry->getExecutor()->waitForEvent(configChangeComplete);
}
+/**
+ * Reports whether a catalog manager swap is currently outstanding.
+ *
+ * Evaluated while holding _observerMutex. Returns IncompatibleCatalogManager when
+ * _nextConfigChangeComplete is valid and _configChangeComplete is not set; returns OK
+ * otherwise.
+ */
+Status ForwardingCatalogManager::checkForPendingCatalogChange() {
+    stdx::lock_guard<stdx::mutex> observerLock(_observerMutex);
+
+    const bool swapPending = _nextConfigChangeComplete.isValid() && !_configChangeComplete;
+    if (swapPending) {
+        return Status(ErrorCodes::IncompatibleCatalogManager,
+                      "Need to swap sharding catalog manager. Config server "
+                      "reports that it is in replica set mode, but we are still using the "
+                      "legacy SCCC protocol for config server communication");
+    }
+    return Status::OK();
+}
+
namespace {
template <typename T>
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h
index 33aba11bc9c..23a450a9752 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.h
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.h
@@ -76,6 +76,8 @@ public:
// Only public because of unit tests
DistLockManager* getDistLockManager() override;
+ ConfigServerMode getMode() override;
+
/**
* If desiredMode doesn't equal _actual->getMode(), schedules work to swap the actual catalog
* manager to one of the type specified by desiredMode.
@@ -92,6 +94,11 @@ public:
void waitForCatalogManagerChange(OperationContext* txn);
/**
+ * Checks to see if we are currently waiting to swap the catalog manager.
+ */
+ Status checkForPendingCatalogChange();
+
+ /**
* Returns a ScopedDistLock which is the RAII type for holding a distributed lock.
* ScopedDistLock prevents the underlying CatalogManager from being swapped as long as it is
* in scope.
@@ -115,8 +122,6 @@ public:
BSONArrayBuilder* builder) override;
private:
- ConfigServerMode getMode() override;
-
Status startup(OperationContext* txn, bool allowNetworking) override;
void shutDown(OperationContext* txn, bool allowNetworking = true) override;
@@ -274,7 +279,7 @@ public:
* returns a non-OK status the caller must release the lock (most likely by failing the current
* operation).
*/
- Status checkForPendingCatalogSwap();
+ Status checkForPendingCatalogChange();
/**
* Queries the config server to make sure the lock is still present, as well as checking
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index 145c982de48..2f0735c4d1f 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -47,6 +47,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
+#include "mongo/s/set_shard_version_request.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -160,6 +161,8 @@ public:
BSONObj moveStartDetails =
_buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls);
+ uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange());
+
auto catalogManager = grid.catalogManager(txn);
catalogManager->logChange(txn, "movePrimary.start", dbname, moveStartDetails);
@@ -168,14 +171,29 @@ public:
ScopedDbConnection toconn(toShard->getConnString());
+ {
+ // Make sure the target node is sharding aware so that it can detect catalog manager
+ // swaps.
+ auto ssvRequest = SetShardVersionRequest::makeForInitNoPersist(
+ grid.shardRegistry()->getConfigServerConnectionString(),
+ toShard->getId(),
+ toShard->getConnString());
+ BSONObj res;
+ bool ok = toconn->runCommand("admin", ssvRequest.toBSON(), res);
+ if (!ok) {
+ return appendCommandStatus(result, getStatusFromCommandResult(res));
+ }
+ }
+
// TODO ERH - we need a clone command which replays operations from clone start to now
// can just use local.oplog.$main
BSONObj cloneRes;
- bool worked = toconn->runCommand(
- dbname.c_str(),
- BSON("clone" << fromShard->getConnString().toString() << "collsToIgnore" << barr.arr()
- << bypassDocumentValidationCommandOption() << true),
- cloneRes);
+ bool worked = toconn->runCommand(dbname.c_str(),
+ BSON("clone" << fromShard->getConnString().toString()
+ << "collsToIgnore" << barr.arr()
+ << bypassDocumentValidationCommandOption()
+ << true << "_checkForCatalogChange" << true),
+ cloneRes);
toconn.done();
if (!worked) {
@@ -184,6 +202,8 @@ public:
return false;
}
+ uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange());
+
const string oldPrimary = fromShard->getConnString().toString();
ScopedDbConnection fromconn(fromShard->getConnString());
@@ -221,6 +241,7 @@ public:
try {
log() << "movePrimary dropping cloned collection " << el.String() << " on "
<< oldPrimary;
+ uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange());
fromconn->dropCollection(el.String());
} catch (DBException& e) {
e.addContext(str::stream()