diff options
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/balance.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp | 151 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.h | 32 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/config.cpp | 129 | ||||
-rw-r--r-- | src/mongo/s/config.h | 7 | ||||
-rw-r--r-- | src/mongo/s/config_server_checker_service.cpp | 77 | ||||
-rw-r--r-- | src/mongo/s/config_server_checker_service.h | 43 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 14 |
10 files changed, 189 insertions, 281 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 39f8484ee85..1301e20fe3e 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -255,7 +255,6 @@ env.Library( 'chunk.cpp', 'chunk_manager.cpp', 'config.cpp', - 'config_server_checker_service.cpp', 'distlock.cpp', 'grid.cpp', 'shard_key_pattern.cpp', diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index a42738e512d..f0dd06271da 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -51,7 +51,6 @@ #include "mongo/s/catalog/type_settings.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" -#include "mongo/s/config_server_checker_service.h" #include "mongo/s/dist_lock_manager.h" #include "mongo/s/grid.h" #include "mongo/s/server.h" @@ -582,14 +581,6 @@ namespace mongo { continue; } - if ( !isConfigServerConsistent() ) { - conn.done(); - warning() << "Skipping balancing round because data inconsistency" - << " was detected amongst the config servers." << endl; - sleepsecs( sleepTime ); - continue; - } - const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ? balancerConfig.getWaitForDelete() : false); diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index ad31f62fbad..262d29721b7 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -64,6 +64,7 @@ #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" +#include "mongo/util/exit.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/log.h" @@ -367,12 +368,28 @@ namespace { _distLockManager = stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString); _distLockManager->startUp(); + _consistentFromLastCheck.store(true); + + return Status::OK(); + } + + Status CatalogManagerLegacy::startConfigServerChecker() { + if (!_checkConfigServersConsistent()) { + return Status(ErrorCodes::IncompatibleShardingMetadata, + "Data inconsistency detected amongst config servers"); + } + + boost::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this)); + _consistencyCheckerThread.swap(t); + return Status::OK(); } void CatalogManagerLegacy::shutDown() { invariant(_distLockManager); _distLockManager->shutDown(); + + _consistencyCheckerThread.join(); } Status CatalogManagerLegacy::enableSharding(const std::string& dbName) { @@ -1198,6 +1215,15 @@ namespace { void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request, BatchedCommandResponse* response) { + // check if config servers are consistent + if (!_isConsistentFromLastCheck()) { + toBatchError( + Status(ErrorCodes::IncompatibleShardingMetadata, + "Data inconsistency detected amongst config servers"), + response); + return; + } + // We only support batch sizes of one for config writes if (request.sizeWriteOps() != 1) { toBatchError( @@ -1305,4 +1331,129 @@ namespace { return _distLockManager.get(); } + bool CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const { + if (tries <= 0) + return false; + + unsigned firstGood = 0; + int up = 0; + vector<BSONObj> res; + + // The last error we saw on a config server + string errMsg; + + for (unsigned i = 0; i < _configServers.size(); i++) { + BSONObj result; + boost::scoped_ptr<ScopedDbConnection> conn; + + try { + conn.reset(new ScopedDbConnection(_configServers[i], 30.0)); + + if (!conn->get()->runCommand("config", + BSON("dbhash" << 1 << + "collections" << BSON_ARRAY("chunks" << + "databases" << + "collections" << + "shards" << + "version")), + result)) { + + errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String(); + if (!result["assertion"].eoo()) errMsg = result["assertion"].String(); + + warning() << "couldn't check dbhash on config server " << _configServers[i] + << causedBy(result.toString()); + + result = BSONObj(); + } + else { + result = result.getOwned(); + if (up == 0) + firstGood = i; + up++; + } + conn->done(); + } + catch (const DBException& e) { + if (conn) { + conn->kill(); + } + + // We need to catch DBExceptions b/c sometimes we throw them + // instead of socket exceptions when findN fails + + errMsg = e.toString(); + warning() << " couldn't check dbhash on config server " + << _configServers[i] << causedBy(e); + } + res.push_back(result); + } + + if (_configServers.size() == 1) + return true; + + if (up == 0) { + // Use a ptr to error so if empty we won't add causedby + error() << "no config servers successfully contacted" << causedBy(&errMsg); + return false; + } + else if (up == 1) { + warning() << "only 1 config server reachable, continuing"; + return true; + } + + BSONObj base = res[firstGood]; + for (unsigned i = firstGood+1; i < res.size(); i++) { + if (res[i].isEmpty()) + continue; + + string chunksHash1 = base.getFieldDotted("collections.chunks"); + string chunksHash2 = res[i].getFieldDotted("collections.chunks"); + + string databaseHash1 = base.getFieldDotted("collections.databases"); + string databaseHash2 = res[i].getFieldDotted("collections.databases"); + + string collectionsHash1 = base.getFieldDotted("collections.collections"); + string collectionsHash2 = res[i].getFieldDotted("collections.collections"); + + string shardHash1 = base.getFieldDotted("collections.shards"); + string shardHash2 = res[i].getFieldDotted("collections.shards"); + + string versionHash1 = base.getFieldDotted("collections.version") ; + string versionHash2 = res[i].getFieldDotted("collections.version"); + + if (chunksHash1 == chunksHash2 && + databaseHash1 == databaseHash2 && + collectionsHash1 == collectionsHash2 && + shardHash1 == shardHash2 && + versionHash1 == versionHash2) { + continue; + } + + warning() << "config servers " << _configServers[firstGood].toString() + << " and " << _configServers[i].toString() << " differ"; + if (tries <= 1) { + error() << ": " << base["collections"].Obj() + << " vs " << res[i]["collections"].Obj(); + return false; + } + + return _checkConfigServersConsistent(tries - 1); + } + + return true; + } + + void CatalogManagerLegacy::_consistencyChecker() { + while (!inShutdown()) { + _consistentFromLastCheck.store(_checkConfigServersConsistent()); + + sleepsecs(60); + } + } + + bool CatalogManagerLegacy::_isConsistentFromLastCheck() { + return _consistentFromLastCheck.loadRelaxed(); + } + } // namespace mongo diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index b30cab8f20b..93066b94076 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -28,11 +28,13 @@ #pragma once +#include <boost/thread/thread.hpp> #include <string> #include <vector> #include "mongo/bson/bsonobj.h" #include "mongo/client/dbclientinterface.h" +#include "mongo/platform/atomic_word.h" #include "mongo/s/catalog/catalog_manager.h" namespace mongo { @@ -54,6 +56,12 @@ namespace mongo { */ Status init(const ConnectionString& configCS); + /** + * Starts the thread that periodically checks data consistency amongst the config servers. + * Note: this is not thread safe and can only be called once for the lifetime. + */ + Status startConfigServerChecker(); + virtual void shutDown() override; virtual Status enableSharding(const std::string& dbName); @@ -135,6 +143,22 @@ namespace mongo { */ size_t _getShardCount(const BSONObj& query = {}) const; + /** + * Returns true if all config servers have the same state. + * If inconsistency detected on first attempt, checks at most 3 more times. + */ + bool _checkConfigServersConsistent(const unsigned tries = 4) const; + + /** + * Checks data consistency amongst config servers every 60 seconds. + */ + void _consistencyChecker(); + + /** + * Returns true if the config servers have the same contents since the last + * check was performed. + */ + bool _isConsistentFromLastCheck(); // Parsed config server hosts, as specified on the command line. ConnectionString _configServerConnectionString; @@ -142,6 +166,14 @@ namespace mongo { // Distribted lock manager singleton. std::unique_ptr<DistLockManager> _distLockManager; + + // used by consistency checker thread to check if config + // servers are consistent + AtomicWord<bool> _consistentFromLastCheck; + + // Thread that runs dbHash on config servers for checking data consistency. + boost::thread _consistencyCheckerThread; + }; } // namespace mongo diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 8fe331f9c26..be18e3089a0 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -49,7 +49,6 @@ #include "mongo/s/catalog/type_settings.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" -#include "mongo/s/config_server_checker_service.h" #include "mongo/s/cursors.h" #include "mongo/s/grid.h" #include "mongo/util/concurrency/ticketholder.h" @@ -557,12 +556,6 @@ namespace { // this was implicit before since we did a splitVector on the same socket ShardConnection::sync(); - if ( !isConfigServerConsistent() ) { - RARELY warning() << "will not perform auto-split because " - << "config servers are inconsistent"; - return false; - } - LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold; BSONObj res; diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index c5d4742095e..fa72bf41572 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -750,135 +750,6 @@ namespace mongo { return true; } - bool ConfigServer::checkConfigServersConsistent( string& errmsg , int tries ) const { - if ( tries <= 0 ) - return false; - - unsigned firstGood = 0; - int up = 0; - vector<BSONObj> res; - // The last error we saw on a config server - string error; - for ( unsigned i=0; i<_config.size(); i++ ) { - BSONObj result; - - scoped_ptr<ScopedDbConnection> conn; - - try { - conn.reset( new ScopedDbConnection( _config[i], 30.0 ) ); - - if ( ! conn->get()->runCommand( "config", - BSON( "dbhash" << 1 << - "collections" << BSON_ARRAY( "chunks" << - "databases" << - "collections" << - "shards" << - "version" )), - result ) ) { - - // TODO: Make this a helper - error = result["errmsg"].eoo() ? "" : result["errmsg"].String(); - if (!result["assertion"].eoo()) error = result["assertion"].String(); - - warning() << "couldn't check dbhash on config server " << _config[i] - << causedBy(result.toString()) << endl; - - result = BSONObj(); - } - else { - result = result.getOwned(); - if ( up == 0 ) - firstGood = i; - up++; - } - conn->done(); - } - catch ( const DBException& e ) { - if (conn) { - conn->kill(); - } - - // We need to catch DBExceptions b/c sometimes we throw them - // instead of socket exceptions when findN fails - - error = e.toString(); - warning() << " couldn't check dbhash on config server " << _config[i] << causedBy(e) << endl; - } - res.push_back(result); - } - - if ( _config.size() == 1 ) - return true; - - if ( up == 0 ) { - // Use a ptr to error so if empty we won't add causedby - errmsg = str::stream() << "no config servers successfully contacted" << causedBy(&error); - return false; - } - - if ( up == 1 ) { - warning() << "only 1 config server reachable, continuing" << endl; - return true; - } - - BSONObj base = res[firstGood]; - for ( unsigned i=firstGood+1; i<res.size(); i++ ) { - if ( res[i].isEmpty() ) - continue; - - string chunksHash1 = base.getFieldDotted( "collections.chunks" ); - string chunksHash2 = res[i].getFieldDotted( "collections.chunks" ); - - string databaseHash1 = base.getFieldDotted( "collections.databases" ); - string databaseHash2 = res[i].getFieldDotted( "collections.databases" ); - - string collectionsHash1 = base.getFieldDotted( "collections.collections" ); - string collectionsHash2 = res[i].getFieldDotted( "collections.collections" ); - - string shardHash1 = base.getFieldDotted( "collections.shards" ); - string shardHash2 = res[i].getFieldDotted( "collections.shards" ); - - string versionHash1 = base.getFieldDotted( "collections.version" ); - string versionHash2 = res[i].getFieldDotted( "collections.version" ); - - if ( chunksHash1 == chunksHash2 && - databaseHash1 == databaseHash2 && - collectionsHash1 == collectionsHash2 && - shardHash1 == shardHash2 && - versionHash1 == versionHash2 ) { - continue; - } - - stringstream ss; - ss << "config servers " << _config[firstGood] << " and " << _config[i] << " differ"; - warning() << ss.str() << endl; - if ( tries <= 1 ) { - ss << ": " << base["collections"].Obj() << " vs " << res[i]["collections"].Obj(); - errmsg = ss.str(); - return false; - } - - return checkConfigServersConsistent( errmsg , tries - 1 ); - } - - return true; - } - - bool ConfigServer::ok( bool checkConsistency ) { - if ( ! _primary.ok() ) - return false; - - if ( checkConsistency ) { - string errmsg; - if ( ! checkConfigServersConsistent( errmsg ) ) { - error() << "could not verify that config servers are in sync" << causedBy(errmsg) << warnings; - return false; - } - } - - return true; - } - int ConfigServer::dbConfigVersion() { ScopedDbConnection conn(_primary.getConnString(), 30.0); int version = dbConfigVersion( conn.conn() ); diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index d451354031c..1bbb1d2dd41 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -215,13 +215,6 @@ namespace mongo { static int VERSION; - - /** - * check to see if all config servers have the same state - * will try tries time to make sure not catching in a bad state - */ - bool checkConfigServersConsistent( std::string& errmsg , int tries = 4 ) const; - private: std::string getHost( const std::string& name , bool withPort ); diff --git a/src/mongo/s/config_server_checker_service.cpp b/src/mongo/s/config_server_checker_service.cpp deleted file mode 100644 index 92ac1062faf..00000000000 --- a/src/mongo/s/config_server_checker_service.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/config_server_checker_service.h" - -#include <boost/scoped_ptr.hpp> -#include <boost/thread/thread.hpp> - -#include "mongo/s/config.h" -#include "mongo/util/concurrency/mutex.h" -#include "mongo/util/exit.h" - -namespace mongo { - - namespace { - - // Thread that runs dbHash on config servers for checking data consistency. - boost::scoped_ptr<boost::thread> _checkerThread; - - // Protects _isConsistentFromLastCheck. - mutex _isConsistentMutex; - bool _isConsistentFromLastCheck = true; - - void checkConfigConsistency() { - while ( !inShutdown() ) { - bool isConsistent = configServer.ok( true ); - - { - boost::lock_guard<boost::mutex> sl( _isConsistentMutex ); - _isConsistentFromLastCheck = isConsistent; - } - - sleepsecs( 60 ); - } - } - } - - bool isConfigServerConsistent() { - boost::lock_guard<boost::mutex> sl( _isConsistentMutex ); - return _isConsistentFromLastCheck; - } - - bool startConfigServerChecker() { - if ( _checkerThread == NULL ) { - _checkerThread.reset( new boost::thread( checkConfigConsistency )); - } - - return _checkerThread != NULL; - } -} diff --git a/src/mongo/s/config_server_checker_service.h b/src/mongo/s/config_server_checker_service.h deleted file mode 100644 index 78ae7654aa9..00000000000 --- a/src/mongo/s/config_server_checker_service.h +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -namespace mongo { - - /** - * Returns true if the config servers have the same contents since the last check - * was performed. Currently checks only the config.chunks and config.databases. - */ - bool isConfigServerConsistent(); - - /** - * Starts the thread that periodically checks data consistency amongst the config servers. - * Note: this is not thread safe. - */ - bool startConfigServerChecker(); -} - diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index f3de417fc2e..15cc22759ca 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -63,7 +63,6 @@ #include "mongo/s/client/sharding_connection_hook.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" -#include "mongo/s/config_server_checker_service.h" #include "mongo/s/cursors.h" #include "mongo/s/grid.h" #include "mongo/s/legacy_dist_lock_manager.h" @@ -240,6 +239,12 @@ static ExitCode runMongosServer( bool doUpgrade ) { return EXIT_SHARDING_ERROR; } + Status statusConfigChecker = catalogManager->startConfigServerChecker(); + if (!statusConfigChecker.isOK()) { + mongo::log(LogComponent::kSharding) << "unable to start config servers checker thread " + << statusConfigChecker;; + return EXIT_SHARDING_ERROR; + } grid.setCatalogManager(std::move(catalogManager)); if (!configServer.init(mongosGlobalParams.configdbs)) { @@ -247,13 +252,6 @@ static ExitCode runMongosServer( bool doUpgrade ) { return EXIT_SHARDING_ERROR; } - if (!configServer.ok(true)) { - mongo::log(LogComponent::kSharding) << "configServer connection startup check failed" << endl; - return EXIT_SHARDING_ERROR; - } - - startConfigServerChecker(); - VersionType initVersionInfo; VersionType versionInfo; string errMsg; |