summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/db.cpp95
-rw-r--r--src/mongo/db/dbhelpers.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp4
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/migration_impl.cpp6
-rw-r--r--src/mongo/db/s/sharding_state_recovery.cpp297
-rw-r--r--src/mongo/db/s/sharding_state_recovery.h76
-rw-r--r--src/mongo/s/catalog/catalog_manager.h16
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.cpp12
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.h10
-rw-r--r--src/mongo/s/catalog/forwarding_catalog_manager.cpp16
-rw-r--r--src/mongo/s/catalog/forwarding_catalog_manager.h10
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp14
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.h10
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp18
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h10
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp1
22 files changed, 513 insertions, 105 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index e30eb75d384..0ec2df52760 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -50,12 +50,12 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_catalog_entry.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_key_validate.h"
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbmessage.h"
@@ -70,7 +70,7 @@
#include "mongo/db/log_process_details.h"
#include "mongo/db/mongod_options.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_deleter_service.h"
#include "mongo/db/repair_database.h"
@@ -82,6 +82,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/restapi.h"
+#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -164,10 +165,11 @@ public:
DbResponse dbresponse;
{
- OperationContextImpl txn;
- assembleResponse(&txn, m, dbresponse, port->remote());
- // txn must go out of scope here so that the operation cannot show up in
- // currentOp results after the response reaches the client.
+ auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+ assembleResponse(opCtx.get(), m, dbresponse, port->remote());
+
+ // opCtx must go out of scope here so that the operation cannot show up in currentOp
+ // results after the response reaches the client
}
if (dbresponse.response) {
@@ -191,7 +193,7 @@ public:
b.appendNum(cursorid);
m.appendData(b.buf(), b.len());
b.decouple();
- DEV log() << "exhaust=true sending more" << endl;
+ DEV log() << "exhaust=true sending more";
continue; // this goes back to top loop
}
}
@@ -201,7 +203,7 @@ public:
}
};
-static void logStartup() {
+static void logStartup(OperationContext* txn) {
BSONObjBuilder toLog;
stringstream id;
id << getHostNameCached() << "-" << jsTime().asInt64();
@@ -222,25 +224,23 @@ static void logStartup() {
BSONObj o = toLog.obj();
- OperationContextImpl txn;
-
- ScopedTransaction transaction(&txn, MODE_X);
- Lock::GlobalWrite lk(txn.lockState());
- AutoGetOrCreateDb autoDb(&txn, "local", mongo::MODE_X);
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ AutoGetOrCreateDb autoDb(txn, "local", mongo::MODE_X);
Database* db = autoDb.getDb();
const std::string ns = "local.startup_log";
Collection* collection = db->getCollection(ns);
- WriteUnitOfWork wunit(&txn);
+ WriteUnitOfWork wunit(txn);
if (!collection) {
BSONObj options = BSON("capped" << true << "size" << 10 * 1024 * 1024);
- bool shouldReplicateWrites = txn.writesAreReplicated();
- txn.setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, &txn, shouldReplicateWrites);
- uassertStatusOK(userCreateNS(&txn, db, ns, options));
+ bool shouldReplicateWrites = txn->writesAreReplicated();
+ txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites);
+ uassertStatusOK(userCreateNS(txn, db, ns, options));
collection = db->getCollection(ns);
}
invariant(collection);
- uassertStatusOK(collection->insertDocument(&txn, o, false));
+ uassertStatusOK(collection->insertDocument(txn, o, false));
wunit.commit();
}
@@ -293,12 +293,11 @@ static unsigned long long checkIfReplMissingFromCommandLine(OperationContext* tx
return 0;
}
-static void repairDatabasesAndCheckVersion() {
+static void repairDatabasesAndCheckVersion(OperationContext* txn) {
LOG(1) << "enter repairDatabases (to check pdfile version #)" << endl;
- OperationContextImpl txn;
- ScopedTransaction transaction(&txn, MODE_X);
- Lock::GlobalWrite lk(txn.lockState());
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
vector<string> dbNames;
@@ -311,7 +310,7 @@ static void repairDatabasesAndCheckVersion() {
const string dbName = *i;
LOG(1) << " Repairing database: " << dbName << endl;
- fassert(18506, repairDatabase(&txn, storageEngine, dbName));
+ fassert(18506, repairDatabase(txn, storageEngine, dbName));
}
}
@@ -322,19 +321,19 @@ static void repairDatabasesAndCheckVersion() {
// to. The local DB is special because it is not replicated. See SERVER-10927 for more
// details.
const bool shouldClearNonLocalTmpCollections =
- !(checkIfReplMissingFromCommandLine(&txn) || replSettings.usingReplSets() ||
+ !(checkIfReplMissingFromCommandLine(txn) || replSettings.usingReplSets() ||
replSettings.slave == repl::SimpleSlave);
for (vector<string>::const_iterator i = dbNames.begin(); i != dbNames.end(); ++i) {
const string dbName = *i;
LOG(1) << " Recovering database: " << dbName << endl;
- Database* db = dbHolder().openDb(&txn, dbName);
+ Database* db = dbHolder().openDb(txn, dbName);
invariant(db);
// First thing after opening the database is to check for file compatibility,
// otherwise we might crash if this is a deprecated format.
- if (!db->getDatabaseCatalogEntry()->currentFilesCompatible(&txn)) {
+ if (!db->getDatabaseCatalogEntry()->currentFilesCompatible(txn)) {
log() << "****";
log() << "cannot do this upgrade without an upgrade in the middle";
log() << "please do a --repair with 2.6 and then start this version";
@@ -347,7 +346,7 @@ static void repairDatabasesAndCheckVersion() {
Collection* coll = db->getCollection(systemIndexes);
unique_ptr<PlanExecutor> exec(
- InternalPlanner::collectionScan(&txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL));
+ InternalPlanner::collectionScan(txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL));
BSONObj index;
PlanExecutor::ExecState state;
@@ -355,7 +354,7 @@ static void repairDatabasesAndCheckVersion() {
const BSONObj key = index.getObjectField("key");
const string plugin = IndexNames::findPluginName(key);
- if (db->getDatabaseCatalogEntry()->isOlderThan24(&txn)) {
+ if (db->getDatabaseCatalogEntry()->isOlderThan24(txn)) {
if (IndexNames::existedBefore24(plugin)) {
continue;
}
@@ -390,11 +389,11 @@ static void repairDatabasesAndCheckVersion() {
if (replSettings.usingReplSets()) {
// We only care about the _id index if we are in a replset
- checkForIdIndexes(&txn, db);
+ checkForIdIndexes(txn, db);
}
if (shouldClearNonLocalTmpCollections || dbName == "local") {
- db->clearTmpCollections(&txn);
+ db->clearTmpCollections(txn);
}
}
@@ -520,17 +519,16 @@ static void _initAndListen(int listenPort) {
ScriptEngine::setup();
}
- repairDatabasesAndCheckVersion();
+ auto startupOpCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+
+ repairDatabasesAndCheckVersion(startupOpCtx.get());
if (storageGlobalParams.upgrade) {
log() << "finished checking dbs" << endl;
exitCleanly(EXIT_CLEAN);
}
- {
- OperationContextImpl txn;
- uassertStatusOK(getGlobalAuthorizationManager()->initialize(&txn));
- }
+ uassertStatusOK(getGlobalAuthorizationManager()->initialize(startupOpCtx.get()));
/* this is for security on certain platforms (nonce generation) */
srand((unsigned)(curTimeMicros64() ^ startupSrandTimer.micros()));
@@ -546,13 +544,11 @@ static void _initAndListen(int listenPort) {
}
{
- OperationContextImpl txn;
-
#ifndef _WIN32
mongo::signalForkSuccess();
#endif
- Status status = authindex::verifySystemIndexes(&txn);
+ Status status = authindex::verifySystemIndexes(startupOpCtx.get());
if (!status.isOK()) {
log() << status.reason();
exitCleanly(EXIT_NEED_UPGRADE);
@@ -560,8 +556,8 @@ static void _initAndListen(int listenPort) {
// SERVER-14090: Verify that auth schema version is schemaVersion26Final.
int foundSchemaVersion;
- status =
- getGlobalAuthorizationManager()->getAuthorizationVersion(&txn, &foundSchemaVersion);
+ status = getGlobalAuthorizationManager()->getAuthorizationVersion(startupOpCtx.get(),
+ &foundSchemaVersion);
if (!status.isOK()) {
log() << "Auth schema version is incompatible: "
<< "User and role management commands require auth data to have "
@@ -581,11 +577,12 @@ static void _initAndListen(int listenPort) {
getDeleter()->startWorkers();
- restartInProgressIndexesFromLastShutdown(&txn);
+ restartInProgressIndexesFromLastShutdown(startupOpCtx.get());
- repl::getGlobalReplicationCoordinator()->startReplication(&txn);
+ repl::getGlobalReplicationCoordinator()->startReplication(startupOpCtx.get());
- const unsigned long long missingRepl = checkIfReplMissingFromCommandLine(&txn);
+ const unsigned long long missingRepl =
+ checkIfReplMissingFromCommandLine(startupOpCtx.get());
if (missingRepl) {
log() << startupWarningsLog;
log() << "** WARNING: mongod started without --replSet yet " << missingRepl
@@ -608,9 +605,15 @@ static void _initAndListen(int listenPort) {
startFTDC();
- logStartup();
+ if (!repl::getGlobalReplicationCoordinator()->isReplEnabled()) {
+ uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get()));
+ }
+
+ logStartup(startupOpCtx.get());
- // MessageServer::run will return when exit code closes its socket
+ // MessageServer::run will return when exit code closes its socket and we don't need the
+ // operation context anymore
+ startupOpCtx.reset();
server->run();
}
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index 2c0d448cf01..2c77bf8ca2d 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -28,7 +28,6 @@
#pragma once
-#include <iosfwd>
#include <memory>
#include <boost/filesystem/path.hpp>
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 4b5364f3424..0692975b668 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -170,6 +170,14 @@ public:
virtual void clearShardingState() = 0;
/**
+ * Called when the instance transitions to primary in order to notify a potentially sharded
+ * host to recover its sharding state.
+ *
+ * Throws on errors.
+ */
+ virtual void recoverShardingState(OperationContext* txn) = 0;
+
+ /**
* Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
*/
virtual void signalApplierToChooseNewSyncSource() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 3aa445c029a..6e97d9c4987 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/executor/network_interface.h"
#include "mongo/stdx/functional.h"
@@ -335,6 +336,14 @@ void ReplicationCoordinatorExternalStateImpl::clearShardingState() {
ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata();
}
+void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationContext* txn) {
+ uassertStatusOK(ShardingStateRecovery::recover(txn));
+
+ // There is a slight chance that some stale metadata might have been loaded before the latest
+ // optime has been recovered, so throw out everything that we have up to now
+ ShardingState::get(txn)->clearCollectionMetadata();
+}
+
void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
BackgroundSync::get()->clearSyncTarget();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 8dce3dbb632..83d7a3c7d34 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -67,6 +67,7 @@ public:
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
+ virtual void recoverShardingState(OperationContext* txn);
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual OperationContext* createOperationContext(const std::string& threadName);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index abf8b7ad0b1..86a37f54f18 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -190,6 +190,8 @@ void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationCon
void ReplicationCoordinatorExternalStateMock::clearShardingState() {}
+void ReplicationCoordinatorExternalStateMock::recoverShardingState(OperationContext* txn) {}
+
void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {}
void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 58d66934e77..e6a259ca0ea 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -70,6 +70,7 @@ public:
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
+ virtual void recoverShardingState(OperationContext* txn);
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual OperationContext* createOperationContext(const std::string& threadName);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a6da1e44edf..8e071e9a496 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -617,8 +617,12 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
lk.unlock();
+
+ _externalState->recoverShardingState(txn);
+
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite globalWriteLock(txn->lockState());
+
lk.lock();
if (!_isWaitingForDrainToComplete) {
return;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 71f0a17d745..59ff7c88904 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -27,6 +27,7 @@ env.Library(
'migration_source_manager.cpp',
'sharded_connection_info.cpp',
'sharding_state.cpp',
+ 'sharding_state_recovery.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 7ac6885a8f7..c84cffd98a2 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/operation_shard_version.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/logger/ramlog.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -272,6 +273,10 @@ Status ChunkMoveOperationState::commitMigration(OperationContext* txn) {
// or CollectionMetadata state.
ShardingState* const shardingState = ShardingState::get(txn);
+ Status startStatus = ShardingStateRecovery::startMetadataOp(txn);
+ if (!startStatus.isOK())
+ return startStatus;
+
shardingState->migrationSourceManager()->setInCriticalSection(true);
const ChunkVersion originalCollVersion = getCollMetadata()->getCollVersion();
@@ -526,6 +531,7 @@ Status ChunkMoveOperationState::commitMigration(OperationContext* txn) {
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection);
shardingState->migrationSourceManager()->setInCriticalSection(false);
+ ShardingStateRecovery::endMetadataOp(txn);
// Migration is done, just log some diagnostics information
BSONObj chunkInfo =
diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp
new file mode 100644
index 00000000000..1df5eee4777
--- /dev/null
+++ b/src/mongo/db/s/sharding_state_recovery.cpp
@@ -0,0 +1,297 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_state_recovery.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/update_lifecycle_impl.h"
+#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+const char kRecoveryDocumentId[] = "minOpTimeRecovery";
+const char kConfigsvrConnString[] = "configsvrConnectionString";
+const char kShardName[] = "shardName";
+const char kMinOpTime[] = "minOpTime";
+const char kMinOpTimeUpdaters[] = "minOpTimeUpdaters";
+
+const Seconds kWriteTimeout(15);
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::NONE,
+ kWriteTimeout);
+
+/**
+ * Encapsulates the parsing and construction of the config server min opTime recovery document.
+ */
+class RecoveryDocument {
+public:
+ enum ChangeType : int8_t { Increment = 1, Decrement = -1, Clear = 0 };
+
+ static StatusWith<RecoveryDocument> fromBSON(const BSONObj& obj) {
+ RecoveryDocument recDoc;
+
+ {
+ std::string configsvrString;
+
+ Status status = bsonExtractStringField(obj, kConfigsvrConnString, &configsvrString);
+ if (!status.isOK())
+ return status;
+
+ auto configsvrStatus = ConnectionString::parse(configsvrString);
+ if (!configsvrStatus.isOK())
+ return configsvrStatus.getStatus();
+
+ recDoc._configsvr = std::move(configsvrStatus.getValue());
+ }
+
+ Status status = bsonExtractStringField(obj, kShardName, &recDoc._shardName);
+ if (!status.isOK())
+ return status;
+
+ status = bsonExtractOpTimeField(obj, kMinOpTime, &recDoc._minOpTime);
+ if (!status.isOK())
+ return status;
+
+ status = bsonExtractIntegerField(obj, kMinOpTimeUpdaters, &recDoc._minOpTimeUpdaters);
+ if (!status.isOK())
+ return status;
+
+ return recDoc;
+ }
+
+ static BSONObj createChangeObj(ConnectionString configsvr,
+ std::string shardName,
+ repl::OpTime minOpTime,
+ ChangeType change) {
+ BSONObjBuilder cmdBuilder;
+
+ {
+ BSONObjBuilder setBuilder(cmdBuilder.subobjStart("$set"));
+ setBuilder.append(kConfigsvrConnString, configsvr.toString());
+ setBuilder.append(kShardName, shardName);
+ minOpTime.append(&setBuilder, kMinOpTime);
+ }
+
+ if (change == Clear) {
+ cmdBuilder.append("$set", BSON(kMinOpTimeUpdaters << 0));
+ } else {
+ cmdBuilder.append("$inc", BSON(kMinOpTimeUpdaters << change));
+ }
+
+ return cmdBuilder.obj();
+ }
+
+ static BSONObj getQuery() {
+ return BSON("_id" << kRecoveryDocumentId);
+ }
+
+ BSONObj toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("_id", kRecoveryDocumentId);
+ builder.append(kConfigsvrConnString, _configsvr.toString());
+ builder.append(kShardName, _shardName);
+ builder.append(kMinOpTime, _minOpTime.toBSON());
+ builder.append(kMinOpTimeUpdaters, _minOpTimeUpdaters);
+
+ return builder.obj();
+ }
+
+ ConnectionString getConfigsvr() const {
+ return _configsvr;
+ }
+
+ std::string getShardName() const {
+ return _shardName;
+ }
+
+ repl::OpTime getMinOpTime() const {
+ return _minOpTime;
+ }
+
+ int64_t getMinOpTimeUpdaters() const {
+ return _minOpTimeUpdaters;
+ }
+
+private:
+ RecoveryDocument() = default;
+
+ ConnectionString _configsvr;
+ std::string _shardName;
+ repl::OpTime _minOpTime;
+ long long _minOpTimeUpdaters;
+};
+
+/**
+ * This method is the main entry point for updating the sharding state recovery document. The goal
+ * it has is to always move the opTime foward for a currently running server. It achieves this by
+ * serializing the modify calls and reading the current opTime under X-lock on the admin database.
+ */
+Status modifyRecoveryDocument(OperationContext* txn,
+ RecoveryDocument::ChangeType change,
+ const WriteConcernOptions& writeConcern) {
+ try {
+ AutoGetOrCreateDb autoGetOrCreateDb(
+ txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X);
+
+ BSONObj updateObj = RecoveryDocument::createChangeObj(
+ grid.shardRegistry()->getConfigServerConnectionString(),
+ ShardingState::get(txn)->getShardName(),
+ grid.shardRegistry()->getConfigOpTime(),
+ change);
+
+ LOG(1) << "Changing sharding recovery document " << updateObj;
+
+ OpDebug opDebug;
+ UpdateRequest updateReq(NamespaceString::kConfigCollectionNamespace);
+ updateReq.setQuery(RecoveryDocument::getQuery());
+ updateReq.setUpdates(updateObj);
+ updateReq.setUpsert();
+ UpdateLifecycleImpl updateLifecycle(true, NamespaceString::kConfigCollectionNamespace);
+ updateReq.setLifecycle(&updateLifecycle);
+
+ UpdateResult result = update(txn, autoGetOrCreateDb.getDb(), updateReq, &opDebug);
+ invariant(result.numDocsModified == 1);
+ invariant(result.numMatched <= 1);
+
+ // Wait until the majority write concern has been satisfied
+ WriteConcernResult writeConcernResult;
+ return waitForWriteConcern(txn,
+ repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
+ writeConcern,
+ &writeConcernResult);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
+} // namespace
+
+Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) {
+ Status upsertStatus =
+ modifyRecoveryDocument(txn, RecoveryDocument::Increment, kMajorityWriteConcern);
+
+ if (upsertStatus == ErrorCodes::WriteConcernFailed) {
+ // Couldn't wait for the replication to complete, but the local write was performed. Clear
+ // it up fast (without any waiting for journal or replication) and still treat it as
+ // failure.
+ modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
+ }
+
+ return upsertStatus;
+}
+
+void ShardingStateRecovery::endMetadataOp(OperationContext* txn) {
+ Status status = modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
+ if (!status.isOK()) {
+ warning() << "Failed to decrement minOpTimeUpdaters due to " << status;
+ }
+}
+
+Status ShardingStateRecovery::recover(OperationContext* txn) {
+ BSONObj recoveryDocBSON;
+
+ try {
+ AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS);
+ if (!Helpers::findOne(
+ txn, autoColl.getCollection(), RecoveryDocument::getQuery(), recoveryDocBSON)) {
+ return Status::OK();
+ }
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ const auto recoveryDocStatus = RecoveryDocument::fromBSON(recoveryDocBSON);
+ if (!recoveryDocStatus.isOK())
+ return recoveryDocStatus.getStatus();
+
+ const auto recoveryDoc = std::move(recoveryDocStatus.getValue());
+
+ log() << "Sharding state recovery process found document " << recoveryDoc.toBSON();
+
+ // Make sure the sharding state is initialized
+ ShardingState* const shardingState = ShardingState::get(txn);
+
+ shardingState->initialize(txn, recoveryDoc.getConfigsvr().toString());
+ shardingState->setShardName(recoveryDoc.getShardName());
+
+ if (!recoveryDoc.getMinOpTimeUpdaters()) {
+ // Treat the minOpTime as up-to-date
+ grid.shardRegistry()->advanceConfigOpTime(recoveryDoc.getMinOpTime());
+ return Status::OK();
+ }
+
+ log() << "Sharding state recovery document indicates there were "
+ << recoveryDoc.getMinOpTimeUpdaters()
+ << " metadata change operations in flight. Contacting the config server primary in order "
+ "to retrieve the most recent opTime.";
+
+ // Need to fetch the latest uptime from the config server, so do a logging write
+ Status status =
+ grid.catalogManager(txn)->logChange(txn,
+ "Sharding recovery thread",
+ "Sharding minOpTime recovery",
+ NamespaceString::kConfigCollectionNamespace.ns(),
+ recoveryDocBSON);
+ if (!status.isOK())
+ return status;
+
+ log() << "Sharding state recovered. New config server opTime is "
+ << grid.shardRegistry()->getConfigOpTime();
+
+ // Finally, clear the recovery document so next time we don't need to recover
+ status = modifyRecoveryDocument(txn, RecoveryDocument::Clear, kMajorityWriteConcern);
+ if (!status.isOK()) {
+ warning() << "Failed to reset sharding state recovery document due to " << status;
+ }
+
+ return Status::OK();
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_state_recovery.h b/src/mongo/db/s/sharding_state_recovery.h
new file mode 100644
index 00000000000..5cf38615385
--- /dev/null
+++ b/src/mongo/db/s/sharding_state_recovery.h
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+class OperationContext;
+class Status;
+
+/**
+ * Manages the persistence and recovery of the sharding config metadata's min opTime.
+ *
+ * The opTime recovery document resides in the admin.system.version collection and has the
+ * following format:
+ *
+ * { _id: "minOpTimeRecovery",
+ * configsvrConnectionString: "config/server1:10000,server2:10001,server3:10002",
+ * shardName: "shard0000",
+ * minOpTime: { ts: Timestamp 1443820968000|1, t: 11 },
+ * minOptimeUpdaters: 1 }
+ */
+class ShardingStateRecovery {
+public:
+ /**
+ * Marks the beginning of a sharding metadata operation which requires recovery of the config
+ * server's minOpTime after node failure. It is only safe to commence the operation after this
+ * method returns an OK status.
+ */
+ static Status startMetadataOp(OperationContext* txn);
+
+ /**
+ * Marks the end of a sharding metadata operation, persisting the latest config server opTime at
+ * the time of the call.
+ */
+ static void endMetadataOp(OperationContext* txn);
+
+ /**
+ * Recovers the minimal config server opTime that the instance should be using for reading
+ * sharding metadata so that the instance observes all metadata modifications it did the last
+ * time it was active (or PRIMARY, if replica set).
+ *
+ * NOTE: This method will block until recovery completes.
+ *
+ * Returns OK if the minOpTime was successfully recovered or failure status otherwise. It is
+ * unsafe to read and rely on any sharding metadata before this method has returned success.
+ */
+ static Status recover(OperationContext* txn);
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 280b1c74d0e..2b5b7dcb094 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -341,20 +341,20 @@ public:
virtual void logAction(OperationContext* txn, const ActionLogType& actionLog) = 0;
/**
- * Logs a diagnostic event locally and on the config server.
- *
- * NOTE: This method is best effort so it should never throw.
+ * Logs a diagnostic event locally and on the config server. If the config server write fails
+ * for any reason a warning will be written to the local service log and the method will return
+ * a failed status.
*
* @param clientAddress Address of the client that initiated the op that caused this change
* @param what E.g. "split", "migrate"
* @param ns To which collection the metadata change is being applied
* @param detail Additional info about the metadata change (not interpreted)
*/
- virtual void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) = 0;
+ virtual Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) = 0;
/**
* Returns global settings for a certain key.
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index 879a492d6eb..ce7533eceff 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -167,11 +167,13 @@ Status CatalogManagerMock::applyChunkOpsDeprecated(OperationContext* txn,
void CatalogManagerMock::logAction(OperationContext* txn, const ActionLogType& actionLog) {}
-void CatalogManagerMock::logChange(OperationContext* txn,
- const string& clientAddress,
- const string& what,
- const string& ns,
- const BSONObj& detail) {}
+Status CatalogManagerMock::logChange(OperationContext* txn,
+ const string& clientAddress,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
+ return Status::OK();
+}
StatusWith<SettingsType> CatalogManagerMock::getGlobalSettings(OperationContext* txn,
const string& key) {
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 98350b8abe1..39a0d111545 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -128,11 +128,11 @@ public:
void logAction(OperationContext* txn, const ActionLogType& actionLog) override;
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
index b070b688743..10c464f1976 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
@@ -540,16 +540,12 @@ void ForwardingCatalogManager::logAction(OperationContext* txn, const ActionLogT
});
}
-void ForwardingCatalogManager::logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) {
- retry(txn,
- [&] {
- _actual->logChange(txn, clientAddress, what, ns, detail);
- return 1;
- });
+Status ForwardingCatalogManager::logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) {
+ return retry(txn, [&] { return _actual->logChange(txn, clientAddress, what, ns, detail); });
}
StatusWith<SettingsType> ForwardingCatalogManager::getGlobalSettings(OperationContext* txn,
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h
index 2e8d30e481b..ed7cb2f82a1 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.h
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.h
@@ -200,11 +200,11 @@ private:
void logAction(OperationContext* txn, const ActionLogType& actionLog) override;
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 74b72e22f95..44c98e2fb65 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -700,11 +700,11 @@ void CatalogManagerLegacy::logAction(OperationContext* txn, const ActionLogType&
}
}
-void CatalogManagerLegacy::logChange(OperationContext* txn,
- const string& clientAddress,
- const string& what,
- const string& ns,
- const BSONObj& detail) {
+Status CatalogManagerLegacy::logChange(OperationContext* txn,
+ const string& clientAddress,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
// Create the change log collection and ensure that it is capped. Wrap in try/catch,
// because creating an existing collection throws.
if (_changeLogCollectionCreated.load() == 0) {
@@ -721,7 +721,7 @@ void CatalogManagerLegacy::logChange(OperationContext* txn,
LOG(1) << "couldn't create changelog collection: " << ex;
// If we couldn't create the collection don't attempt the insert otherwise we might
// implicitly create the collection without it being capped.
- return;
+ return ex.toStatus();
}
}
}
@@ -751,6 +751,8 @@ void CatalogManagerLegacy::logChange(OperationContext* txn,
warning() << "Error encountered while logging config change with ID "
<< changeLog.getChangeId() << ": " << result;
}
+
+ return result;
}
StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(OperationContext* txn,
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index a9e530c5fc8..2b7f6c8548c 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -131,11 +131,11 @@ public:
void logAction(OperationContext* txn, const ActionLogType& actionLog);
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index e4e323316d7..e4b43e8e602 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -592,11 +592,11 @@ void CatalogManagerReplicaSet::logAction(OperationContext* txn, const ActionLogT
}
}
-void CatalogManagerReplicaSet::logChange(OperationContext* txn,
- const string& clientAddress,
- const string& what,
- const string& ns,
- const BSONObj& detail) {
+Status CatalogManagerReplicaSet::logChange(OperationContext* txn,
+ const string& clientAddress,
+ const string& what,
+ const string& ns,
+ const BSONObj& detail) {
if (_changeLogCollectionCreated.load() == 0) {
BSONObj createCmd = BSON("create" << ChangeLogType::ConfigNS << "capped" << true << "size"
<< kChangeLogCollectionSize);
@@ -604,7 +604,7 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
grid.shardRegistry()->runCommandOnConfigWithNotMasterRetries("config", createCmd);
if (!result.isOK()) {
LOG(1) << "couldn't create changelog collection: " << causedBy(result.getStatus());
- return;
+ return result.getStatus();
}
Status commandStatus = Command::getStatusFromCommandResult(result.getValue());
@@ -612,12 +612,12 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
_changeLogCollectionCreated.store(1);
} else {
LOG(1) << "couldn't create changelog collection: " << causedBy(commandStatus);
- return;
+ return commandStatus;
}
}
Date_t now = grid.shardRegistry()->getExecutor()->now();
- std::string hostName = grid.shardRegistry()->getNetwork()->getHostName();
+ const std::string hostName = grid.shardRegistry()->getNetwork()->getHostName();
const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen();
ChangeLogType changeLog;
@@ -637,6 +637,8 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn,
warning() << "Error encountered while logging config change with ID " << changeId << ": "
<< result;
}
+
+ return result;
}
StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(OperationContext* txn,
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index aeecb9a5dfd..f5a4e424441 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -125,11 +125,11 @@ public:
void logAction(OperationContext* txn, const ActionLogType& actionLog) override;
- void logChange(OperationContext* txn,
- const std::string& clientAddress,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
+ Status logChange(OperationContext* txn,
+ const std::string& clientAddress,
+ const std::string& what,
+ const std::string& ns,
+ const BSONObj& detail) override;
StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
const std::string& key) override;
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 8f033427b6e..eaf0394350b 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -30,7 +30,6 @@
#include "mongo/platform/basic.h"
-
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"