diff options
author | Aaron <aaron@10gen.com> | 2009-03-02 13:45:26 -0500 |
---|---|---|
committer | Aaron <aaron@10gen.com> | 2009-03-02 13:45:26 -0500 |
commit | 67d39935dc11443690091ec246a3d6329fbe2aa0 (patch) | |
tree | a3fed08128bde10c4a7cd58e54be305caf315593 /util | |
parent | b5d11b9350a26fb579dd64f082e4c436b6c5ad6e (diff) | |
parent | 474d2c0d2465a686c01ed8a9f22deb485281563f (diff) | |
download | mongo-67d39935dc11443690091ec246a3d6329fbe2aa0.tar.gz |
Merge branch 'master' of git@github.com:mongodb/mongo
Diffstat (limited to 'util')
-rw-r--r-- | util/message_server_asio.cpp | 59 | ||||
-rw-r--r-- | util/message_server_port.cpp | 57 |
2 files changed, 86 insertions, 30 deletions
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp index 12a26c0435e..58996db356d 100644 --- a/util/message_server_asio.cpp +++ b/util/message_server_asio.cpp @@ -1,5 +1,7 @@ // message_server_asio.cpp +#ifdef USE_ASIO + #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/enable_shared_from_this.hpp> @@ -30,17 +32,10 @@ namespace mongo { void start(){ cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl; - async_read( _socket , - buffer( &_inHeader , sizeof( _inHeader ) ) , - bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) ); + _startHeaderRead(); } void handleReadHeader( const boost::system::error_code& error ){ - cout << "got header\n" - << " len: " << _inHeader.len << "\n" - << " id : " << _inHeader.id << "\n" - << " op : " << _inHeader._operation << "\n"; - if ( ! _inHeader.valid() ){ cerr << " got invalid header from: " << _socket.remote_endpoint() << " closing connected" << endl; return; @@ -52,7 +47,7 @@ namespace mongo { memcpy( data , &_inHeader , sizeof( _inHeader ) ); assert( data->len == _inHeader.len ); - uassert( "_cur not empty!" , ! _cur.data ); + uassert( "_cur not empty! pipelining requests not supported" , ! _cur.data ); _cur.setData( data , true ); async_read( _socket , @@ -61,18 +56,29 @@ namespace mongo { } void handleReadBody( const boost::system::error_code& error ){ + _replyCalled = false; + _handler->process( _cur , this ); + + if ( ! _replyCalled ){ + _cur.reset(); + _startHeaderRead(); + } } void handleWriteDone( const boost::system::error_code& error ){ _cur.reset(); + _replyCalled = false; + _startHeaderRead(); } - + virtual void reply( Message& received, Message& response ){ reply( received , response , received.data->id ); } virtual void reply( Message& query , Message& toSend, MSGID responseTo ){ + _replyCalled = true; + toSend.data->id = nextMessageId(); toSend.data->responseTo = responseTo; uassert( "pipelining requests doesn't work yet" , query.data->id == _cur.data->id ); @@ -83,10 +89,19 @@ namespace mongo { private: + + void _startHeaderRead(){ + async_read( _socket , + buffer( &_inHeader , sizeof( _inHeader ) ) , + bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) ); + } + MessageHandler * _handler; tcp::socket _socket; MsgData _inHeader; Message _cur; + + bool _replyCalled; }; @@ -133,27 +148,11 @@ namespace mongo { tcp::endpoint _endpoint; tcp::acceptor _acceptor; }; - - // --temp hacks-- - void dbexit( int rc , const char * why ){ - cerr << "dbserver.cpp::dbexit" << endl; - ::exit(rc); - } - - const char * curNs = ""; - - string getDbContext(){ - return "getDbContext bad"; - } + MessageServer * createServer( int port , MessageHandler * handler ){ + return new AsyncMessageServer( port , handler ); + } } -using namespace mongo; - -int main(){ - mongo::AsyncMessageServer s(9999,0); - s.run(); - - return 0; -} +#endif diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp new file mode 100644 index 00000000000..e73792c3047 --- /dev/null +++ b/util/message_server_port.cpp @@ -0,0 +1,57 @@ +// message_server_port.cpp + +#ifndef USE_ASIO + +#include "message.h" +#include "message_server.h" + +namespace mongo { + + class PortMessageServer : public MessageServer , public Listener { + public: + PortMessageServer( int port , MessageHandler * handler ) : + MessageServer( port , handler ) , + Listener( port ){ + + } + + void threadRun( MessagingPort * p ){ + assert( p ); + Message m; + try { + while ( 1 ){ + m.reset(); + + if ( ! p->recv(m) ) { + log() << "end connection " << p->farEnd.toString() << endl; + p->shutdown(); + break; + } + + _handler->process( m , p ); + } + } + catch ( ... ){ + problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl; + delete p; + } + } + + virtual void accepted(MessagingPort * p) { + boost::thread thr( bind( &PortMessageServer::threadRun , this , p ) ); + } + + void run(){ + listen(); + } + + }; + + + MessageServer * createServer( int port , MessageHandler * handler ){ + return new PortMessageServer( port , handler ); + } + +} + +#endif |