summaryrefslogtreecommitdiff
path: root/src/mongo/tools/bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/tools/bridge.cpp')
-rw-r--r--src/mongo/tools/bridge.cpp95
1 files changed, 48 insertions, 47 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index bd8882c4dfc..dafc44b272b 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -51,15 +51,16 @@ using namespace mongo;
using namespace std;
namespace mongo {
-bool inShutdown() { return false; }
+bool inShutdown() {
+ return false;
+}
} // namespace mongo
-void cleanup( int sig );
+void cleanup(int sig);
class Forwarder {
public:
- Forwarder( MessagingPort &mp ) : mp_( mp ) {
- }
+ Forwarder(MessagingPort& mp) : mp_(mp) {}
void operator()() const {
DBClientConnection dest;
@@ -70,9 +71,9 @@ public:
// If we can't connect for the configured timeout, give up
//
if (connectTimer.seconds() >= mongoBridgeGlobalParams.connectTimeoutSec) {
- cout << "Unable to establish connection from " << mp_.psock->remoteString()
- << " to " << mongoBridgeGlobalParams.destUri
- << " after " << connectTimer.seconds() << " seconds. Giving up." << endl;
+ cout << "Unable to establish connection from " << mp_.psock->remoteString()
+ << " to " << mongoBridgeGlobalParams.destUri << " after "
+ << connectTimer.seconds() << " seconds. Giving up." << endl;
mp_.shutdown();
return;
}
@@ -81,10 +82,10 @@ public:
}
Message m;
- while( 1 ) {
+ while (1) {
try {
m.reset();
- if ( !mp_.recv( m ) ) {
+ if (!mp_.recv(m)) {
cout << "end connection " << mp_.psock->remoteString() << endl;
mp_.shutdown();
break;
@@ -92,66 +93,66 @@ public:
sleepmillis(mongoBridgeGlobalParams.delay);
int oldId = m.header().getId();
- if ( m.operation() == dbQuery || m.operation() == dbMsg || m.operation() == dbGetMore ) {
+ if (m.operation() == dbQuery || m.operation() == dbMsg ||
+ m.operation() == dbGetMore) {
bool exhaust = false;
- if ( m.operation() == dbQuery ) {
- DbMessage d( m );
- QueryMessage q( d );
+ if (m.operation() == dbQuery) {
+ DbMessage d(m);
+ QueryMessage q(d);
exhaust = q.queryOptions & QueryOption_Exhaust;
}
Message response;
- dest.port().call( m, response );
+ dest.port().call(m, response);
// nothing to reply with?
- if ( response.empty() ) cleanup(0);
+ if (response.empty())
+ cleanup(0);
- mp_.reply( m, response, oldId );
- while ( exhaust ) {
+ mp_.reply(m, response, oldId);
+ while (exhaust) {
MsgData::View header = response.header();
QueryResult::View qr = header.view2ptr();
- if ( qr.getCursorId() ) {
+ if (qr.getCursorId()) {
response.reset();
- dest.port().recv( response );
- mp_.reply( m, response ); // m argument is ignored anyway
- }
- else {
+ dest.port().recv(response);
+ mp_.reply(m, response); // m argument is ignored anyway
+ } else {
exhaust = false;
}
}
+ } else {
+ dest.port().say(m, oldId);
}
- else {
- dest.port().say( m, oldId );
- }
- }
- catch ( ... ) {
+ } catch (...) {
log() << "caught exception in Forwarder, continuing" << endl;
}
}
}
+
private:
- MessagingPort &mp_;
+ MessagingPort& mp_;
};
-set<MessagingPort*>& ports ( *(new std::set<MessagingPort*>()) );
+set<MessagingPort*>& ports(*(new std::set<MessagingPort*>()));
class MyListener : public Listener {
public:
- MyListener( int port ) : Listener( "bridge" , "", port ) {}
- virtual void acceptedMP(MessagingPort *mp) {
- ports.insert( mp );
- Forwarder f( *mp );
- stdx::thread t( f );
+ MyListener(int port) : Listener("bridge", "", port) {}
+ virtual void acceptedMP(MessagingPort* mp) {
+ ports.insert(mp);
+ Forwarder f(*mp);
+ stdx::thread t(f);
}
};
-unique_ptr< MyListener > listener;
+unique_ptr<MyListener> listener;
-void cleanup( int sig ) {
+void cleanup(int sig) {
ListeningSockets::get()->closeAll();
- for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ )
+ for (set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++)
(*i)->shutdown();
- quickExit( 0 );
+ quickExit(0);
}
#if !defined(_WIN32)
void myterminate() {
@@ -160,20 +161,20 @@ void myterminate() {
}
void setupSignals() {
- signal( SIGINT , cleanup );
- signal( SIGTERM , cleanup );
- signal( SIGPIPE , cleanup );
- signal( SIGABRT , cleanup );
- signal( SIGSEGV , cleanup );
- signal( SIGBUS , cleanup );
- signal( SIGFPE , cleanup );
- set_terminate( myterminate );
+ signal(SIGINT, cleanup);
+ signal(SIGTERM, cleanup);
+ signal(SIGPIPE, cleanup);
+ signal(SIGABRT, cleanup);
+ signal(SIGSEGV, cleanup);
+ signal(SIGBUS, cleanup);
+ signal(SIGFPE, cleanup);
+ set_terminate(myterminate);
}
#else
inline void setupSignals() {}
#endif
-int toolMain( int argc, char **argv, char** envp ) {
+int toolMain(int argc, char** argv, char** envp) {
mongo::runGlobalInitializersOrDie(argc, argv, envp);
static StaticObserver staticObserver;