diff options
Diffstat (limited to 'src/mongo/s/server.cpp')
-rw-r--r-- | src/mongo/s/server.cpp | 271 |
1 files changed, 128 insertions, 143 deletions
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 092a7339d67..94ed60e91de 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -92,125 +92,119 @@ namespace mongo { - using std::string; - using std::vector; +using std::string; +using std::vector; - using logger::LogComponent; +using logger::LogComponent; #if defined(_WIN32) - ntservice::NtServiceDefaultStrings defaultServiceStrings = { - L"MongoS", - L"MongoDB Router", - L"MongoDB Sharding Router" - }; - static ExitCode initService(); +ntservice::NtServiceDefaultStrings defaultServiceStrings = { + L"MongoS", L"MongoDB Router", L"MongoDB Sharding Router"}; +static ExitCode initService(); #endif - bool dbexitCalled = false; +bool dbexitCalled = false; - bool inShutdown() { - return dbexitCalled; - } +bool inShutdown() { + return dbexitCalled; +} - bool haveLocalShardingInfo( Client* client, const string& ns ) { - verify( 0 ); - return false; - } +bool haveLocalShardingInfo(Client* client, const string& ns) { + verify(0); + return false; +} - static BSONObj buildErrReply( const DBException& ex ) { - BSONObjBuilder errB; - errB.append( "$err", ex.what() ); - errB.append( "code", ex.getCode() ); - if ( !ex._shard.empty() ) { - errB.append( "shard", ex._shard ); - } - return errB.obj(); +static BSONObj buildErrReply(const DBException& ex) { + BSONObjBuilder errB; + errB.append("$err", ex.what()); + errB.append("code", ex.getCode()); + if (!ex._shard.empty()) { + errB.append("shard", ex._shard); } + return errB.obj(); +} - class ShardedMessageHandler : public MessageHandler { - public: - virtual ~ShardedMessageHandler() {} - - virtual void connected(AbstractMessagingPort* p) { - Client::initThread("conn", getGlobalServiceContext(), p); - } - - virtual void process(Message& m, AbstractMessagingPort* p) { - verify( p ); - Request r( m , p ); - - try { - r.init(); - r.process(); - } - catch ( const AssertionException& ex ) { - - LOG( ex.isUserAssertion() ? 1 : 0 ) << "Assertion failed" - << " while processing " << opToString( m.operation() ) << " op" - << " for " << r.getns() << causedBy( ex ); +class ShardedMessageHandler : public MessageHandler { +public: + virtual ~ShardedMessageHandler() {} - if ( r.expectResponse() ) { - m.header().setId(r.id()); - replyToQuery( ResultFlag_ErrSet, p , m , buildErrReply( ex ) ); - } + virtual void connected(AbstractMessagingPort* p) { + Client::initThread("conn", getGlobalServiceContext(), p); + } - // We *always* populate the last error for now - LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + virtual void process(Message& m, AbstractMessagingPort* p) { + verify(p); + Request r(m, p); + + try { + r.init(); + r.process(); + } catch (const AssertionException& ex) { + LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed" + << " while processing " << opToString(m.operation()) + << " op" + << " for " << r.getns() << causedBy(ex); + + if (r.expectResponse()) { + m.header().setId(r.id()); + replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex)); } - catch ( const DBException& ex ) { - log() << "Exception thrown" - << " while processing " << opToString( m.operation() ) << " op" - << " for " << r.getns() << causedBy( ex ); + // We *always* populate the last error for now + LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + } catch (const DBException& ex) { + log() << "Exception thrown" + << " while processing " << opToString(m.operation()) << " op" + << " for " << r.getns() << causedBy(ex); - if ( r.expectResponse() ) { - m.header().setId(r.id()); - replyToQuery( ResultFlag_ErrSet, p , m , buildErrReply( ex ) ); - } - - // We *always* populate the last error for now - LastError::get(cc()).setLastError(ex.getCode(), ex.what()); + if (r.expectResponse()) { + m.header().setId(r.id()); + replyToQuery(ResultFlag_ErrSet, p, m, buildErrReply(ex)); } - // Release connections back to pool, if any still cached - ShardConnection::releaseMyConnections(); + // We *always* populate the last error for now + LastError::get(cc()).setLastError(ex.getCode(), ex.what()); } - }; - void start( const MessageServer::Options& opts ) { - balancer.go(); - cursorCache.startTimeoutThread(); - UserCacheInvalidator cacheInvalidatorThread(getGlobalAuthorizationManager()); - cacheInvalidatorThread.go(); + // Release connections back to pool, if any still cached + ShardConnection::releaseMyConnections(); + } +}; + +void start(const MessageServer::Options& opts) { + balancer.go(); + cursorCache.startTimeoutThread(); + UserCacheInvalidator cacheInvalidatorThread(getGlobalAuthorizationManager()); + cacheInvalidatorThread.go(); - PeriodicTask::startRunningPeriodicTasks(); + PeriodicTask::startRunningPeriodicTasks(); - ShardedMessageHandler handler; - MessageServer * server = createServer( opts , &handler ); - server->setAsTimeTracker(); - server->setupSockets(); - server->run(); - } + ShardedMessageHandler handler; + MessageServer* server = createServer(opts, &handler); + server->setAsTimeTracker(); + server->setupSockets(); + server->run(); +} - DBClientBase* createDirectClient(OperationContext* txn) { - uassert( 10197 , "createDirectClient not implemented for sharding yet" , 0 ); - return 0; - } +DBClientBase* createDirectClient(OperationContext* txn) { + uassert(10197, "createDirectClient not implemented for sharding yet", 0); + return 0; +} -} // namespace mongo +} // namespace mongo using namespace mongo; -static ExitCode runMongosServer( bool doUpgrade ) { - setThreadName( "mongosMain" ); - printShardingVersionInfo( false ); +static ExitCode runMongosServer(bool doUpgrade) { + setThreadName("mongosMain"); + printShardingVersionInfo(false); // Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks globalConnPool.addHook(new ShardingConnectionHook(false)); shardConnectionPool.addHook(new ShardingConnectionHook(true)); // Mongos shouldn't lazily kill cursors, otherwise we can end up with extras from migration - DBClientConnection::setLazyKillCursor( false ); + DBClientConnection::setLazyKillCursor(false); ReplicaSetMonitor::setConfigChangeHook(&ConfigServer::replicaSetChange); @@ -232,11 +226,11 @@ static ExitCode runMongosServer( bool doUpgrade ) { } } - auto shardRegistry = stdx::make_unique<ShardRegistry>( - stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), - stdx::make_unique<RemoteCommandRunnerImpl>(0), - std::unique_ptr<executor::TaskExecutor>{nullptr}, - catalogManager.get()); + auto shardRegistry = + stdx::make_unique<ShardRegistry>(stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), + stdx::make_unique<RemoteCommandRunnerImpl>(0), + std::unique_ptr<executor::TaskExecutor>{nullptr}, + catalogManager.get()); grid.init(std::move(catalogManager), std::move(shardRegistry)); @@ -259,10 +253,8 @@ static ExitCode runMongosServer( bool doUpgrade ) { #endif if (serverGlobalParams.isHttpInterfaceEnabled) { - std::shared_ptr<DbWebServer> dbWebServer( - new DbWebServer(serverGlobalParams.bind_ip, - serverGlobalParams.port + 1000, - new NoAdminAccess())); + std::shared_ptr<DbWebServer> dbWebServer(new DbWebServer( + serverGlobalParams.bind_ip, serverGlobalParams.port + 1000, new NoAdminAccess())); dbWebServer->setupSockets(); stdx::thread web(stdx::bind(&webServerListenThread, dbWebServer)); @@ -298,12 +290,9 @@ MONGO_INITIALIZER_GENERAL(ForkServer, static void startupConfigActions(const std::vector<std::string>& argv) { #if defined(_WIN32) vector<string> disallowedOptions; - disallowedOptions.push_back( "upgrade" ); - ntservice::configureService(initService, - moe::startupOptionsParsed, - defaultServiceStrings, - disallowedOptions, - argv); + disallowedOptions.push_back("upgrade"); + ntservice::configureService( + initService, moe::startupOptionsParsed, defaultServiceStrings, disallowedOptions, argv); #endif } @@ -316,15 +305,15 @@ static int _main() { // we either have a setting where all processes are in localhost or none are std::vector<HostAndPort> configServers = mongosGlobalParams.configdbs.getServers(); for (std::vector<HostAndPort>::const_iterator it = configServers.begin(); - it != configServers.end(); ++it) { - + it != configServers.end(); + ++it) { const HostAndPort& configAddr = *it; if (it == configServers.begin()) { - grid.setAllowLocalHost( configAddr.isLocalHost() ); + grid.setAllowLocalHost(configAddr.isLocalHost()); } - if ( configAddr.isLocalHost() != grid.allowLocalHost() ) { + if (configAddr.isLocalHost() != grid.allowLocalHost()) { mongo::log(LogComponent::kDefault) << "cannot mix localhost and ip addresses in configdbs"; return 10; @@ -343,7 +332,7 @@ static int _main() { // To maintain backwards compatibility, we exit with EXIT_NET_ERROR if the listener loop returns. if (exitCode == EXIT_NET_ERROR) { - dbexit( EXIT_NET_ERROR ); + dbexit(EXIT_NET_ERROR); } return (exitCode == EXIT_CLEAN) ? 0 : 1; @@ -351,43 +340,43 @@ static int _main() { #if defined(_WIN32) namespace mongo { - static ExitCode initService() { - ntservice::reportStatus( SERVICE_RUNNING ); - log() << "Service running"; +static ExitCode initService() { + ntservice::reportStatus(SERVICE_RUNNING); + log() << "Service running"; - ExitCode exitCode = runMongosServer(mongosGlobalParams.upgrade); + ExitCode exitCode = runMongosServer(mongosGlobalParams.upgrade); - // ignore EXIT_NET_ERROR on clean shutdown since we return this when the listening socket - // is closed - return (exitCode == EXIT_NET_ERROR && inShutdown()) ? EXIT_CLEAN : exitCode; - } + // ignore EXIT_NET_ERROR on clean shutdown since we return this when the listening socket + // is closed + return (exitCode == EXIT_NET_ERROR && inShutdown()) ? EXIT_CLEAN : exitCode; +} } // namespace mongo #endif namespace { - std::unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongos() { - return stdx::make_unique<AuthzManagerExternalStateMongos>(); - } +std::unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongos() { + return stdx::make_unique<AuthzManagerExternalStateMongos>(); +} - MONGO_INITIALIZER(CreateAuthorizationExternalStateFactory) (InitializerContext* context) { - AuthzManagerExternalState::create = &createAuthzManagerExternalStateMongos; - return Status::OK(); - } +MONGO_INITIALIZER(CreateAuthorizationExternalStateFactory)(InitializerContext* context) { + AuthzManagerExternalState::create = &createAuthzManagerExternalStateMongos; + return Status::OK(); +} - MONGO_INITIALIZER(SetGlobalEnvironment)(InitializerContext* context) { - setGlobalServiceContext(stdx::make_unique<ServiceContextNoop>()); - return Status::OK(); - } +MONGO_INITIALIZER(SetGlobalEnvironment)(InitializerContext* context) { + setGlobalServiceContext(stdx::make_unique<ServiceContextNoop>()); + return Status::OK(); +} #ifdef MONGO_CONFIG_SSL - MONGO_INITIALIZER_GENERAL(setSSLManagerType, - MONGO_NO_PREREQUISITES, - ("SSLManager"))(InitializerContext* context) { - isSSLServer = true; - return Status::OK(); - } +MONGO_INITIALIZER_GENERAL(setSSLManagerType, + MONGO_NO_PREREQUISITES, + ("SSLManager"))(InitializerContext* context) { + isSSLServer = true; + return Status::OK(); +} #endif -} // namespace +} // namespace int mongoSMain(int argc, char* argv[], char** envp) { static StaticObserver staticObserver; @@ -410,17 +399,13 @@ int mongoSMain(int argc, char* argv[], char** envp) { try { int exitCode = _main(); return exitCode; - } - catch(const SocketException& e) { + } catch (const SocketException& e) { error() << "uncaught SocketException in mongos main: " << e.toString(); - } - catch (const DBException& e) { + } catch (const DBException& e) { error() << "uncaught DBException in mongos main: " << e.toString(); - } - catch (const std::exception& e) { + } catch (const std::exception& e) { error() << "uncaught std::exception in mongos main:" << e.what(); - } - catch (...) { + } catch (...) { error() << "uncaught unknown exception in mongos main"; } @@ -453,10 +438,10 @@ void mongo::signalShutdown() { void mongo::exitCleanly(ExitCode code) { // TODO: do we need to add anything? grid.catalogManager()->shutDown(); - mongo::dbexit( code ); + mongo::dbexit(code); } -void mongo::dbexit(ExitCode rc, const char *why) { +void mongo::dbexit(ExitCode rc, const char* why) { dbexitCalled = true; audit::logShutdown(ClientBasic::getCurrent()); |