// server.cpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
#include "pch.h"
#include
#include "mongo/base/init.h"
#include "mongo/base/initializer.h"
#include "mongo/base/status.h"
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authz_manager_external_state_s.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/dbwebserver.h"
#include "mongo/db/initialize_server_global_state.h"
#include "mongo/db/lasterror.h"
#include "mongo/platform/process_id.h"
#include "mongo/s/balance.h"
#include "mongo/s/chunk.h"
#include "mongo/s/client_info.h"
#include "mongo/s/config.h"
#include "mongo/s/config_server_checker_service.h"
#include "mongo/s/config_upgrade.h"
#include "mongo/s/cursors.h"
#include "mongo/s/grid.h"
#include "mongo/s/request.h"
#include "mongo/s/server.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/admin_access.h"
#include "mongo/util/concurrency/task.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/exception_filter_win32.h"
#include "mongo/util/log.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/message_server.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/ntservice.h"
#include "mongo/util/processinfo.h"
#include "mongo/util/ramlog.h"
#include "mongo/util/signal_handlers.h"
#include "mongo/util/stacktrace.h"
#include "mongo/util/startup_test.h"
#include "mongo/util/stringutils.h"
#include "mongo/util/text.h"
#include "mongo/util/version.h"
namespace {
bool _isUpgradeSwitchSet = false;
}
namespace mongo {
#if defined(_WIN32)
ntservice::NtServiceDefaultStrings defaultServiceStrings = {
L"MongoS",
L"Mongo DB Router",
L"Mongo DB Sharding Router"
};
static void initService();
#endif
CmdLine cmdLine;
Database *database = 0;
string mongosCommand;
bool dbexitCalled = false;
static bool scriptingEnabled = true;
static vector configdbs;
bool inShutdown() {
return dbexitCalled;
}
string getDbContext() {
return "?";
}
bool haveLocalShardingInfo( const string& ns ) {
verify( 0 );
return false;
}
class ShardedMessageHandler : public MessageHandler {
public:
virtual ~ShardedMessageHandler() {}
virtual void connected( AbstractMessagingPort* p ) {
ClientInfo::create(p);
}
virtual void process( Message& m , AbstractMessagingPort* p , LastError * le) {
verify( p );
Request r( m , p );
verify( le );
lastError.startRequest( m , le );
try {
r.init();
r.process();
// Release connections after non-write op
if ( ShardConnection::releaseConnectionsAfterResponse && r.expectResponse() ) {
LOG(2) << "release thread local connections back to pool" << endl;
ShardConnection::releaseMyConnections();
}
}
catch ( AssertionException & e ) {
LOG( e.isUserAssertion() ? 1 : 0 ) << "AssertionException while processing op type : " << m.operation() << " to : " << r.getns() << causedBy(e) << endl;
le->raiseError( e.getCode() , e.what() );
m.header()->id = r.id();
if ( r.expectResponse() ) {
BSONObj err = BSON( "$err" << e.what() << "code" << e.getCode() );
replyToQuery( ResultFlag_ErrSet, p , m , err );
}
}
catch ( DBException& e ) {
// note that e.toString() is more detailed on a SocketException than
// e.what(). we should think about what is the right level of detail both
// for logging and return code.
log() << "DBException in process: " << e.what() << endl;
le->raiseError( e.getCode() , e.what() );
m.header()->id = r.id();
if ( r.expectResponse() ) {
BSONObjBuilder b;
b.append("$err",e.what()).append("code",e.getCode());
if( !e._shard.empty() ) {
b.append("shard",e._shard);
}
replyToQuery( ResultFlag_ErrSet, p , m , b.obj() );
}
}
}
virtual void disconnected( AbstractMessagingPort* p ) {
// all things are thread local
}
};
void sighandler(int sig) {
dbexit(EXIT_CLEAN, (string("received signal ") + BSONObjBuilder::numStr(sig)).c_str());
}
// this gets called when new fails to allocate memory
void my_new_handler() {
rawOut( "out of memory, printing stack and exiting:" );
printStackTrace();
::_exit(EXIT_ABRUPT);
}
#ifndef _WIN32
sigset_t asyncSignals;
void signalProcessingThread() {
while (true) {
int actualSignal = 0;
int status = sigwait( &asyncSignals, &actualSignal );
fassert(16779, status == 0);
switch (actualSignal) {
case SIGUSR1:
// log rotate signal
fassert(16780, rotateLogs());
break;
default:
// no one else should be here
fassertFailed(16778);
break;
}
}
}
void startSignalProcessingThread() {
verify( pthread_sigmask( SIG_SETMASK, &asyncSignals, 0 ) == 0 );
boost::thread it( signalProcessingThread );
}
#endif // not _WIN32
void setupSignalHandlers() {
setupSIGTRAPforGDB();
setupCoreSignals();
signal(SIGTERM, sighandler);
signal(SIGINT, sighandler);
#if defined(SIGQUIT)
signal( SIGQUIT , printStackAndExit );
#endif
signal( SIGSEGV , printStackAndExit );
signal( SIGABRT , printStackAndExit );
signal( SIGFPE , printStackAndExit );
#if defined(SIGBUS)
signal( SIGBUS , printStackAndExit );
#endif
#if defined(SIGPIPE)
signal( SIGPIPE , SIG_IGN );
#endif
#ifndef _WIN32
sigemptyset( &asyncSignals );
sigaddset( &asyncSignals, SIGUSR1 );
startSignalProcessingThread();
#endif
setWindowsUnhandledExceptionFilter();
set_new_handler( my_new_handler );
}
void init() {
serverID.init();
}
void start( const MessageServer::Options& opts ) {
balancer.go();
cursorCache.startTimeoutThread();
PeriodicTask::theRunner->go();
ShardedMessageHandler handler;
MessageServer * server = createServer( opts , &handler );
server->setAsTimeTracker();
server->run();
}
DBClientBase *createDirectClient() {
uassert( 10197 , "createDirectClient not implemented for sharding yet" , 0 );
return 0;
}
void printShardingVersionInfo( bool out ) {
if ( out ) {
cout << "MongoS version " << versionString << " starting: pid=" <<
ProcessId::getCurrent() << " port=" << cmdLine.port <<
( sizeof(int*) == 4 ? " 32" : " 64" ) << "-bit host=" << getHostNameCached() <<
" (--help for usage)" << endl;
DEV cout << "_DEBUG build" << endl;
cout << "git version: " << gitVersion() << endl;
cout << "build sys info: " << sysInfo() << endl;
}
else {
log() << "MongoS version " << versionString << " starting: pid=" <<
ProcessId::getCurrent() << " port=" << cmdLine.port <<
( sizeof( int* ) == 4 ? " 32" : " 64" ) << "-bit host=" << getHostNameCached() <<
" (--help for usage)" << endl;
DEV log() << "_DEBUG build" << endl;
printGitVersion();
printSysInfo();
printCommandLineOpts();
}
}
} // namespace mongo
using namespace mongo;
static bool runMongosServer( bool doUpgrade ) {
setupSignalHandlers();
setThreadName( "mongosMain" );
printShardingVersionInfo( false );
// set some global state
pool.addHook( new ShardingConnectionHook( false ) );
pool.setName( "mongos connectionpool" );
shardConnectionPool.addHook( new ShardingConnectionHook( true ) );
shardConnectionPool.setName( "mongos shardconnection connectionpool" );
// Mongos shouldn't lazily kill cursors, otherwise we can end up with extras from migration
DBClientConnection::setLazyKillCursor( false );
ReplicaSetMonitor::setConfigChangeHook( boost::bind( &ConfigServer::replicaSetChange , &configServer , _1 ) );
if ( ! configServer.init( configdbs ) ) {
log() << "couldn't resolve config db address" << endl;
return false;
}
if ( ! configServer.ok( true ) ) {
log() << "configServer connection startup check failed" << endl;
return false;
}
startConfigServerChecker();
VersionType initVersionInfo;
VersionType versionInfo;
string errMsg;
string configServerURL = configServer.getPrimary().getConnString();
ConnectionString configServerConnString = ConnectionString::parse(configServerURL, errMsg);
if (!configServerConnString.isValid()) {
error() << "Invalid connection string for config servers: " << configServerURL << endl;
return false;
}
bool upgraded = checkAndUpgradeConfigVersion(configServerConnString,
doUpgrade,
&initVersionInfo,
&versionInfo,
&errMsg);
if (!upgraded) {
error() << "error upgrading config database to v" << CURRENT_CONFIG_VERSION
<< causedBy(errMsg) << endl;
return false;
}
configServer.reloadSettings();
init();
#if !defined(_WIN32)
CmdLine::launchOk();
#endif
if ( cmdLine.isHttpInterfaceEnabled )
boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) );
MessageServer::Options opts;
opts.port = cmdLine.port;
opts.ipList = cmdLine.bind_ip;
start(opts);
// listen() will return when exit code closes its socket.
dbexit( EXIT_NET_ERROR );
return true;
}
#include
namespace po = boost::program_options;
static void processCommandLineOptions(const std::vector& argv) {
po::options_description general_options("General options");
#if defined(_WIN32)
po::options_description windows_scm_options("Windows Service Control Manager options");
#endif
po::options_description ssl_options("SSL options");
po::options_description sharding_options("Sharding options");
po::options_description visible_options("Allowed options");
po::options_description hidden_options("Hidden options");
po::positional_options_description positional_options;
CmdLine::addGlobalOptions( general_options, hidden_options, ssl_options );
hidden_options.add_options()
("noAutoSplit", "do not send split commands with writes");
#if defined(_WIN32)
CmdLine::addWindowsOptions( windows_scm_options, hidden_options );
#endif
sharding_options.add_options()
( "configdb" , po::value() , "1 or 3 comma separated config servers" )
( "localThreshold", po::value (), "ping time (in ms) for a node to be "
"considered local (default 15ms)" )
( "test" , "just run unit tests" )
( "upgrade" , "upgrade meta data version" )
( "chunkSize" , po::value(), "maximum amount of data per chunk" )
( "ipv6", "enable IPv6 support (disabled by default)" )
( "jsonp","allow JSONP access via http (has security implications)" )
( "noscripting", "disable scripting engine" )
;
visible_options.add(general_options);
#if defined(_WIN32)
visible_options.add(windows_scm_options);
#endif
visible_options.add(sharding_options);
#ifdef MONGO_SSL
visible_options.add(ssl_options);
#endif
// parse options
po::variables_map params;
if (!CmdLine::store(argv,
visible_options,
hidden_options,
positional_options,
params)) {
::_exit(EXIT_FAILURE);
}
// The default value may vary depending on compile options, but for mongos
// we want durability to be disabled.
cmdLine.dur = false;
if ( params.count( "help" ) ) {
cout << visible_options << endl;
::_exit(EXIT_SUCCESS);
}
if ( params.count( "version" ) ) {
printShardingVersionInfo(true);
::_exit(EXIT_SUCCESS);
}
if ( params.count( "chunkSize" ) ) {
int csize = params["chunkSize"].as();
// validate chunksize before proceeding
if ( csize == 0 ) {
out() << "error: need a non-zero chunksize" << endl;
::_exit(EXIT_FAILURE);
}
if ( !Chunk::setMaxChunkSizeSizeMB( csize ) ) {
out() << "MaxChunkSize invalid" << endl;
::_exit(EXIT_FAILURE);
}
}
if ( params.count( "localThreshold" ) ) {
cmdLine.defaultLocalThresholdMillis = params["localThreshold"].as();
}
if ( params.count( "ipv6" ) ) {
enableIPv6();
}
if ( params.count( "jsonp" ) ) {
cmdLine.jsonp = true;
}
if ( params.count( "test" ) ) {
::mongo::logger::globalLogDomain()->setMinimumLoggedSeverity(::mongo::logger::LogSeverity::Debug(5));
StartupTest::runTests();
cout << "tests passed" << endl;
::_exit(EXIT_SUCCESS);
}
if (params.count("noscripting")) {
scriptingEnabled = false;
}
if (params.count("httpinterface")) {
if (params.count("nohttpinterface")) {
out() << "can't have both --httpinterface and --nohttpinterface" << endl;
::_exit(EXIT_FAILURE);
}
cmdLine.isHttpInterfaceEnabled = true;
}
if (params.count("noAutoSplit")) {
warning() << "running with auto-splitting disabled" << endl;
Chunk::ShouldAutoSplit = false;
}
if ( ! params.count( "configdb" ) ) {
out() << "error: no args for --configdb" << endl;
::_exit(EXIT_FAILURE);
}
splitStringDelim( params["configdb"].as() , &configdbs , ',' );
if ( configdbs.size() != 1 && configdbs.size() != 3 ) {
out() << "need either 1 or 3 configdbs" << endl;
::_exit(EXIT_FAILURE);
}
if( configdbs.size() == 1 ) {
warning() << "running with 1 config server should be done only for testing purposes and is not recommended for production" << endl;
}
_isUpgradeSwitchSet = params.count("upgrade");
// dbpath currently must be linked in to mongos, but the directory should never be written to.
dbpath = "";
#if defined(_WIN32)
vector disallowedOptions;
disallowedOptions.push_back( "upgrade" );
ntservice::configureService(initService,
params,
defaultServiceStrings,
disallowedOptions,
argv);
#endif
}
static int _main() {
if (!initializeServerGlobalState())
return EXIT_FAILURE;
// we either have a setting where all processes are in localhost or none are
for ( vector::const_iterator it = configdbs.begin() ; it != configdbs.end() ; ++it ) {
try {
HostAndPort configAddr( *it ); // will throw if address format is invalid
if ( it == configdbs.begin() ) {
grid.setAllowLocalHost( configAddr.isLocalHost() );
}
if ( configAddr.isLocalHost() != grid.allowLocalHost() ) {
out() << "cannot mix localhost and ip addresses in configdbs" << endl;
return 10;
}
}
catch ( DBException& e) {
out() << "configdb: " << e.what() << endl;
return 9;
}
}
#if defined(_WIN32)
if (ntservice::shouldStartService()) {
ntservice::startService();
// if we reach here, then we are not running as a service. service installation
// exits directly and so never reaches here either.
}
#endif
return !runMongosServer(_isUpgradeSwitchSet);
}
#if defined(_WIN32)
namespace mongo {
static void initService() {
ntservice::reportStatus( SERVICE_RUNNING );
log() << "Service running" << endl;
runMongosServer( false );
}
} // namespace mongo
#endif
MONGO_INITIALIZER(CreateAuthorizationManager)(InitializerContext* context) {
setGlobalAuthorizationManager(new AuthorizationManager(new AuthzManagerExternalStateMongos()));
return Status::OK();
}
#ifdef MONGO_SSL
MONGO_INITIALIZER_GENERAL(setSSLManagerType,
MONGO_NO_PREREQUISITES,
("SSLManager"))(InitializerContext* context) {
isSSLServer = true;
return Status::OK();
}
#endif
int mongoSMain(int argc, char* argv[], char** envp) {
static StaticObserver staticObserver;
if (argc < 1)
return EXIT_FAILURE;
mongosCommand = argv[0];
processCommandLineOptions(std::vector(argv, argv + argc));
mongo::forkServerOrDie();
mongo::runGlobalInitializersOrDie(argc, argv, envp);
CmdLine::censor(argc, argv);
try {
int exitCode = _main();
return exitCode;
}
catch(SocketException& e) {
cout << "uncaught SocketException in mongos main:" << endl;
cout << e.toString() << endl;
}
catch(DBException& e) {
cout << "uncaught DBException in mongos main:" << endl;
cout << e.toString() << endl;
}
catch(std::exception& e) {
cout << "uncaught std::exception in mongos main:" << endl;
cout << e.what() << endl;
}
catch(...) {
cout << "uncaught unknown exception in mongos main" << endl;
}
return 20;
}
#if defined(_WIN32)
// In Windows, wmain() is an alternate entry point for main(), and receives the same parameters
// as main() but encoded in Windows Unicode (UTF-16); "wide" 16-bit wchar_t characters. The
// WindowsCommandLine object converts these wide character strings to a UTF-8 coded equivalent
// and makes them available through the argv() and envp() members. This enables mongoSMain()
// to process UTF-8 encoded arguments and environment variables without regard to platform.
int wmain(int argc, wchar_t* argvW[], wchar_t* envpW[]) {
WindowsCommandLine wcl(argc, argvW, envpW);
int exitCode = mongoSMain(argc, wcl.argv(), wcl.envp());
::_exit(exitCode);
}
#else
int main(int argc, char* argv[], char** envp) {
int exitCode = mongoSMain(argc, argv, envp);
::_exit(exitCode);
}
#endif
#undef exit
void mongo::exitCleanly( ExitCode code ) {
// TODO: do we need to add anything?
mongo::dbexit( code );
}
void mongo::dbexit( ExitCode rc, const char *why ) {
dbexitCalled = true;
#if defined(_WIN32)
if ( rc == EXIT_WINDOWS_SERVICE_STOP ) {
log() << "dbexit: exiting because Windows service was stopped" << endl;
return;
}
#endif
log() << "dbexit: " << why
<< " rc:" << rc
<< " " << ( why ? why : "" )
<< endl;
::_exit(rc);
}