summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-10-01 09:50:29 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-10-07 18:33:14 -0400
commitcc5788013eebbbd71e87581f3bb16532ab463ef0 (patch)
tree4b7d7e47409d555825469f973ae3eebc9fefb89a /src/mongo
parent6e2fa8f483088bbbbf8dfe4575907a82d7488e08 (diff)
downloadmongo-cc5788013eebbbd71e87581f3bb16532ab463ef0.tar.gz
SERVER-19934 Sharding config minOpTime recovery
Adds a framework to record incomplete sharding metadata change operations, which can be recovered at startup or transition to primary. This version of the framework is blocking in that it cannot be interrupted until completed.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/db.cpp95
-rw-r--r--src/mongo/db/dbhelpers.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp4
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/migration_impl.cpp6
-rw-r--r--src/mongo/db/s/sharding_state_recovery.cpp297
-rw-r--r--src/mongo/db/s/sharding_state_recovery.h76
-rw-r--r--src/mongo/s/catalog/catalog_manager.h16
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.cpp12
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.h10
-rw-r--r--src/mongo/s/catalog/forwarding_catalog_manager.cpp16
-rw-r--r--src/mongo/s/catalog/forwarding_catalog_manager.h10
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp14
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.h10
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp18
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h10
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp1
22 files changed, 513 insertions, 105 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index e30eb75d384..0ec2df52760 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -50,12 +50,12 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_catalog_entry.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_key_validate.h"
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbmessage.h"
@@ -70,7 +70,7 @@
#include "mongo/db/log_process_details.h"
#include "mongo/db/mongod_options.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_deleter_service.h"
#include "mongo/db/repair_database.h"
@@ -82,6 +82,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/restapi.h"
+#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -164,10 +165,11 @@ public:
DbResponse dbresponse;
{
- OperationContextImpl txn;
- assembleResponse(&txn, m, dbresponse, port->remote());
- // txn must go out of scope here so that the operation cannot show up in
- // currentOp results after the response reaches the client.
+ auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+ assembleResponse(opCtx.get(), m, dbresponse, port->remote());
+
+ // opCtx must go out of scope here so that the operation cannot show up in currentOp
+ // results after the response reaches the client
}
if (dbresponse.response) {
@@ -191,7 +193,7 @@ public:
b.appendNum(cursorid);
m.appendData(b.buf(), b.len());
b.decouple();
- DEV log() << "exhaust=true sending more" << endl;
+ DEV log() << "exhaust=true sending more";
continue; // this goes back to top loop
}
}
@@ -201,7 +203,7 @@ public:
}
};
-static void logStartup() {
+static void logStartup(OperationContext* txn) {
BSONObjBuilder toLog;
stringstream id;
id << getHostNameCached() << "-" << jsTime().asInt64();
@@ -222,25 +224,23 @@ static void logStartup() {
BSONObj o = toLog.obj();
- OperationContextImpl txn;
-
- ScopedTransaction transaction(&txn, MODE_X);
- Lock::GlobalWrite lk(txn.lockState());
- AutoGetOrCreateDb autoDb(&txn, "local", mongo::MODE_X);
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ AutoGetOrCreateDb autoDb(txn, "local", mongo::MODE_X);
Database* db = autoDb.getDb();
const std::string ns = "local.startup_log";
Collection* collection = db->getCollection(ns);
- WriteUnitOfWork wunit(&txn);
+ WriteUnitOfWork wunit(txn);
if (!collection) {
BSONObj options = BSON("capped" << true << "size" << 10 * 1024 * 1024);
- bool shouldReplicateWrites = txn.writesAreReplicated();
- txn.setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, &txn, shouldReplicateWrites);
- uassertStatusOK(userCreateNS(&txn, db, ns, options));
+ bool shouldReplicateWrites = txn->writesAreReplicated();
+ txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites);
+ uassertStatusOK(userCreateNS(txn, db, ns, options));
collection = db->getCollection(ns);
}
invariant(collection);
- uassertStatusOK(collection->insertDocument(&txn, o, false));
+ uassertStatusOK(collection->insertDocument(txn, o, false));
wunit.commit();
}
@@ -293,12 +293,11 @@ static unsigned long long checkIfReplMissingFromCommandLine(OperationContext* tx
return 0;
}
-static void repairDatabasesAndCheckVersion() {
+static void repairDatabasesAndCheckVersion(OperationContext* txn) {
LOG(1) << "enter repairDatabases (to check pdfile version #)" << endl;
- OperationContextImpl txn;
- ScopedTransaction transaction(&txn, MODE_X);
- Lock::GlobalWrite lk(txn.lockState());
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
vector<string> dbNames;
@@ -311,7 +310,7 @@ static void repairDatabasesAndCheckVersion() {
const string dbName = *i;
LOG(1) << " Repairing database: " << dbName << endl;
- fassert(18506, repairDatabase(&txn, storageEngine, dbName));
+ fassert(18506, repairDatabase(txn, storageEngine, dbName));
}
}
@@ -322,19 +321,19 @@ static void repairDatabasesAndCheckVersion() {
// to. The local DB is special because it is not replicated. See SERVER-10927 for more
// details.
const bool shouldClearNonLocalTmpCollections =
- !(checkIfReplMissingFromCommandLine(&txn) || replSettings.usingReplSets() ||
+ !(checkIfReplMissingFromCommandLine(txn) || replSettings.usingReplSets() ||
replSettings.slave == repl::SimpleSlave);
for (vector<string>::const_iterator i = dbNames.begin(); i != dbNames.end(); ++i) {
const string dbName = *i;
LOG(1) << " Recovering database: " << dbName << endl;
- Database* db = dbHolder().openDb(&txn, dbName);
+ Database* db = dbHolder().openDb(txn, dbName);
invariant(db);
// First thing after opening the database is to check for file compatibility,
// otherwise we might crash if this is a deprecated format.
- if (!db->getDatabaseCatalogEntry()->currentFilesCompatible(&txn)) {
+ if (!db->getDatabaseCatalogEntry()->currentFilesCompatible(txn)) {
log() << "****";
log() << "cannot do this upgrade without an upgrade in the middle";
log() << "please do a --repair with 2.6 and then start this version";
@@ -347,7 +346,7 @@ static void repairDatabasesAndCheckVersion() {
Collection* coll = db->getCollection(systemIndexes);
unique_ptr<PlanExecutor> exec(
- InternalPlanner::collectionScan(&txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL));
+ InternalPlanner::collectionScan(txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL));
BSONObj index;
PlanExecutor::ExecState state;
@@ -355,7 +354,7 @@ static void repairDatabasesAndCheckVersion() {
const BSONObj key = index.getObjectField("key");
const string plugin = IndexNames::findPluginName(key);
- if (db->getDatabaseCatalogEntry()->isOlderThan24(&txn)) {
+ if (db->getDatabaseCatalogEntry()->isOlderThan24(txn)) {
if (IndexNames::existedBefore24(plugin)) {
continue;
}
@@ -390,11 +389,11 @@ static void repairDatabasesAndCheckVersion() {
if (replSettings.usingReplSets()) {
// We only care about the _id index if we are in a replset
- checkForIdIndexes(&txn, db);
+ checkForIdIndexes(txn, db);
}
if (shouldClearNonLocalTmpCollections || dbName == "local") {
- db->clearTmpCollections(&txn);
+ db->clearTmpCollections(txn);
}
}
@@ -520,17 +519,16 @@ static void _initAndListen(int listenPort) {
ScriptEngine::setup();
}
- repairDatabasesAndCheckVersion();
+ auto startupOpCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+
+ repairDatabasesAndCheckVersion(startupOpCtx.get());
if (storageGlobalParams.upgrade) {
log() << "finished checking dbs" << endl;
exitCleanly(EXIT_CLEAN);
}
- {
- OperationContextImpl txn;
- uassertStatusOK(getGlobalAuthorizationManager()->initialize(&txn));
- }
+ uassertStatusOK(getGlobalAuthorizationManager()->initialize(startupOpCtx.get()));
/* this is for security on certain platforms (nonce generation) */
srand((unsigned)(curTimeMicros64() ^ startupSrandTimer.micros()));
@@ -546,13 +544,11 @@ static void _initAndListen(int listenPort) {
}
{
- OperationContextImpl txn;
-
#ifndef _WIN32
mongo::signalForkSuccess();
#endif
- Status status = authindex::verifySystemIndexes(&txn);
+ Status status = authindex::verifySystemIndexes(startupOpCtx.get());
if (!status.isOK()) {
log() << status.reason();
exitCleanly(EXIT_NEED_UPGRADE);
@@ -560,8 +556,8 @@ static void _initAndListen(int listenPort) {
// SERVER-14090: Verify that auth schema version is schemaVersion26Final.
int foundSchemaVersion;
- status =
- getGlobalAuthorizationManager()->getAuthorizationVersion(&txn, &foundSchemaVersion);
+ status = getGlobalAuthorizationManager()->getAuthorizationVersion(startupOpCtx.get(),
+ &foundSchemaVersion);
if (!status.isOK()) {
log() << "Auth schema version is incompatible: "
<< "User and role management commands require auth data to have "
@@ -581,11 +577,12 @@ static void _initAndListen(int listenPort) {
getDeleter()->startWorkers();
- restartInProgressIndexesFromLastShutdown(&txn);
+ restartInProgressIndexesFromLastShutdown(startupOpCtx.get());
- repl::getGlobalReplicationCoordinator()->startReplication(&txn);
+ repl::getGlobalReplicationCoordinator()->startReplication(startupOpCtx.get());
- const unsigned long long missingRepl = checkIfReplMissingFromCommandLine(&txn);
+ const unsigned long long missingRepl =
+ checkIfReplMissingFromCommandLine(startupOpCtx.get());
if (missingRepl) {
log() << startupWarningsLog;
log() << "** WARNING: mongod started without --replSet yet " << missingRepl
@@ -608,9 +605,15 @@ static void _initAndListen(int listenPort) {
startFTDC();
- logStartup();
+ if (!repl::getGlobalReplicationCoordinator()->isReplEnabled()) {
+ uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get()));
+ }
+
+ logStartup(startupOpCtx.get());
- // MessageServer::run will return when exit code closes its socket
+ // MessageServer::run will return when exit code closes its socket and we don't need the
+ // operation context anymore
+ startupOpCtx.reset();
server->run();
}
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index 2c0d448cf01..2c77bf8ca2d 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -28,7 +28,6 @@
#pragma once
-#include <iosfwd>
#include <memory>
#include <boost/filesystem/path.hpp>
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 4b5364f3424..0692975b668 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -170,6 +170,14 @@ public:
virtual void clearShardingState() = 0;
/**
+ * Called when the instance transitions to primary in order to notify a potentially sharded
+ * host to recover its sharding state.
+ *
+ * Throws on errors.
+ */
+ virtual void recoverShardingState(OperationContext* txn) = 0;
+
+ /**
* Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
*/
virtual void signalApplierToChooseNewSyncSource() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 3aa445c029a..6e97d9c4987 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/executor/network_interface.h"
#include "mongo/stdx/functional.h"
@@ -335,6 +336,14 @@ void ReplicationCoordinatorExternalStateImpl::clearShardingState() {
ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata();
}
+void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationContext* txn) {
+ uassertStatusOK(ShardingStateRecovery::recover(txn));
+
+ // There is a slight chance that some stale metadata might have been loaded before the latest
+ // optime has been recovered, so throw out everything that we have up to now
+ ShardingState::get(txn)->clearCollectionMetadata();
+}
+
void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
BackgroundSync::get()->clearSyncTarget();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 8dce3dbb632..83d7a3c7d34 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -67,6 +67,7 @@ public:
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
+ virtual void recoverShardingState(OperationContext* txn);
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual OperationContext* createOperationContext(const std::string& threadName);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index abf8b7ad0b1..86a37f54f18 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -190,6 +190,8 @@ void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationCon
void ReplicationCoordinatorExternalStateMock::clearShardingState() {}
+void ReplicationCoordinatorExternalStateMock::recoverShardingState(OperationContext* txn) {}
+
void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {}
void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 58d66934e77..e6a259ca0ea 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -70,6 +70,7 @@ public:
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
+ virtual void recoverShardingState(OperationContext* txn);
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual OperationContext* createOperationContext(const std::string& threadName);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a6da1e44edf..8e071e9a496 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -617,8 +617,12 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
lk.unlock();
+
+ _externalState->recoverShardingState(txn);
+
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite globalWriteLock(txn->lockState());
+
lk.lock();
if (!_isWaitingForDrainToComplete) {
return;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 71f0a17d745..59ff7c88904 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -27,6 +27,7 @@ env.Library(
'migration_source_manager.cpp',
'sharded_connection_info.cpp',
'sharding_state.cpp',
+ 'sharding_state_recovery.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 7ac6885a8f7..c84cffd98a2 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/operation_shard_version.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/logger/ramlog.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -272,6 +273,10 @@ Status ChunkMoveOperationState::commitMigration(OperationContext* txn) {
// or CollectionMetadata state.
ShardingState* const shardingState = ShardingState::get(txn);
+ Status startStatus = ShardingStateRecovery::startMetadataOp(txn);
+ if (!startStatus.isOK())
+ return startStatus;
+
shardingState->migrationSourceManager()->setInCriticalSection(true);
const ChunkVersion originalCollVersion = getCollMetadata()->getCollVersion();
@@ -526,6 +531,7 @@ Status ChunkMoveOperationState::commitMigration(OperationContext* txn) {
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection);
shardingState->migrationSourceManager()->setInCriticalSection(false);
+ ShardingStateRecovery::endMetadataOp(txn);
// Migration is done, just log some diagnostics information
BSONObj chunkInfo =
diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp
new file mode 100644
index 00000000000..1df5eee4777
--- /dev/null
+++ b/src/mongo/db/s/sharding_state_recovery.cpp
@@ -0,0 +1,297 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_state_recovery.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/update_lifecycle_impl.h"
+#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+const char kRecoveryDocumentId[] = "minOpTimeRecovery";
+const char kConfigsvrConnString[] = "configsvrConnectionString";
+const char kShardName[] = "shardName";
+const char kMinOpTime[] = "minOpTime";
+const char kMinOpTimeUpdaters[] = "minOpTimeUpdaters";
+
+const Seconds kWriteTimeout(15);
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::NONE,
+ kWriteTimeout);
+
+/**
+ * Encapsulates the parsing and construction of the config server min opTime recovery document.
+ */
+class RecoveryDocument {
+public:
+ enum ChangeType : int8_t { Increment = 1, Decrement = -1, Clear = 0 };
+
+ static StatusWith<RecoveryDocument> fromBSON(const BSONObj& obj) {
+ RecoveryDocument recDoc;
+
+ {
+ std::string configsvrString;
+
+ Status status = bsonExtractStringField(obj, kConfigsvrConnString, &configsvrString);
+ if (!status.isOK())
+ return status;
+
+ auto configsvrStatus = ConnectionString::parse(configsvrString);
+ if (!configsvrStatus.isOK())
+ return configsvrStatus.getStatus();
+
+ recDoc._configsvr = std::move(configsvrStatus.getValue());
+ }
+
+ Status status = bsonExtractStringField(obj, kShardName, &recDoc._shardName);
+ if (!status.isOK())
+ return status;
+
+ status = bsonExtractOpTimeField(obj, kMinOpTime, &recDoc._minOpTime);
+ if (!status.isOK())
+ return status;
+
+ status = bsonExtractIntegerField(obj, kMinOpTimeUpdaters, &recDoc._minOpTimeUpdaters);
+ if (!status.isOK())
+ return status;
+
+ return recDoc;
+ }
+
+ static BSONObj createChangeObj(ConnectionString configsvr,
+ std::string shardName,
+ repl::OpTime minOpTime,
+ ChangeType change) {
+ BSONObjBuilder cmdBuilder;
+
+ {
+ BSONObjBuilder setBuilder(cmdBuilder.subobjStart("$set"));
+ setBuilder.append(kConfigsvrConnString, configsvr.toString());
+ setBuilder.append(kShardName, shardName);
+ minOpTime.append(&setBuilder, kMinOpTime);
+ }
+
+ if (change == Clear) {
+ cmdBuilder.append("$set", BSON(kMinOpTimeUpdaters << 0));
+ } else {
+ cmdBuilder.append("$inc", BSON(kMinOpTimeUpdaters << change));
+ }
+
+ return cmdBuilder.obj();
+ }
+
+ static BSONObj getQuery() {
+ return BSON("_id" << kRecoveryDocumentId);
+ }
+
+ BSONObj toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("_id", kRecoveryDocumentId);
+ builder.append(kConfigsvrConnString, _configsvr.toString());
+ builder.append(kShardName, _shardName);
+ builder.append(kMinOpTime, _minOpTime.toBSON());
+ builder.append(kMinOpTimeUpdaters, _minOpTimeUpdaters);
+
+ return builder.obj();
+ }
+
+ ConnectionString getConfigsvr() const {
+ return _configsvr;
+ }
+
+ std::string getShardName() const {
+ return _shardName;
+ }
+
+ repl::OpTime getMinOpTime() const {
+ return _minOpTime;
+ }
+
+ int64_t getMinOpTimeUpdaters() const {
+ return _minOpTimeUpdaters;
+ }
+
+private:
+ RecoveryDocument() = default;
+
+ ConnectionString _configsvr;
+ std::string _shardName;
+ repl::OpTime _minOpTime;
+ long long _minOpTimeUpdaters;
+};
+
+/**
+ * This method is the main entry point for updating the sharding state recovery document. The goal
+ * it has is to always move the opTime foward for a currently running server. It achieves this by
+ * serializing the modify calls and reading the current opTime under X-lock on the admin database.
+ */
+Status modifyRecoveryDocument(OperationContext* txn,
+ RecoveryDocument::ChangeType change,
+ const WriteConcernOptions& writeConcern) {
+ try {
+ AutoGetOrCreateDb autoGetOrCreateDb(
+ txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X);
+
+ BSONObj updateObj = RecoveryDocument::createChangeObj(
+ grid.shardRegistry()->getConfigServerConnectionString(),
+ ShardingState::get(txn)->getShardName(),
+ grid.shardRegistry()->getConfigOpTime(),
+ change);
+
+ LOG(1) << "Changing sharding recovery document " << updateObj;
+
+ OpDebug opDebug;
+ UpdateRequest updateReq(NamespaceString::kConfigCollectionNamespace);
+ updateReq.setQuery(RecoveryDocument::getQuery());
+ updateReq.setUpdates(updateObj);
+ updateReq.setUpsert();
+ UpdateLifecycleImpl updateLifecycle(true, NamespaceString::kConfigCollectionNamespace);
+ updateReq.setLifecycle(&updateLifecycle);
+
+ UpdateResult result = update(txn, autoGetOrCreateDb.getDb(), updateReq, &opDebug);
+ invariant(result.numDocsModified == 1);
+ invariant(result.numMatched <= 1);
+
+ // Wait until the majority write concern has been satisfied
+ WriteConcernResult writeConcernResult;
+ return waitForWriteConcern(txn,
+ repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
+ writeConcern,
+ &writeConcernResult);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
+} // namespace
+
+Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) {
+ Status upsertStatus =
+ modifyRecoveryDocument(txn, RecoveryDocument::Increment, kMajorityWriteConcern);
+
+ if (upsertStatus == ErrorCodes::WriteConcernFailed) {
+ // Couldn't wait for the replication to complete, but the local write was performed. Clear
+ // it up fast (without any waiting for journal or replication) and still treat it as
+ // failure.
+ modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
+ }
+
+ return upsertStatus;
+}
+
+void ShardingStateRecovery::endMetadataOp(OperationContext* txn) {
+ Status status = modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
+ if (!status.isOK()) {
+ warning() << "Failed to decrement minOpTimeUpdaters due to " << status;
+ }
+}
+
+Status ShardingStateRecovery::recover(OperationContext* txn) {
+ BSONObj recoveryDocBSON;
+
+ try {
+ AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS);
+ if (!Helpers::findOne(
+ txn, autoColl.getCollection(), RecoveryDocument::getQuery(), recoveryDocBSON)) {
+ return Status::OK();
+ }
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ const auto recoveryDocStatus = RecoveryDocument::fromBSON(recoveryDocBSON);
+ if (!recoveryDocStatus.isOK())
+ return recoveryDocStatus.getStatus();
+
+ const auto recoveryDoc = std::move(recoveryDocStatus.getValue());
+
+ log() << "Sharding state recovery process found document " << recoveryDoc.toBSON();
+
+ // Make sure the sharding state is initialized
+ ShardingState* const shardingState = ShardingState::get(txn);
+
+ shardingState->initialize(txn, recoveryDoc.getConfigsvr().toString());
+ shardingState->setShardName(recoveryDoc.getShardName());
+
+ if (!recoveryDoc.getMinOpTimeUpdaters()) {
+ // Treat the minOpTime as up-to-date
+ grid.shardRegistry()->advanceConfigOpTime(recoveryDoc.getMinOpTime());
+ return Status::OK();
+ }
+
+ log() << "Sharding state recovery document indicates there were "
+ << recoveryDoc.getMinOpTimeUpdaters()
+ << " metadata change operations in flight. Contacting the config server primary in order "
+ "to retrieve the most recent opTime.";
+
+ // Need to fetch the latest uptime from the config server, so do a logging write
+ Status status =
+ grid.catalogManager(txn)->logChange(txn,
+ "Sharding recovery thread",
+ "Sharding minOpTime recovery",
+ NamespaceString::kConfigCollectionNamespace.ns(),
+ recoveryDocBSON);
+ if (!status.isOK())
+ return status;
+
+ log() << "Sharding state recovered. New config server opTime is "
+ << grid.shardRegistry()->getConfigOpTime();
+
+ // Finally, clear the recovery document so next time we don't need to recover
+ status = modifyRecoveryDocument(txn, RecoveryDocument::Clear, kMajorityWriteConcern);
+ if (!status.isOK()) {
+ warning() << "Failed to reset sharding state recovery document due to " << status;
+ }
+
+ return Status::OK();
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_state_recovery.h b/src/mongo/db/s/sharding_state_recovery.h
new file mode 100644
index 00000000000..5cf38615385
--- /dev/null
+++ b/src/mongo/db/s/sharding_state_recovery.h
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+class OperationContext;
+class Status;
+
+/**
+ * Manages the persistence and recovery of the sharding config metadata's min opTime.
+ *
+ * The opTime recovery document resides in the admin.system.version collection and has the
+ * following format:
+ *
+ * { _id: "minOpTimeRecovery",
+ * configsvrConnectionString: "config/server1:10000,server2:10001,server3:10002",
+ * shardName: "shard0000",
+ * minOpTime: { ts: Timestamp 1443820968000|1, t: 11 },
+ * minOptimeUpdaters: 1 }
+ */
+class ShardingStateRecovery {
+public:
+ /**
+ * Marks the beginning of a sharding metadata operation which requires recovery of the config
+ * server's minOpTime after node failure. It is only safe to commence the operation after this
+ * method returns an OK status.
+ */
+ static Status startMetadataOp(OperationContext* txn);
+
+ /**
+ * Marks the end of a sharding metadata operation, persisting the latest config server opTime at
+ * the time of the call.
+ */
+ static void endMetadataOp(OperationContext* txn);
+
+ /**
+ * Recovers the minimal config server opTime that the instance should be using for reading
+ * sharding metadata so that the instance observes all metadata modifications it did the last
+ * time it was active (or PRIMARY, if replica set).
+ *
+ * NOTE: This method will block until recovery completes.
+ *
+ * Returns OK if the minOpTime was successfully recovered or failure status otherwise. It is
+ * unsafe to read and rely on any sharding metadata before this method has returned success.
+ */
+ static Status recover(OperationContext* txn);
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 280b1c74d0e..2b5b7dcb094 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -341,20 +341,20 @@ public:
virtual void logAction(OperationContext* txn, const ActionLogType& actionLog) = 0;
/**
- * Logs a diagnostic event locally and on the config server.
- *
- * NOTE: This method is best effort so it should never throw.
+ * Logs a diagnostic event locally and on the config server. If the config server write fails
+ * for any reason a warning will be written to the local service log and the method will return
+ * a failed status.
*
* @param clientAddress Address of the client that initiated the op that caused this change
* @param what E.g. "split", "migrate"
* @param ns To which collection the metadata change is being applied
* @param detail Additional info about the metadata change (not interpreted)
*/
- virtual void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) = 0;
+ virtual Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) = 0;
/**
* Returns global settings for a certain key.
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index 879a492d6eb..ce7533eceff 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -167,11 +167,13 @@ Status CatalogManagerMock::applyChunkOpsDeprecated(OperationContext* txn,
void CatalogManagerMock::logAction(OperationContext* txn, const ActionLogType& actionLog) {}
-void CatalogManagerMock::logChange(OperationContext* txn,
- const string& clientAddress,
- const string& what,
- const string& ns,
- const BSONObj& detail) {}
+Status CatalogManagerMock::logChange(OperationContext* txn,
+ const string& clientAddress,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
+ return Status::OK();
+}
StatusWith<SettingsType> CatalogManagerMock::getGlobalSettings(OperationContext* txn,
const string& key) {
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 98350b8abe1..39a0d111545 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -128,11 +128,11 @@ public:
void logAction(OperationContext* txn, const ActionLogType& actionLog) override;
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
index b070b688743..10c464f1976 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
@@ -540,16 +540,12 @@ void ForwardingCatalogManager::logAction(OperationContext* txn, const ActionLogT
});
}
-void ForwardingCatalogManager::logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) {
- retry(txn,
- [&] {
- _actual->logChange(txn, clientAddress, what, ns, detail);
- return 1;
- });
+Status ForwardingCatalogManager::logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) {
+ return retry(txn, [&] { return _actual->logChange(txn, clientAddress, what, ns, detail); });
}
StatusWith<SettingsType> ForwardingCatalogManager::getGlobalSettings(OperationContext* txn,
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h
index 2e8d30e481b..ed7cb2f82a1 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.h
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.h
@@ -200,11 +200,11 @@ private:
void logAction(OperationContext* txn, const ActionLogType& actionLog) override;
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 74b72e22f95..44c98e2fb65 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -700,11 +700,11 @@ void CatalogManagerLegacy::logAction(OperationContext* txn, const ActionLogType&
}
}
-void CatalogManagerLegacy::logChange(OperationContext* txn,
- const string& clientAddress,
- const string& what,
- const string& ns,
- const BSONObj& detail) {
+Status CatalogManagerLegacy::logChange(OperationContext* txn,
+ const string& clientAddress,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
// Create the change log collection and ensure that it is capped. Wrap in try/catch,
// because creating an existing collection throws.
if (_changeLogCollectionCreated.load() == 0) {
@@ -721,7 +721,7 @@ void CatalogManagerLegacy::logChange(OperationContext* txn,
LOG(1) << "couldn't create changelog collection: " << ex;
// If we couldn't create the collection don't attempt the insert otherwise we might
// implicitly create the collection without it being capped.
- return;
+ return ex.toStatus();
}
}
}
@@ -751,6 +751,8 @@ void CatalogManagerLegacy::logChange(OperationContext* txn,
warning() << "Error encountered while logging config change with ID "
<< changeLog.getChangeId() << ": " << result;
}
+
+ return result;
}
StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(OperationContext* txn,
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index a9e530c5fc8..2b7f6c8548c 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -131,11 +131,11 @@ public:
void logAction(OperationContext* txn, const ActionLogType& actionLog);
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index e4e323316d7..e4b43e8e602 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -592,11 +592,11 @@ void CatalogManagerReplicaSet::logAction(OperationContext* txn, const ActionLogT
}
}
-void CatalogManagerReplicaSet::logChange(OperationContext* txn,
- const string& clientAddress,
- const string& what,
- const string& ns,
- const BSONObj& detail) {
+Status CatalogManagerReplicaSet::logChange(OperationContext* txn,
+ const string& clientAddress,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
if (_changeLogCollectionCreated.load() == 0) {
BSONObj createCmd = BSON("create" << ChangeLogType::ConfigNS << "capped" << true << "size"
<< kChangeLogCollectionSize);
@@ -604,7 +604,7 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
grid.shardRegistry()->runCommandOnConfigWithNotMasterRetries("config", createCmd);
if (!result.isOK()) {
LOG(1) << "couldn't create changelog collection: " << causedBy(result.getStatus());
- return;
+ return result.getStatus();
}
Status commandStatus = Command::getStatusFromCommandResult(result.getValue());
@@ -612,12 +612,12 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
_changeLogCollectionCreated.store(1);
} else {
LOG(1) << "couldn't create changelog collection: " << causedBy(commandStatus);
- return;
+ return commandStatus;
}
}
Date_t now = grid.shardRegistry()->getExecutor()->now();
- std::string hostName = grid.shardRegistry()->getNetwork()->getHostName();
+ const std::string hostName = grid.shardRegistry()->getNetwork()->getHostName();
const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen();
ChangeLogType changeLog;
@@ -637,6 +637,8 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
warning() << "Error encountered while logging config change with ID " << changeId << ": "
<< result;
}
+
+ return result;
}
StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(OperationContext* txn,
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index aeecb9a5dfd..f5a4e424441 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -125,11 +125,11 @@ public:
void logAction(OperationContext* txn, const ActionLogType& actionLog) override;
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 8f033427b6e..eaf0394350b 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -30,7 +30,6 @@
#include "mongo/platform/basic.h"
-
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"