summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/balance.cpp9
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp151
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.h32
-rw-r--r--src/mongo/s/chunk.cpp7
-rw-r--r--src/mongo/s/config.cpp129
-rw-r--r--src/mongo/s/config.h7
-rw-r--r--src/mongo/s/config_server_checker_service.cpp77
-rw-r--r--src/mongo/s/config_server_checker_service.h43
-rw-r--r--src/mongo/s/server.cpp14
10 files changed, 189 insertions, 281 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 39f8484ee85..1301e20fe3e 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -255,7 +255,6 @@ env.Library(
'chunk.cpp',
'chunk_manager.cpp',
'config.cpp',
- 'config_server_checker_service.cpp',
'distlock.cpp',
'grid.cpp',
'shard_key_pattern.cpp',
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index a42738e512d..f0dd06271da 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -51,7 +51,6 @@
#include "mongo/s/catalog/type_settings.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/config.h"
-#include "mongo/s/config_server_checker_service.h"
#include "mongo/s/dist_lock_manager.h"
#include "mongo/s/grid.h"
#include "mongo/s/server.h"
@@ -582,14 +581,6 @@ namespace mongo {
continue;
}
- if ( !isConfigServerConsistent() ) {
- conn.done();
- warning() << "Skipping balancing round because data inconsistency"
- << " was detected amongst the config servers." << endl;
- sleepsecs( sleepTime );
- continue;
- }
-
const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ?
balancerConfig.getWaitForDelete() : false);
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index ad31f62fbad..262d29721b7 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -64,6 +64,7 @@
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/exit.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/log.h"
@@ -367,12 +368,28 @@ namespace {
_distLockManager = stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString);
_distLockManager->startUp();
+ _consistentFromLastCheck.store(true);
+
+ return Status::OK();
+ }
+
+ Status CatalogManagerLegacy::startConfigServerChecker() {
+ if (!_checkConfigServersConsistent()) {
+ return Status(ErrorCodes::IncompatibleShardingMetadata,
+ "Data inconsistency detected amongst config servers");
+ }
+
+ boost::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this));
+ _consistencyCheckerThread.swap(t);
+
return Status::OK();
}
void CatalogManagerLegacy::shutDown() {
invariant(_distLockManager);
_distLockManager->shutDown();
+
+ _consistencyCheckerThread.join();
}
Status CatalogManagerLegacy::enableSharding(const std::string& dbName) {
@@ -1198,6 +1215,15 @@ namespace {
void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request,
BatchedCommandResponse* response) {
+ // check if config servers are consistent
+ if (!_isConsistentFromLastCheck()) {
+ toBatchError(
+ Status(ErrorCodes::IncompatibleShardingMetadata,
+ "Data inconsistency detected amongst config servers"),
+ response);
+ return;
+ }
+
// We only support batch sizes of one for config writes
if (request.sizeWriteOps() != 1) {
toBatchError(
@@ -1305,4 +1331,129 @@ namespace {
return _distLockManager.get();
}
+ bool CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const {
+ if (tries <= 0)
+ return false;
+
+ unsigned firstGood = 0;
+ int up = 0;
+ vector<BSONObj> res;
+
+ // The last error we saw on a config server
+ string errMsg;
+
+ for (unsigned i = 0; i < _configServers.size(); i++) {
+ BSONObj result;
+ boost::scoped_ptr<ScopedDbConnection> conn;
+
+ try {
+ conn.reset(new ScopedDbConnection(_configServers[i], 30.0));
+
+ if (!conn->get()->runCommand("config",
+ BSON("dbhash" << 1 <<
+ "collections" << BSON_ARRAY("chunks" <<
+ "databases" <<
+ "collections" <<
+ "shards" <<
+ "version")),
+ result)) {
+
+ errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String();
+ if (!result["assertion"].eoo()) errMsg = result["assertion"].String();
+
+ warning() << "couldn't check dbhash on config server " << _configServers[i]
+ << causedBy(result.toString());
+
+ result = BSONObj();
+ }
+ else {
+ result = result.getOwned();
+ if (up == 0)
+ firstGood = i;
+ up++;
+ }
+ conn->done();
+ }
+ catch (const DBException& e) {
+ if (conn) {
+ conn->kill();
+ }
+
+ // We need to catch DBExceptions b/c sometimes we throw them
+ // instead of socket exceptions when findN fails
+
+ errMsg = e.toString();
+ warning() << " couldn't check dbhash on config server "
+ << _configServers[i] << causedBy(e);
+ }
+ res.push_back(result);
+ }
+
+ if (_configServers.size() == 1)
+ return true;
+
+ if (up == 0) {
+ // Use a ptr to error so if empty we won't add causedby
+ error() << "no config servers successfully contacted" << causedBy(&errMsg);
+ return false;
+ }
+ else if (up == 1) {
+ warning() << "only 1 config server reachable, continuing";
+ return true;
+ }
+
+ BSONObj base = res[firstGood];
+ for (unsigned i = firstGood+1; i < res.size(); i++) {
+ if (res[i].isEmpty())
+ continue;
+
+ string chunksHash1 = base.getFieldDotted("collections.chunks");
+ string chunksHash2 = res[i].getFieldDotted("collections.chunks");
+
+ string databaseHash1 = base.getFieldDotted("collections.databases");
+ string databaseHash2 = res[i].getFieldDotted("collections.databases");
+
+ string collectionsHash1 = base.getFieldDotted("collections.collections");
+ string collectionsHash2 = res[i].getFieldDotted("collections.collections");
+
+ string shardHash1 = base.getFieldDotted("collections.shards");
+ string shardHash2 = res[i].getFieldDotted("collections.shards");
+
+ string versionHash1 = base.getFieldDotted("collections.version") ;
+ string versionHash2 = res[i].getFieldDotted("collections.version");
+
+ if (chunksHash1 == chunksHash2 &&
+ databaseHash1 == databaseHash2 &&
+ collectionsHash1 == collectionsHash2 &&
+ shardHash1 == shardHash2 &&
+ versionHash1 == versionHash2) {
+ continue;
+ }
+
+ warning() << "config servers " << _configServers[firstGood].toString()
+ << " and " << _configServers[i].toString() << " differ";
+ if (tries <= 1) {
+ error() << ": " << base["collections"].Obj()
+ << " vs " << res[i]["collections"].Obj();
+ return false;
+ }
+
+ return _checkConfigServersConsistent(tries - 1);
+ }
+
+ return true;
+ }
+
+ void CatalogManagerLegacy::_consistencyChecker() {
+ while (!inShutdown()) {
+ _consistentFromLastCheck.store(_checkConfigServersConsistent());
+
+ sleepsecs(60);
+ }
+ }
+
+ bool CatalogManagerLegacy::_isConsistentFromLastCheck() {
+ return _consistentFromLastCheck.loadRelaxed();
+ }
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index b30cab8f20b..93066b94076 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -28,11 +28,13 @@
#pragma once
+#include <boost/thread/thread.hpp>
#include <string>
#include <vector>
#include "mongo/bson/bsonobj.h"
#include "mongo/client/dbclientinterface.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/s/catalog/catalog_manager.h"
namespace mongo {
@@ -54,6 +56,12 @@ namespace mongo {
*/
Status init(const ConnectionString& configCS);
+ /**
+ * Starts the thread that periodically checks data consistency amongst the config servers.
+ * Note: this is not thread safe and can only be called once for the lifetime.
+ */
+ Status startConfigServerChecker();
+
virtual void shutDown() override;
virtual Status enableSharding(const std::string& dbName);
@@ -135,6 +143,22 @@ namespace mongo {
*/
size_t _getShardCount(const BSONObj& query = {}) const;
+ /**
+ * Returns true if all config servers have the same state.
+ * If inconsistency detected on first attempt, checks at most 3 more times.
+ */
+ bool _checkConfigServersConsistent(const unsigned tries = 4) const;
+
+ /**
+ * Checks data consistency amongst config servers every 60 seconds.
+ */
+ void _consistencyChecker();
+
+ /**
+ * Returns true if the config servers have the same contents since the last
+ * check was performed.
+ */
+ bool _isConsistentFromLastCheck();
// Parsed config server hosts, as specified on the command line.
ConnectionString _configServerConnectionString;
@@ -142,6 +166,14 @@ namespace mongo {
// Distribted lock manager singleton.
std::unique_ptr<DistLockManager> _distLockManager;
+
+ // used by consistency checker thread to check if config
+ // servers are consistent
+ AtomicWord<bool> _consistentFromLastCheck;
+
+ // Thread that runs dbHash on config servers for checking data consistency.
+ boost::thread _consistencyCheckerThread;
+
};
} // namespace mongo
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 8fe331f9c26..be18e3089a0 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -49,7 +49,6 @@
#include "mongo/s/catalog/type_settings.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/config.h"
-#include "mongo/s/config_server_checker_service.h"
#include "mongo/s/cursors.h"
#include "mongo/s/grid.h"
#include "mongo/util/concurrency/ticketholder.h"
@@ -557,12 +556,6 @@ namespace {
// this was implicit before since we did a splitVector on the same socket
ShardConnection::sync();
- if ( !isConfigServerConsistent() ) {
- RARELY warning() << "will not perform auto-split because "
- << "config servers are inconsistent";
- return false;
- }
-
LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold;
BSONObj res;
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
index c5d4742095e..fa72bf41572 100644
--- a/src/mongo/s/config.cpp
+++ b/src/mongo/s/config.cpp
@@ -750,135 +750,6 @@ namespace mongo {
return true;
}
- bool ConfigServer::checkConfigServersConsistent( string& errmsg , int tries ) const {
- if ( tries <= 0 )
- return false;
-
- unsigned firstGood = 0;
- int up = 0;
- vector<BSONObj> res;
- // The last error we saw on a config server
- string error;
- for ( unsigned i=0; i<_config.size(); i++ ) {
- BSONObj result;
-
- scoped_ptr<ScopedDbConnection> conn;
-
- try {
- conn.reset( new ScopedDbConnection( _config[i], 30.0 ) );
-
- if ( ! conn->get()->runCommand( "config",
- BSON( "dbhash" << 1 <<
- "collections" << BSON_ARRAY( "chunks" <<
- "databases" <<
- "collections" <<
- "shards" <<
- "version" )),
- result ) ) {
-
- // TODO: Make this a helper
- error = result["errmsg"].eoo() ? "" : result["errmsg"].String();
- if (!result["assertion"].eoo()) error = result["assertion"].String();
-
- warning() << "couldn't check dbhash on config server " << _config[i]
- << causedBy(result.toString()) << endl;
-
- result = BSONObj();
- }
- else {
- result = result.getOwned();
- if ( up == 0 )
- firstGood = i;
- up++;
- }
- conn->done();
- }
- catch ( const DBException& e ) {
- if (conn) {
- conn->kill();
- }
-
- // We need to catch DBExceptions b/c sometimes we throw them
- // instead of socket exceptions when findN fails
-
- error = e.toString();
- warning() << " couldn't check dbhash on config server " << _config[i] << causedBy(e) << endl;
- }
- res.push_back(result);
- }
-
- if ( _config.size() == 1 )
- return true;
-
- if ( up == 0 ) {
- // Use a ptr to error so if empty we won't add causedby
- errmsg = str::stream() << "no config servers successfully contacted" << causedBy(&error);
- return false;
- }
-
- if ( up == 1 ) {
- warning() << "only 1 config server reachable, continuing" << endl;
- return true;
- }
-
- BSONObj base = res[firstGood];
- for ( unsigned i=firstGood+1; i<res.size(); i++ ) {
- if ( res[i].isEmpty() )
- continue;
-
- string chunksHash1 = base.getFieldDotted( "collections.chunks" );
- string chunksHash2 = res[i].getFieldDotted( "collections.chunks" );
-
- string databaseHash1 = base.getFieldDotted( "collections.databases" );
- string databaseHash2 = res[i].getFieldDotted( "collections.databases" );
-
- string collectionsHash1 = base.getFieldDotted( "collections.collections" );
- string collectionsHash2 = res[i].getFieldDotted( "collections.collections" );
-
- string shardHash1 = base.getFieldDotted( "collections.shards" );
- string shardHash2 = res[i].getFieldDotted( "collections.shards" );
-
- string versionHash1 = base.getFieldDotted( "collections.version" );
- string versionHash2 = res[i].getFieldDotted( "collections.version" );
-
- if ( chunksHash1 == chunksHash2 &&
- databaseHash1 == databaseHash2 &&
- collectionsHash1 == collectionsHash2 &&
- shardHash1 == shardHash2 &&
- versionHash1 == versionHash2 ) {
- continue;
- }
-
- stringstream ss;
- ss << "config servers " << _config[firstGood] << " and " << _config[i] << " differ";
- warning() << ss.str() << endl;
- if ( tries <= 1 ) {
- ss << ": " << base["collections"].Obj() << " vs " << res[i]["collections"].Obj();
- errmsg = ss.str();
- return false;
- }
-
- return checkConfigServersConsistent( errmsg , tries - 1 );
- }
-
- return true;
- }
-
- bool ConfigServer::ok( bool checkConsistency ) {
- if ( ! _primary.ok() )
- return false;
-
- if ( checkConsistency ) {
- string errmsg;
- if ( ! checkConfigServersConsistent( errmsg ) ) {
- error() << "could not verify that config servers are in sync" << causedBy(errmsg) << warnings;
- return false;
- }
- }
-
- return true;
- }
-
int ConfigServer::dbConfigVersion() {
ScopedDbConnection conn(_primary.getConnString(), 30.0);
int version = dbConfigVersion( conn.conn() );
diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h
index d451354031c..1bbb1d2dd41 100644
--- a/src/mongo/s/config.h
+++ b/src/mongo/s/config.h
@@ -215,13 +215,6 @@ namespace mongo {
static int VERSION;
-
- /**
- * check to see if all config servers have the same state
- * will try tries time to make sure not catching in a bad state
- */
- bool checkConfigServersConsistent( std::string& errmsg , int tries = 4 ) const;
-
private:
std::string getHost( const std::string& name , bool withPort );
diff --git a/src/mongo/s/config_server_checker_service.cpp b/src/mongo/s/config_server_checker_service.cpp
deleted file mode 100644
index 92ac1062faf..00000000000
--- a/src/mongo/s/config_server_checker_service.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Copyright (C) 2013 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/config_server_checker_service.h"
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/thread.hpp>
-
-#include "mongo/s/config.h"
-#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/exit.h"
-
-namespace mongo {
-
- namespace {
-
- // Thread that runs dbHash on config servers for checking data consistency.
- boost::scoped_ptr<boost::thread> _checkerThread;
-
- // Protects _isConsistentFromLastCheck.
- mutex _isConsistentMutex;
- bool _isConsistentFromLastCheck = true;
-
- void checkConfigConsistency() {
- while ( !inShutdown() ) {
- bool isConsistent = configServer.ok( true );
-
- {
- boost::lock_guard<boost::mutex> sl( _isConsistentMutex );
- _isConsistentFromLastCheck = isConsistent;
- }
-
- sleepsecs( 60 );
- }
- }
- }
-
- bool isConfigServerConsistent() {
- boost::lock_guard<boost::mutex> sl( _isConsistentMutex );
- return _isConsistentFromLastCheck;
- }
-
- bool startConfigServerChecker() {
- if ( _checkerThread == NULL ) {
- _checkerThread.reset( new boost::thread( checkConfigConsistency ));
- }
-
- return _checkerThread != NULL;
- }
-}
diff --git a/src/mongo/s/config_server_checker_service.h b/src/mongo/s/config_server_checker_service.h
deleted file mode 100644
index 78ae7654aa9..00000000000
--- a/src/mongo/s/config_server_checker_service.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (C) 2013 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-namespace mongo {
-
- /**
- * Returns true if the config servers have the same contents since the last check
- * was performed. Currently checks only the config.chunks and config.databases.
- */
- bool isConfigServerConsistent();
-
- /**
- * Starts the thread that periodically checks data consistency amongst the config servers.
- * Note: this is not thread safe.
- */
- bool startConfigServerChecker();
-}
-
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index f3de417fc2e..15cc22759ca 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -63,7 +63,6 @@
#include "mongo/s/client/sharding_connection_hook.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/config.h"
-#include "mongo/s/config_server_checker_service.h"
#include "mongo/s/cursors.h"
#include "mongo/s/grid.h"
#include "mongo/s/legacy_dist_lock_manager.h"
@@ -240,6 +239,12 @@ static ExitCode runMongosServer( bool doUpgrade ) {
return EXIT_SHARDING_ERROR;
}
+ Status statusConfigChecker = catalogManager->startConfigServerChecker();
+ if (!statusConfigChecker.isOK()) {
+ mongo::log(LogComponent::kSharding) << "unable to start config servers checker thread "
+ << statusConfigChecker;;
+ return EXIT_SHARDING_ERROR;
+ }
grid.setCatalogManager(std::move(catalogManager));
if (!configServer.init(mongosGlobalParams.configdbs)) {
@@ -247,13 +252,6 @@ static ExitCode runMongosServer( bool doUpgrade ) {
return EXIT_SHARDING_ERROR;
}
- if (!configServer.ok(true)) {
- mongo::log(LogComponent::kSharding) << "configServer connection startup check failed" << endl;
- return EXIT_SHARDING_ERROR;
- }
-
- startConfigServerChecker();
-
VersionType initVersionInfo;
VersionType versionInfo;
string errMsg;