diff options
Diffstat (limited to 'src/mongo/tools/bridge.cpp')
-rw-r--r-- | src/mongo/tools/bridge.cpp | 95 |
1 files changed, 48 insertions, 47 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index bd8882c4dfc..dafc44b272b 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -51,15 +51,16 @@ using namespace mongo; using namespace std; namespace mongo { -bool inShutdown() { return false; } +bool inShutdown() { + return false; +} } // namespace mongo -void cleanup( int sig ); +void cleanup(int sig); class Forwarder { public: - Forwarder( MessagingPort &mp ) : mp_( mp ) { - } + Forwarder(MessagingPort& mp) : mp_(mp) {} void operator()() const { DBClientConnection dest; @@ -70,9 +71,9 @@ public: // If we can't connect for the configured timeout, give up // if (connectTimer.seconds() >= mongoBridgeGlobalParams.connectTimeoutSec) { - cout << "Unable to establish connection from " << mp_.psock->remoteString() - << " to " << mongoBridgeGlobalParams.destUri - << " after " << connectTimer.seconds() << " seconds. Giving up." << endl; + cout << "Unable to establish connection from " << mp_.psock->remoteString() + << " to " << mongoBridgeGlobalParams.destUri << " after " + << connectTimer.seconds() << " seconds. Giving up." << endl; mp_.shutdown(); return; } @@ -81,10 +82,10 @@ public: } Message m; - while( 1 ) { + while (1) { try { m.reset(); - if ( !mp_.recv( m ) ) { + if (!mp_.recv(m)) { cout << "end connection " << mp_.psock->remoteString() << endl; mp_.shutdown(); break; @@ -92,66 +93,66 @@ public: sleepmillis(mongoBridgeGlobalParams.delay); int oldId = m.header().getId(); - if ( m.operation() == dbQuery || m.operation() == dbMsg || m.operation() == dbGetMore ) { + if (m.operation() == dbQuery || m.operation() == dbMsg || + m.operation() == dbGetMore) { bool exhaust = false; - if ( m.operation() == dbQuery ) { - DbMessage d( m ); - QueryMessage q( d ); + if (m.operation() == dbQuery) { + DbMessage d(m); + QueryMessage q(d); exhaust = q.queryOptions & QueryOption_Exhaust; } Message response; - dest.port().call( m, response ); + dest.port().call(m, response); // nothing to reply with? - if ( response.empty() ) cleanup(0); + if (response.empty()) + cleanup(0); - mp_.reply( m, response, oldId ); - while ( exhaust ) { + mp_.reply(m, response, oldId); + while (exhaust) { MsgData::View header = response.header(); QueryResult::View qr = header.view2ptr(); - if ( qr.getCursorId() ) { + if (qr.getCursorId()) { response.reset(); - dest.port().recv( response ); - mp_.reply( m, response ); // m argument is ignored anyway - } - else { + dest.port().recv(response); + mp_.reply(m, response); // m argument is ignored anyway + } else { exhaust = false; } } + } else { + dest.port().say(m, oldId); } - else { - dest.port().say( m, oldId ); - } - } - catch ( ... ) { + } catch (...) { log() << "caught exception in Forwarder, continuing" << endl; } } } + private: - MessagingPort &mp_; + MessagingPort& mp_; }; -set<MessagingPort*>& ports ( *(new std::set<MessagingPort*>()) ); +set<MessagingPort*>& ports(*(new std::set<MessagingPort*>())); class MyListener : public Listener { public: - MyListener( int port ) : Listener( "bridge" , "", port ) {} - virtual void acceptedMP(MessagingPort *mp) { - ports.insert( mp ); - Forwarder f( *mp ); - stdx::thread t( f ); + MyListener(int port) : Listener("bridge", "", port) {} + virtual void acceptedMP(MessagingPort* mp) { + ports.insert(mp); + Forwarder f(*mp); + stdx::thread t(f); } }; -unique_ptr< MyListener > listener; +unique_ptr<MyListener> listener; -void cleanup( int sig ) { +void cleanup(int sig) { ListeningSockets::get()->closeAll(); - for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) + for (set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++) (*i)->shutdown(); - quickExit( 0 ); + quickExit(0); } #if !defined(_WIN32) void myterminate() { @@ -160,20 +161,20 @@ void myterminate() { } void setupSignals() { - signal( SIGINT , cleanup ); - signal( SIGTERM , cleanup ); - signal( SIGPIPE , cleanup ); - signal( SIGABRT , cleanup ); - signal( SIGSEGV , cleanup ); - signal( SIGBUS , cleanup ); - signal( SIGFPE , cleanup ); - set_terminate( myterminate ); + signal(SIGINT, cleanup); + signal(SIGTERM, cleanup); + signal(SIGPIPE, cleanup); + signal(SIGABRT, cleanup); + signal(SIGSEGV, cleanup); + signal(SIGBUS, cleanup); + signal(SIGFPE, cleanup); + set_terminate(myterminate); } #else inline void setupSignals() {} #endif -int toolMain( int argc, char **argv, char** envp ) { +int toolMain(int argc, char** argv, char** envp) { mongo::runGlobalInitializersOrDie(argc, argv, envp); static StaticObserver staticObserver; |