diff options
23 files changed, 531 insertions, 121 deletions
diff --git a/jstests/sharding/sharding_state_after_stepdown.js b/jstests/sharding/sharding_state_after_stepdown.js index a93578ecfa8..07a0eb45109 100644 --- a/jstests/sharding/sharding_state_after_stepdown.js +++ b/jstests/sharding/sharding_state_after_stepdown.js @@ -1,11 +1,14 @@ -// // Tests the state of sharding data after a replica set reconfig -// -var options = { rs : true, rsOptions : { nodes : 1 } }; +(function() { -var st = new ShardingTest({ shards : 2, mongos : 1, other : options }); -st.stopBalancer(); +var st = new ShardingTest({ shards: 2, + mongos: 1, + other: { + rs: true, + rsOptions: { nodes : 1 } + } + }); var mongos = st.s0; var admin = mongos.getDB("admin"); @@ -30,7 +33,6 @@ st.printShardingStatus(); // Restart both primaries to reset our sharding data var restartPrimaries = function() { - var rs0Primary = st.rs0.getPrimary(); var rs1Primary = st.rs1.getPrimary(); @@ -47,20 +49,18 @@ var restartPrimaries = function() { restartPrimaries(); -// -// -// No sharding data until shards are hit by a query -assert.eq("", - st.rs0.getPrimary().adminCommand({ getShardVersion : coll.toString() }).configServer); +// Sharding data gets initialized either when shards are hit by an unsharded query or if some +// metadata operation was run before the step down, which wrote a minOpTime recovery record. In +// this case we did a moveChunk above from shard0 to shard1, so we will have this record on +// shard0. +assert.neq("", + st.rs0.getPrimary().adminCommand({ getShardVersion : coll.toString() }).configServer); assert.eq("", st.rs1.getPrimary().adminCommand({ getShardVersion : coll.toString() }).configServer); -// -// -// Sharding data initialized when shards are hit by an unsharded query +// Doing a find only accesses the primary (rs0), which is already recovered. Ensure that the +// secondary still has no sharding knowledge. 
assert.neq(null, coll.findOne({})); -assert.neq("", - st.rs0.getPrimary().adminCommand({ getShardVersion : coll.toString() }).configServer); assert.eq("", st.rs1.getPrimary().adminCommand({ getShardVersion : coll.toString() }).configServer); @@ -149,3 +149,5 @@ assert.neq({}, jsTest.log( "DONE!" ); st.stop(); + +})(); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index e30eb75d384..0ec2df52760 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -50,12 +50,12 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_catalog_entry.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_key_validate.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/db.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbmessage.h" @@ -70,7 +70,7 @@ #include "mongo/db/log_process_details.h" #include "mongo/db/mongod_options.h" #include "mongo/db/op_observer.h" -#include "mongo/db/operation_context_impl.h" +#include "mongo/db/operation_context.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repair_database.h" @@ -82,6 +82,7 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/restapi.h" +#include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -164,10 +165,11 @@ public: DbResponse dbresponse; { - OperationContextImpl txn; - assembleResponse(&txn, m, dbresponse, port->remote()); - // txn must go out of scope here so that the operation cannot show up in - // currentOp results after the response reaches the client. 
+ auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc()); + assembleResponse(opCtx.get(), m, dbresponse, port->remote()); + + // opCtx must go out of scope here so that the operation cannot show up in currentOp + // results after the response reaches the client } if (dbresponse.response) { @@ -191,7 +193,7 @@ public: b.appendNum(cursorid); m.appendData(b.buf(), b.len()); b.decouple(); - DEV log() << "exhaust=true sending more" << endl; + DEV log() << "exhaust=true sending more"; continue; // this goes back to top loop } } @@ -201,7 +203,7 @@ public: } }; -static void logStartup() { +static void logStartup(OperationContext* txn) { BSONObjBuilder toLog; stringstream id; id << getHostNameCached() << "-" << jsTime().asInt64(); @@ -222,25 +224,23 @@ static void logStartup() { BSONObj o = toLog.obj(); - OperationContextImpl txn; - - ScopedTransaction transaction(&txn, MODE_X); - Lock::GlobalWrite lk(txn.lockState()); - AutoGetOrCreateDb autoDb(&txn, "local", mongo::MODE_X); + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + AutoGetOrCreateDb autoDb(txn, "local", mongo::MODE_X); Database* db = autoDb.getDb(); const std::string ns = "local.startup_log"; Collection* collection = db->getCollection(ns); - WriteUnitOfWork wunit(&txn); + WriteUnitOfWork wunit(txn); if (!collection) { BSONObj options = BSON("capped" << true << "size" << 10 * 1024 * 1024); - bool shouldReplicateWrites = txn.writesAreReplicated(); - txn.setReplicatedWrites(false); - ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, &txn, shouldReplicateWrites); - uassertStatusOK(userCreateNS(&txn, db, ns, options)); + bool shouldReplicateWrites = txn->writesAreReplicated(); + txn->setReplicatedWrites(false); + ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites); + uassertStatusOK(userCreateNS(txn, db, ns, options)); collection = db->getCollection(ns); } invariant(collection); - uassertStatusOK(collection->insertDocument(&txn, o, 
false)); + uassertStatusOK(collection->insertDocument(txn, o, false)); wunit.commit(); } @@ -293,12 +293,11 @@ static unsigned long long checkIfReplMissingFromCommandLine(OperationContext* tx return 0; } -static void repairDatabasesAndCheckVersion() { +static void repairDatabasesAndCheckVersion(OperationContext* txn) { LOG(1) << "enter repairDatabases (to check pdfile version #)" << endl; - OperationContextImpl txn; - ScopedTransaction transaction(&txn, MODE_X); - Lock::GlobalWrite lk(txn.lockState()); + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); vector<string> dbNames; @@ -311,7 +310,7 @@ static void repairDatabasesAndCheckVersion() { const string dbName = *i; LOG(1) << " Repairing database: " << dbName << endl; - fassert(18506, repairDatabase(&txn, storageEngine, dbName)); + fassert(18506, repairDatabase(txn, storageEngine, dbName)); } } @@ -322,19 +321,19 @@ static void repairDatabasesAndCheckVersion() { // to. The local DB is special because it is not replicated. See SERVER-10927 for more // details. const bool shouldClearNonLocalTmpCollections = - !(checkIfReplMissingFromCommandLine(&txn) || replSettings.usingReplSets() || + !(checkIfReplMissingFromCommandLine(txn) || replSettings.usingReplSets() || replSettings.slave == repl::SimpleSlave); for (vector<string>::const_iterator i = dbNames.begin(); i != dbNames.end(); ++i) { const string dbName = *i; LOG(1) << " Recovering database: " << dbName << endl; - Database* db = dbHolder().openDb(&txn, dbName); + Database* db = dbHolder().openDb(txn, dbName); invariant(db); // First thing after opening the database is to check for file compatibility, // otherwise we might crash if this is a deprecated format. 
- if (!db->getDatabaseCatalogEntry()->currentFilesCompatible(&txn)) { + if (!db->getDatabaseCatalogEntry()->currentFilesCompatible(txn)) { log() << "****"; log() << "cannot do this upgrade without an upgrade in the middle"; log() << "please do a --repair with 2.6 and then start this version"; @@ -347,7 +346,7 @@ static void repairDatabasesAndCheckVersion() { Collection* coll = db->getCollection(systemIndexes); unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(&txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL)); + InternalPlanner::collectionScan(txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL)); BSONObj index; PlanExecutor::ExecState state; @@ -355,7 +354,7 @@ static void repairDatabasesAndCheckVersion() { const BSONObj key = index.getObjectField("key"); const string plugin = IndexNames::findPluginName(key); - if (db->getDatabaseCatalogEntry()->isOlderThan24(&txn)) { + if (db->getDatabaseCatalogEntry()->isOlderThan24(txn)) { if (IndexNames::existedBefore24(plugin)) { continue; } @@ -390,11 +389,11 @@ static void repairDatabasesAndCheckVersion() { if (replSettings.usingReplSets()) { // We only care about the _id index if we are in a replset - checkForIdIndexes(&txn, db); + checkForIdIndexes(txn, db); } if (shouldClearNonLocalTmpCollections || dbName == "local") { - db->clearTmpCollections(&txn); + db->clearTmpCollections(txn); } } @@ -520,17 +519,16 @@ static void _initAndListen(int listenPort) { ScriptEngine::setup(); } - repairDatabasesAndCheckVersion(); + auto startupOpCtx = getGlobalServiceContext()->makeOperationContext(&cc()); + + repairDatabasesAndCheckVersion(startupOpCtx.get()); if (storageGlobalParams.upgrade) { log() << "finished checking dbs" << endl; exitCleanly(EXIT_CLEAN); } - { - OperationContextImpl txn; - uassertStatusOK(getGlobalAuthorizationManager()->initialize(&txn)); - } + uassertStatusOK(getGlobalAuthorizationManager()->initialize(startupOpCtx.get())); /* this is for security on certain platforms (nonce generation) */ 
srand((unsigned)(curTimeMicros64() ^ startupSrandTimer.micros())); @@ -546,13 +544,11 @@ static void _initAndListen(int listenPort) { } { - OperationContextImpl txn; - #ifndef _WIN32 mongo::signalForkSuccess(); #endif - Status status = authindex::verifySystemIndexes(&txn); + Status status = authindex::verifySystemIndexes(startupOpCtx.get()); if (!status.isOK()) { log() << status.reason(); exitCleanly(EXIT_NEED_UPGRADE); @@ -560,8 +556,8 @@ static void _initAndListen(int listenPort) { // SERVER-14090: Verify that auth schema version is schemaVersion26Final. int foundSchemaVersion; - status = - getGlobalAuthorizationManager()->getAuthorizationVersion(&txn, &foundSchemaVersion); + status = getGlobalAuthorizationManager()->getAuthorizationVersion(startupOpCtx.get(), + &foundSchemaVersion); if (!status.isOK()) { log() << "Auth schema version is incompatible: " << "User and role management commands require auth data to have " @@ -581,11 +577,12 @@ static void _initAndListen(int listenPort) { getDeleter()->startWorkers(); - restartInProgressIndexesFromLastShutdown(&txn); + restartInProgressIndexesFromLastShutdown(startupOpCtx.get()); - repl::getGlobalReplicationCoordinator()->startReplication(&txn); + repl::getGlobalReplicationCoordinator()->startReplication(startupOpCtx.get()); - const unsigned long long missingRepl = checkIfReplMissingFromCommandLine(&txn); + const unsigned long long missingRepl = + checkIfReplMissingFromCommandLine(startupOpCtx.get()); if (missingRepl) { log() << startupWarningsLog; log() << "** WARNING: mongod started without --replSet yet " << missingRepl @@ -608,9 +605,15 @@ static void _initAndListen(int listenPort) { startFTDC(); - logStartup(); + if (!repl::getGlobalReplicationCoordinator()->isReplEnabled()) { + uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get())); + } + + logStartup(startupOpCtx.get()); - // MessageServer::run will return when exit code closes its socket + // MessageServer::run will return when exit code closes 
its socket and we don't need the + // operation context anymore + startupOpCtx.reset(); server->run(); } diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h index 2c0d448cf01..2c77bf8ca2d 100644 --- a/src/mongo/db/dbhelpers.h +++ b/src/mongo/db/dbhelpers.h @@ -28,7 +28,6 @@ #pragma once -#include <iosfwd> #include <memory> #include <boost/filesystem/path.hpp> diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 4b5364f3424..0692975b668 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -170,6 +170,14 @@ public: virtual void clearShardingState() = 0; /** + * Called when the instance transitions to primary in order to notify a potentially sharded + * host to recover its sharding state. + * + * Throws on errors. + */ + virtual void recoverShardingState(OperationContext* txn) = 0; + + /** * Notifies the bgsync and syncSourceFeedback threads to choose a new sync source. 
*/ virtual void signalApplierToChooseNewSyncSource() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 3aa445c029a..6e97d9c4987 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -56,6 +56,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" #include "mongo/stdx/functional.h" @@ -335,6 +336,14 @@ void ReplicationCoordinatorExternalStateImpl::clearShardingState() { ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata(); } +void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationContext* txn) { + uassertStatusOK(ShardingStateRecovery::recover(txn)); + + // There is a slight chance that some stale metadata might have been loaded before the latest + // optime has been recovered, so throw out everything that we have up to now + ShardingState::get(txn)->clearCollectionMetadata(); +} + void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { BackgroundSync::get()->clearSyncTarget(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 8dce3dbb632..83d7a3c7d34 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -67,6 +67,7 @@ public: virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* txn); virtual void clearShardingState(); + virtual void recoverShardingState(OperationContext* txn); virtual void signalApplierToChooseNewSyncSource(); virtual void 
signalApplierToCancelFetcher(); virtual OperationContext* createOperationContext(const std::string& threadName); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index abf8b7ad0b1..86a37f54f18 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -190,6 +190,8 @@ void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationCon void ReplicationCoordinatorExternalStateMock::clearShardingState() {} +void ReplicationCoordinatorExternalStateMock::recoverShardingState(OperationContext* txn) {} + void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {} void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 58d66934e77..e6a259ca0ea 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -70,6 +70,7 @@ public: virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* txn); virtual void clearShardingState(); + virtual void recoverShardingState(OperationContext* txn); virtual void signalApplierToChooseNewSyncSource(); virtual void signalApplierToCancelFetcher(); virtual OperationContext* createOperationContext(const std::string& threadName); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a6da1e44edf..8e071e9a496 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -617,8 +617,12 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { return; } lk.unlock(); + + 
_externalState->recoverShardingState(txn); + ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite globalWriteLock(txn->lockState()); + lk.lock(); if (!_isWaitingForDrainToComplete) { return; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 71f0a17d745..59ff7c88904 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -27,6 +27,7 @@ env.Library( 'migration_source_manager.cpp', 'sharded_connection_info.cpp', 'sharding_state.cpp', + 'sharding_state_recovery.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp index 7ac6885a8f7..c84cffd98a2 100644 --- a/src/mongo/db/s/migration_impl.cpp +++ b/src/mongo/db/s/migration_impl.cpp @@ -40,6 +40,7 @@ #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/operation_shard_version.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_state_recovery.h" #include "mongo/logger/ramlog.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_chunk.h" @@ -272,6 +273,10 @@ Status ChunkMoveOperationState::commitMigration(OperationContext* txn) { // or CollectionMetadata state. 
ShardingState* const shardingState = ShardingState::get(txn); + Status startStatus = ShardingStateRecovery::startMetadataOp(txn); + if (!startStatus.isOK()) + return startStatus; + shardingState->migrationSourceManager()->setInCriticalSection(true); const ChunkVersion originalCollVersion = getCollMetadata()->getCollVersion(); @@ -526,6 +531,7 @@ Status ChunkMoveOperationState::commitMigration(OperationContext* txn) { MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection); shardingState->migrationSourceManager()->setInCriticalSection(false); + ShardingStateRecovery::endMetadataOp(txn); // Migration is done, just log some diagnostics information BSONObj chunkInfo = diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp new file mode 100644 index 00000000000..1df5eee4777 --- /dev/null +++ b/src/mongo/db/s/sharding_state_recovery.cpp @@ -0,0 +1,297 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_state_recovery.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connection_string.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/ops/update_lifecycle_impl.h" +#include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/update.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/write_concern.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +const char kRecoveryDocumentId[] = "minOpTimeRecovery"; +const char kConfigsvrConnString[] = "configsvrConnectionString"; +const char kShardName[] = "shardName"; +const char kMinOpTime[] = "minOpTime"; +const char kMinOpTimeUpdaters[] = "minOpTimeUpdaters"; + +const Seconds kWriteTimeout(15); +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::NONE, + kWriteTimeout); + +/** + * Encapsulates the parsing and construction of the config server min opTime recovery document. 
+ */ +class RecoveryDocument { +public: + enum ChangeType : int8_t { Increment = 1, Decrement = -1, Clear = 0 }; + + static StatusWith<RecoveryDocument> fromBSON(const BSONObj& obj) { + RecoveryDocument recDoc; + + { + std::string configsvrString; + + Status status = bsonExtractStringField(obj, kConfigsvrConnString, &configsvrString); + if (!status.isOK()) + return status; + + auto configsvrStatus = ConnectionString::parse(configsvrString); + if (!configsvrStatus.isOK()) + return configsvrStatus.getStatus(); + + recDoc._configsvr = std::move(configsvrStatus.getValue()); + } + + Status status = bsonExtractStringField(obj, kShardName, &recDoc._shardName); + if (!status.isOK()) + return status; + + status = bsonExtractOpTimeField(obj, kMinOpTime, &recDoc._minOpTime); + if (!status.isOK()) + return status; + + status = bsonExtractIntegerField(obj, kMinOpTimeUpdaters, &recDoc._minOpTimeUpdaters); + if (!status.isOK()) + return status; + + return recDoc; + } + + static BSONObj createChangeObj(ConnectionString configsvr, + std::string shardName, + repl::OpTime minOpTime, + ChangeType change) { + BSONObjBuilder cmdBuilder; + + { + BSONObjBuilder setBuilder(cmdBuilder.subobjStart("$set")); + setBuilder.append(kConfigsvrConnString, configsvr.toString()); + setBuilder.append(kShardName, shardName); + minOpTime.append(&setBuilder, kMinOpTime); + } + + if (change == Clear) { + cmdBuilder.append("$set", BSON(kMinOpTimeUpdaters << 0)); + } else { + cmdBuilder.append("$inc", BSON(kMinOpTimeUpdaters << change)); + } + + return cmdBuilder.obj(); + } + + static BSONObj getQuery() { + return BSON("_id" << kRecoveryDocumentId); + } + + BSONObj toBSON() const { + BSONObjBuilder builder; + builder.append("_id", kRecoveryDocumentId); + builder.append(kConfigsvrConnString, _configsvr.toString()); + builder.append(kShardName, _shardName); + builder.append(kMinOpTime, _minOpTime.toBSON()); + builder.append(kMinOpTimeUpdaters, _minOpTimeUpdaters); + + return builder.obj(); + } + + 
ConnectionString getConfigsvr() const { + return _configsvr; + } + + std::string getShardName() const { + return _shardName; + } + + repl::OpTime getMinOpTime() const { + return _minOpTime; + } + + int64_t getMinOpTimeUpdaters() const { + return _minOpTimeUpdaters; + } + +private: + RecoveryDocument() = default; + + ConnectionString _configsvr; + std::string _shardName; + repl::OpTime _minOpTime; + long long _minOpTimeUpdaters; +}; + +/** + * This method is the main entry point for updating the sharding state recovery document. The goal + * it has is to always move the opTime forward for a currently running server. It achieves this by + * serializing the modify calls and reading the current opTime under X-lock on the admin database. + */ +Status modifyRecoveryDocument(OperationContext* txn, + RecoveryDocument::ChangeType change, + const WriteConcernOptions& writeConcern) { + try { + AutoGetOrCreateDb autoGetOrCreateDb( + txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X); + + BSONObj updateObj = RecoveryDocument::createChangeObj( + grid.shardRegistry()->getConfigServerConnectionString(), + ShardingState::get(txn)->getShardName(), + grid.shardRegistry()->getConfigOpTime(), + change); + + LOG(1) << "Changing sharding recovery document " << updateObj; + + OpDebug opDebug; + UpdateRequest updateReq(NamespaceString::kConfigCollectionNamespace); + updateReq.setQuery(RecoveryDocument::getQuery()); + updateReq.setUpdates(updateObj); + updateReq.setUpsert(); + UpdateLifecycleImpl updateLifecycle(true, NamespaceString::kConfigCollectionNamespace); + updateReq.setLifecycle(&updateLifecycle); + + UpdateResult result = update(txn, autoGetOrCreateDb.getDb(), updateReq, &opDebug); + invariant(result.numDocsModified == 1); + invariant(result.numMatched <= 1); + + // Wait until the majority write concern has been satisfied + WriteConcernResult writeConcernResult; + return waitForWriteConcern(txn, + repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), 
writeConcern, + &writeConcernResult); + } catch (const DBException& ex) { + return ex.toStatus(); + } +} + +} // namespace + +Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) { + Status upsertStatus = + modifyRecoveryDocument(txn, RecoveryDocument::Increment, kMajorityWriteConcern); + + if (upsertStatus == ErrorCodes::WriteConcernFailed) { + // Couldn't wait for the replication to complete, but the local write was performed. Clear + // it up fast (without any waiting for journal or replication) and still treat it as + // failure. + modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions()); + } + + return upsertStatus; +} + +void ShardingStateRecovery::endMetadataOp(OperationContext* txn) { + Status status = modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions()); + if (!status.isOK()) { + warning() << "Failed to decrement minOpTimeUpdaters due to " << status; + } +} + +Status ShardingStateRecovery::recover(OperationContext* txn) { + BSONObj recoveryDocBSON; + + try { + AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS); + if (!Helpers::findOne( + txn, autoColl.getCollection(), RecoveryDocument::getQuery(), recoveryDocBSON)) { + return Status::OK(); + } + } catch (const DBException& ex) { + return ex.toStatus(); + } + + const auto recoveryDocStatus = RecoveryDocument::fromBSON(recoveryDocBSON); + if (!recoveryDocStatus.isOK()) + return recoveryDocStatus.getStatus(); + + const auto recoveryDoc = std::move(recoveryDocStatus.getValue()); + + log() << "Sharding state recovery process found document " << recoveryDoc.toBSON(); + + // Make sure the sharding state is initialized + ShardingState* const shardingState = ShardingState::get(txn); + + shardingState->initialize(txn, recoveryDoc.getConfigsvr().toString()); + shardingState->setShardName(recoveryDoc.getShardName()); + + if (!recoveryDoc.getMinOpTimeUpdaters()) { + // Treat the minOpTime as up-to-date + 
grid.shardRegistry()->advanceConfigOpTime(recoveryDoc.getMinOpTime()); + return Status::OK(); + } + + log() << "Sharding state recovery document indicates there were " + << recoveryDoc.getMinOpTimeUpdaters() + << " metadata change operations in flight. Contacting the config server primary in order " + "to retrieve the most recent opTime."; + + // Need to fetch the latest opTime from the config server, so do a logging write + Status status = + grid.catalogManager(txn)->logChange(txn, + "Sharding recovery thread", + "Sharding minOpTime recovery", + NamespaceString::kConfigCollectionNamespace.ns(), + recoveryDocBSON); + if (!status.isOK()) + return status; + + log() << "Sharding state recovered. New config server opTime is " + << grid.shardRegistry()->getConfigOpTime(); + + // Finally, clear the recovery document so next time we don't need to recover + status = modifyRecoveryDocument(txn, RecoveryDocument::Clear, kMajorityWriteConcern); + if (!status.isOK()) { + warning() << "Failed to reset sharding state recovery document due to " << status; + } + + return Status::OK(); +} + + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state_recovery.h b/src/mongo/db/s/sharding_state_recovery.h new file mode 100644 index 00000000000..5cf38615385 --- /dev/null +++ b/src/mongo/db/s/sharding_state_recovery.h @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +namespace mongo { + +class OperationContext; +class Status; + +/** + * Manages the persistence and recovery of the sharding config metadata's min opTime. + * + * The opTime recovery document resides in the admin.system.version collection and has the + * following format: + * + * { _id: "minOpTimeRecovery", + * configsvrConnectionString: "config/server1:10000,server2:10001,server3:10002", + * shardName: "shard0000", + * minOpTime: { ts: Timestamp 1443820968000|1, t: 11 }, + * minOpTimeUpdaters: 1 } + */ +class ShardingStateRecovery { +public: + /** + * Marks the beginning of a sharding metadata operation which requires recovery of the config + * server's minOpTime after node failure. It is only safe to commence the operation after this + * method returns an OK status. + */ + static Status startMetadataOp(OperationContext* txn); + + /** + * Marks the end of a sharding metadata operation, persisting the latest config server opTime at + * the time of the call. 
+ */ + static void endMetadataOp(OperationContext* txn); + + /** + * Recovers the minimal config server opTime that the instance should be using for reading + * sharding metadata so that the instance observes all metadata modifications it did the last + * time it was active (or PRIMARY, if replica set). + * + * NOTE: This method will block until recovery completes. + * + * Returns OK if the minOpTime was successfully recovered or failure status otherwise. It is + * unsafe to read and rely on any sharding metadata before this method has returned success. + */ + static Status recover(OperationContext* txn); +}; + +} // namespace mongo diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 280b1c74d0e..2b5b7dcb094 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -341,20 +341,20 @@ public: virtual void logAction(OperationContext* txn, const ActionLogType& actionLog) = 0; /** - * Logs a diagnostic event locally and on the config server. - * - * NOTE: This method is best effort so it should never throw. + * Logs a diagnostic event locally and on the config server. If the config server write fails + * for any reason a warning will be written to the local service log and the method will return + * a failed status. * * @param clientAddress Address of the client that initiated the op that caused this change * @param what E.g. "split", "migrate" * @param ns To which collection the metadata change is being applied * @param detail Additional info about the metadata change (not interpreted) */ - virtual void logChange(OperationContext* txn, - const std::string& clientAddress, - const std::string& what, - const std::string& ns, - const BSONObj& detail) = 0; + virtual Status logChange(OperationContext* txn, + const std::string& clientAddress, + const std::string& what, + const std::string& ns, + const BSONObj& detail) = 0; /** * Returns global settings for a certain key. 
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index 879a492d6eb..ce7533eceff 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -167,11 +167,13 @@ Status CatalogManagerMock::applyChunkOpsDeprecated(OperationContext* txn, void CatalogManagerMock::logAction(OperationContext* txn, const ActionLogType& actionLog) {} -void CatalogManagerMock::logChange(OperationContext* txn, - const string& clientAddress, - const string& what, - const string& ns, - const BSONObj& detail) {} +Status CatalogManagerMock::logChange(OperationContext* txn, + const string& clientAddress, + const string& what, + const string& ns, + const BSONObj& detail) { + return Status::OK(); +} StatusWith<SettingsType> CatalogManagerMock::getGlobalSettings(OperationContext* txn, const string& key) { diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 98350b8abe1..39a0d111545 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -128,11 +128,11 @@ public: void logAction(OperationContext* txn, const ActionLogType& actionLog) override; - void logChange(OperationContext* txn, - const std::string& clientAddress, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; + Status logChange(OperationContext* txn, + const std::string& clientAddress, + const std::string& what, + const std::string& ns, + const BSONObj& detail) override; StatusWith<SettingsType> getGlobalSettings(OperationContext* txn, const std::string& key) override; diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp index b070b688743..10c464f1976 100644 --- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp +++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp @@ -540,16 +540,12 @@ void 
ForwardingCatalogManager::logAction(OperationContext* txn, const ActionLogT }); } -void ForwardingCatalogManager::logChange(OperationContext* txn, - const std::string& clientAddress, - const std::string& what, - const std::string& ns, - const BSONObj& detail) { - retry(txn, - [&] { - _actual->logChange(txn, clientAddress, what, ns, detail); - return 1; - }); +Status ForwardingCatalogManager::logChange(OperationContext* txn, + const std::string& clientAddress, + const std::string& what, + const std::string& ns, + const BSONObj& detail) { + return retry(txn, [&] { return _actual->logChange(txn, clientAddress, what, ns, detail); }); } StatusWith<SettingsType> ForwardingCatalogManager::getGlobalSettings(OperationContext* txn, diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h index 2e8d30e481b..ed7cb2f82a1 100644 --- a/src/mongo/s/catalog/forwarding_catalog_manager.h +++ b/src/mongo/s/catalog/forwarding_catalog_manager.h @@ -200,11 +200,11 @@ private: void logAction(OperationContext* txn, const ActionLogType& actionLog) override; - void logChange(OperationContext* txn, - const std::string& clientAddress, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; + Status logChange(OperationContext* txn, + const std::string& clientAddress, + const std::string& what, + const std::string& ns, + const BSONObj& detail) override; StatusWith<SettingsType> getGlobalSettings(OperationContext* txn, const std::string& key) override; diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 74b72e22f95..44c98e2fb65 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -700,11 +700,11 @@ void CatalogManagerLegacy::logAction(OperationContext* txn, const ActionLogType& } } -void CatalogManagerLegacy::logChange(OperationContext* txn, - const string& 
clientAddress, - const string& what, - const string& ns, - const BSONObj& detail) { +Status CatalogManagerLegacy::logChange(OperationContext* txn, + const string& clientAddress, + const string& what, + const string& ns, + const BSONObj& detail) { // Create the change log collection and ensure that it is capped. Wrap in try/catch, // because creating an existing collection throws. if (_changeLogCollectionCreated.load() == 0) { @@ -721,7 +721,7 @@ void CatalogManagerLegacy::logChange(OperationContext* txn, LOG(1) << "couldn't create changelog collection: " << ex; // If we couldn't create the collection don't attempt the insert otherwise we might // implicitly create the collection without it being capped. - return; + return ex.toStatus(); } } } @@ -751,6 +751,8 @@ void CatalogManagerLegacy::logChange(OperationContext* txn, warning() << "Error encountered while logging config change with ID " << changeLog.getChangeId() << ": " << result; } + + return result; } StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(OperationContext* txn, diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index a9e530c5fc8..2b7f6c8548c 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -131,11 +131,11 @@ public: void logAction(OperationContext* txn, const ActionLogType& actionLog); - void logChange(OperationContext* txn, - const std::string& clientAddress, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; + Status logChange(OperationContext* txn, + const std::string& clientAddress, + const std::string& what, + const std::string& ns, + const BSONObj& detail) override; StatusWith<SettingsType> getGlobalSettings(OperationContext* txn, const std::string& key) override; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp 
index e4e323316d7..e4b43e8e602 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -592,11 +592,11 @@ void CatalogManagerReplicaSet::logAction(OperationContext* txn, const ActionLogT } } -void CatalogManagerReplicaSet::logChange(OperationContext* txn, - const string& clientAddress, - const string& what, - const string& ns, - const BSONObj& detail) { +Status CatalogManagerReplicaSet::logChange(OperationContext* txn, + const string& clientAddress, + const string& what, + const string& ns, + const BSONObj& detail) { if (_changeLogCollectionCreated.load() == 0) { BSONObj createCmd = BSON("create" << ChangeLogType::ConfigNS << "capped" << true << "size" << kChangeLogCollectionSize); @@ -604,7 +604,7 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn, grid.shardRegistry()->runCommandOnConfigWithNotMasterRetries("config", createCmd); if (!result.isOK()) { LOG(1) << "couldn't create changelog collection: " << causedBy(result.getStatus()); - return; + return result.getStatus(); } Status commandStatus = Command::getStatusFromCommandResult(result.getValue()); @@ -612,12 +612,12 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn, _changeLogCollectionCreated.store(1); } else { LOG(1) << "couldn't create changelog collection: " << causedBy(commandStatus); - return; + return commandStatus; } } Date_t now = grid.shardRegistry()->getExecutor()->now(); - std::string hostName = grid.shardRegistry()->getNetwork()->getHostName(); + const std::string hostName = grid.shardRegistry()->getNetwork()->getHostName(); const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen(); ChangeLogType changeLog; @@ -637,6 +637,8 @@ void CatalogManagerReplicaSet::logChange(OperationContext* txn, warning() << "Error encountered while logging config change with ID " << changeId << ": " << result; } + + return result; } StatusWith<SettingsType> 
CatalogManagerReplicaSet::getGlobalSettings(OperationContext* txn, diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index aeecb9a5dfd..f5a4e424441 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -125,11 +125,11 @@ public: void logAction(OperationContext* txn, const ActionLogType& actionLog) override; - void logChange(OperationContext* txn, - const std::string& clientAddress, - const std::string& what, - const std::string& ns, - const BSONObj& detail) override; + Status logChange(OperationContext* txn, + const std::string& clientAddress, + const std::string& what, + const std::string& ns, + const BSONObj& detail) override; StatusWith<SettingsType> getGlobalSettings(OperationContext* txn, const std::string& key) override; diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index 8f033427b6e..eaf0394350b 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -30,7 +30,6 @@ #include "mongo/platform/basic.h" - #include "mongo/db/audit.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" |