summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
authorAaron <aaron@10gen.com>2009-03-02 13:45:26 -0500
committerAaron <aaron@10gen.com>2009-03-02 13:45:26 -0500
commit67d39935dc11443690091ec246a3d6329fbe2aa0 (patch)
treea3fed08128bde10c4a7cd58e54be305caf315593 /util
parentb5d11b9350a26fb579dd64f082e4c436b6c5ad6e (diff)
parent474d2c0d2465a686c01ed8a9f22deb485281563f (diff)
downloadmongo-67d39935dc11443690091ec246a3d6329fbe2aa0.tar.gz
Merge branch 'master' of git@github.com:mongodb/mongo
Diffstat (limited to 'util')
-rw-r--r--util/message_server_asio.cpp59
-rw-r--r--util/message_server_port.cpp57
2 files changed, 86 insertions, 30 deletions
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp
index 12a26c0435e..58996db356d 100644
--- a/util/message_server_asio.cpp
+++ b/util/message_server_asio.cpp
@@ -1,5 +1,7 @@
// message_server_asio.cpp
+#ifdef USE_ASIO
+
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -30,17 +32,10 @@ namespace mongo {
void start(){
cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl;
- async_read( _socket ,
- buffer( &_inHeader , sizeof( _inHeader ) ) ,
- bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) );
+ _startHeaderRead();
}
void handleReadHeader( const boost::system::error_code& error ){
- cout << "got header\n"
- << " len: " << _inHeader.len << "\n"
- << " id : " << _inHeader.id << "\n"
- << " op : " << _inHeader._operation << "\n";
-
if ( ! _inHeader.valid() ){
cerr << " got invalid header from: " << _socket.remote_endpoint() << " closing connected" << endl;
return;
@@ -52,7 +47,7 @@ namespace mongo {
memcpy( data , &_inHeader , sizeof( _inHeader ) );
assert( data->len == _inHeader.len );
- uassert( "_cur not empty!" , ! _cur.data );
+ uassert( "_cur not empty! pipelining requests not supported" , ! _cur.data );
_cur.setData( data , true );
async_read( _socket ,
@@ -61,18 +56,29 @@ namespace mongo {
}
void handleReadBody( const boost::system::error_code& error ){
+ _replyCalled = false;
+
_handler->process( _cur , this );
+
+ if ( ! _replyCalled ){
+ _cur.reset();
+ _startHeaderRead();
+ }
}
void handleWriteDone( const boost::system::error_code& error ){
_cur.reset();
+ _replyCalled = false;
+ _startHeaderRead();
}
-
+
virtual void reply( Message& received, Message& response ){
reply( received , response , received.data->id );
}
virtual void reply( Message& query , Message& toSend, MSGID responseTo ){
+ _replyCalled = true;
+
toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
uassert( "pipelining requests doesn't work yet" , query.data->id == _cur.data->id );
@@ -83,10 +89,19 @@ namespace mongo {
private:
+
+ void _startHeaderRead(){
+ async_read( _socket ,
+ buffer( &_inHeader , sizeof( _inHeader ) ) ,
+ bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) );
+ }
+
MessageHandler * _handler;
tcp::socket _socket;
MsgData _inHeader;
Message _cur;
+
+ bool _replyCalled;
};
@@ -133,27 +148,11 @@ namespace mongo {
tcp::endpoint _endpoint;
tcp::acceptor _acceptor;
};
-
- // --temp hacks--
- void dbexit( int rc , const char * why ){
- cerr << "dbserver.cpp::dbexit" << endl;
- ::exit(rc);
- }
-
- const char * curNs = "";
-
- string getDbContext(){
- return "getDbContext bad";
- }
+ MessageServer * createServer( int port , MessageHandler * handler ){
+ return new AsyncMessageServer( port , handler );
+ }
}
-using namespace mongo;
-
-int main(){
- mongo::AsyncMessageServer s(9999,0);
- s.run();
-
- return 0;
-}
+#endif
diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp
new file mode 100644
index 00000000000..e73792c3047
--- /dev/null
+++ b/util/message_server_port.cpp
@@ -0,0 +1,57 @@
+// message_server_port.cpp
+
+#ifndef USE_ASIO
+
+#include "message.h"
+#include "message_server.h"
+
+namespace mongo {
+
+ class PortMessageServer : public MessageServer , public Listener {
+ public:
+ PortMessageServer( int port , MessageHandler * handler ) :
+ MessageServer( port , handler ) ,
+ Listener( port ){
+
+ }
+
+ void threadRun( MessagingPort * p ){
+ assert( p );
+ Message m;
+ try {
+ while ( 1 ){
+ m.reset();
+
+ if ( ! p->recv(m) ) {
+ log() << "end connection " << p->farEnd.toString() << endl;
+ p->shutdown();
+ break;
+ }
+
+ _handler->process( m , p );
+ }
+ }
+ catch ( ... ){
+ problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl;
+ delete p;
+ }
+ }
+
+ virtual void accepted(MessagingPort * p) {
+ boost::thread thr( bind( &PortMessageServer::threadRun , this , p ) );
+ }
+
+ void run(){
+ listen();
+ }
+
+ };
+
+
+ MessageServer * createServer( int port , MessageHandler * handler ){
+ return new PortMessageServer( port , handler );
+ }
+
+}
+
+#endif