summaryrefslogtreecommitdiff
path: root/src/mongo/s/version_manager.cpp
diff options
context:
space:
mode:
authorAlberto Lerner <alerner@10gen.com>2012-12-20 21:29:40 -0500
committerAlberto Lerner <alerner@10gen.com>2012-12-21 17:53:06 -0500
commitf9f37613fc04e97373ae421dd9990970b79223e7 (patch)
tree324dbe2e4b5c4ca19ecf63d2d54fc6282ffeca1b /src/mongo/s/version_manager.cpp
parenta0b0cfa89c7b46dd0db9ff2f8a1d31cf76f9cd0f (diff)
downloadmongo-f9f37613fc04e97373ae421dd9990970b79223e7.tar.gz
Put version management class/header in its own module.
Diffstat (limited to 'src/mongo/s/version_manager.cpp')
-rw-r--r--src/mongo/s/version_manager.cpp292
1 files changed, 292 insertions, 0 deletions
diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp
new file mode 100644
index 00000000000..1496ebc1407
--- /dev/null
+++ b/src/mongo/s/version_manager.cpp
@@ -0,0 +1,292 @@
+// @file version_manager.cpp
+
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "mongo/s/version_manager.h"
+
+#include "mongo/s/chunk.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/config.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard.h"
+#include "mongo/s/stale_exception.h" // for SendStaleConfigException
+#include "mongo/s/writeback_listener.h"
+
+namespace mongo {
+
+ // Global version manager
+ VersionManager versionManager;
+
+ // when running in sharded mode, use chunk shard version control
+ struct ConnectionShardStatus {
+
+ typedef unsigned long long S;
+
+ ConnectionShardStatus()
+ : _mutex( "ConnectionShardStatus" ) {
+ }
+
+ S getSequence( DBClientBase * conn , const string& ns ) {
+ scoped_lock lk( _mutex );
+ return _map[conn->getConnectionId()][ns];
+ }
+
+ void setSequence( DBClientBase * conn , const string& ns , const S& s ) {
+ scoped_lock lk( _mutex );
+ _map[conn->getConnectionId()][ns] = s;
+ }
+
+ void reset( DBClientBase * conn ) {
+ scoped_lock lk( _mutex );
+ _map.erase( conn->getConnectionId() );
+ }
+
+ // protects _map
+ mongo::mutex _mutex;
+
+ // a map from a connection into ChunkManager's sequence number for each namespace
+ map<unsigned long long, map<string,unsigned long long> > _map;
+
+ } connectionShardStatus;
+
+ void VersionManager::resetShardVersionCB( DBClientBase * conn ) {
+ connectionShardStatus.reset( conn );
+ }
+
+ bool VersionManager::isVersionableCB( DBClientBase* conn ){
+ return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET;
+ }
+
+ DBClientBase* getVersionable( DBClientBase* conn ){
+
+ switch ( conn->type() ) {
+ case ConnectionString::INVALID:
+ massert( 15904, str::stream() << "cannot set version on invalid connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::MASTER:
+ return conn;
+ case ConnectionString::PAIR:
+ massert( 15905, str::stream() << "cannot set version or shard on pair connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::SYNC:
+ massert( 15906, str::stream() << "cannot set version or shard on sync connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::CUSTOM:
+ massert( 16334, str::stream() << "cannot set version or shard on custom connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::SET:
+ DBClientReplicaSet* set = (DBClientReplicaSet*) conn;
+ return &( set->masterConn() );
+ }
+
+ verify( false );
+ return NULL;
+ }
+
+ extern OID serverID;
+
+ bool VersionManager::initShardVersionCB( DBClientBase * conn_in, BSONObj& result ){
+
+ WriteBackListener::init( *conn_in );
+
+ DBClientBase* conn = getVersionable( conn_in );
+ verify( conn ); // errors thrown above
+
+ BSONObjBuilder cmdBuilder;
+
+ cmdBuilder.append( "setShardVersion" , "" );
+ cmdBuilder.appendBool( "init", true );
+ cmdBuilder.append( "configdb" , configServer.modelServer() );
+ cmdBuilder.appendOID( "serverID" , &serverID );
+ cmdBuilder.appendBool( "authoritative" , true );
+
+ BSONObj cmd = cmdBuilder.obj();
+
+ LOG(1) << "initializing shard connection to " << conn->toString() << endl;
+ LOG(2) << "initial sharding settings : " << cmd << endl;
+
+ bool ok = conn->runCommand("admin", cmd, result, 0);
+
+ // HACK for backwards compatibility with v1.8.x, v2.0.0 and v2.0.1
+ // Result is false, but will still initialize serverID and configdb
+ if( ! ok && ! result["errmsg"].eoo() && ( result["errmsg"].String() == "need to specify namespace"/* 2.0.1/2 */ ||
+ result["errmsg"].String() == "need to speciy namespace" /* 1.8 */ ))
+ {
+ ok = true;
+ }
+
+ LOG(3) << "initial sharding result : " << result << endl;
+
+ return ok;
+
+ }
+
+ bool VersionManager::forceRemoteCheckShardVersionCB( const string& ns ){
+
+ DBConfigPtr conf = grid.getDBConfig( ns );
+ if ( ! conf ) return false;
+ conf->reload();
+
+ // If we don't have a collection, don't refresh the chunk manager
+ if( nsGetCollection( ns ).size() == 0 ) return false;
+
+ ChunkManagerPtr manager = conf->getChunkManagerIfExists( ns, true, true );
+ if( ! manager ) return false;
+
+ return true;
+
+ }
+
+ /**
+ * @return true if had to do something
+ */
+ bool checkShardVersion( DBClientBase * conn_in , const string& ns , ChunkManagerPtr refManager, bool authoritative , int tryNumber ) {
+ // TODO: cache, optimize, etc...
+
+ WriteBackListener::init( *conn_in );
+
+ DBConfigPtr conf = grid.getDBConfig( ns );
+ if ( ! conf )
+ return false;
+
+ DBClientBase* conn = getVersionable( conn_in );
+ verify(conn); // errors thrown above
+
+ unsigned long long officialSequenceNumber = 0;
+
+ ChunkManagerPtr manager;
+ const bool isSharded = conf->isSharded( ns );
+ if ( isSharded ) {
+ manager = conf->getChunkManagerIfExists( ns , authoritative );
+ // It's possible the chunk manager was reset since we checked whether sharded was true,
+ // so must check this here.
+ if( manager ) officialSequenceNumber = manager->getSequenceNumber();
+ }
+
+ // Check this manager against the reference manager
+ if( isSharded && manager ){
+
+ Shard shard = Shard::make( conn->getServerAddress() );
+ if( refManager && ! refManager->compatibleWith( manager, shard ) ){
+ throw SendStaleConfigException( ns, str::stream() << "manager (" << manager->getVersion( shard ).toString() << " : " << manager->getSequenceNumber() << ") "
+ << "not compatible with reference manager (" << refManager->getVersion( shard ).toString() << " : " << refManager->getSequenceNumber() << ") "
+ << "on shard " << shard.getName() << " (" << shard.getAddress().toString() << ")",
+ refManager->getVersion( shard ), manager->getVersion( shard ) );
+ }
+ }
+ else if( refManager ){
+ Shard shard = Shard::make( conn->getServerAddress() );
+ string msg( str::stream() << "not sharded ("
+ << ( (manager.get() == 0) ? string( "<none>" ) :
+ str::stream() << manager->getSequenceNumber() )
+ << ") but has reference manager ("
+ << refManager->getSequenceNumber() << ") "
+ << "on conn " << conn->getServerAddress() << " ("
+ << conn_in->getServerAddress() << ")" );
+
+ throw SendStaleConfigException( ns, msg,
+ refManager->getVersion( shard ), ChunkVersion( 0, OID() ));
+ }
+
+ // has the ChunkManager been reloaded since the last time we updated the connection-level version?
+ // (ie., last time we issued the setShardVersions below)
+ unsigned long long sequenceNumber = connectionShardStatus.getSequence(conn,ns);
+ if ( sequenceNumber == officialSequenceNumber ) {
+ return false;
+ }
+
+
+ ChunkVersion version = ChunkVersion( 0, OID() );
+ if ( isSharded && manager ) {
+ version = manager->getVersion( Shard::make( conn->getServerAddress() ) );
+ }
+
+ if( ! version.isSet() ){
+ LOG(0) << "resetting shard version of " << ns << " on " << conn->getServerAddress() << ", " <<
+ ( ! isSharded ? "no longer sharded" :
+ ( ! manager ? "no chunk manager found" :
+ "version is zero" ) ) << endl;
+ }
+
+ LOG(2) << " have to set shard version for conn: " << conn->getServerAddress() << " ns:" << ns
+ << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber
+ << " version: " << version << " manager: " << manager.get()
+ << endl;
+
+ const string versionableServerAddress(conn->getServerAddress());
+
+ BSONObj result;
+ if ( setShardVersion( *conn , ns , version , manager , authoritative , result ) ) {
+ // success!
+ LOG(1) << " setShardVersion success: " << result << endl;
+ connectionShardStatus.setSequence( conn , ns , officialSequenceNumber );
+ return true;
+ }
+
+ LOG(1) << " setShardVersion failed!\n" << result << endl;
+
+ if ( result["need_authoritative"].trueValue() )
+ massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative );
+
+ if ( ! authoritative ) {
+ // use the original connection and get a fresh versionable connection
+ // since conn can be invalidated (or worse, freed) after the failure
+ checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1);
+ return true;
+ }
+
+ if ( result["reloadConfig"].trueValue() ) {
+ if( result["version"].timestampTime() == 0 ){
+
+ warning() << "reloading full configuration for " << conf->getName()
+ << ", connection state indicates significant version changes" << endl;
+
+ // reload db
+ conf->reload();
+ }
+ else {
+ // reload config
+ conf->getChunkManager( ns , true );
+ }
+ }
+
+ const int maxNumTries = 7;
+ if ( tryNumber < maxNumTries ) {
+ LOG( tryNumber < ( maxNumTries / 2 ) ? 1 : 0 )
+ << "going to retry checkShardVersion host: " << versionableServerAddress << " " << result << endl;
+ sleepmillis( 10 * tryNumber );
+ // use the original connection and get a fresh versionable connection
+ // since conn can be invalidated (or worse, freed) after the failure
+ checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1);
+ return true;
+ }
+
+ string errmsg = str::stream() << "setShardVersion failed host: " << versionableServerAddress << " " << result;
+ log() << " " << errmsg << endl;
+ massert( 10429 , errmsg , 0 );
+ return true;
+ }
+
+ bool VersionManager::checkShardVersionCB( DBClientBase* conn_in , const string& ns , bool authoritative , int tryNumber ) {
+ return checkShardVersion( conn_in, ns, ChunkManagerPtr(), authoritative, tryNumber );
+ }
+
+ bool VersionManager::checkShardVersionCB( ShardConnection* conn_in , bool authoritative , int tryNumber ) {
+ return checkShardVersion( conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber );
+ }
+
+} // namespace mongo