diff options
author | Eliot Horowitz <eliot@10gen.com> | 2011-06-26 17:56:43 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2011-06-26 17:56:43 -0400 |
commit | 0594b8a6ab5ba1561e1fe760e755f2a055beb699 (patch) | |
tree | 5b402f673234f0a2a955cbdb8b2f72cbfcfa9b72 | |
parent | ddde89b8cfb567480c7db6b2832eb6280974ea2a (diff) | |
download | mongo-0594b8a6ab5ba1561e1fe760e755f2a055beb699.tar.gz |
net cleaning: starting to split up message.h
-rw-r--r-- | SConstruct | 2 | ||||
-rw-r--r-- | client/dbclient.h | 1 | ||||
-rw-r--r-- | db/dbmessage.h | 71 | ||||
-rw-r--r-- | db/dur_journal.cpp | 2 | ||||
-rw-r--r-- | db/queryoptimizer.h | 2 | ||||
-rw-r--r-- | db/record.cpp | 2 | ||||
-rw-r--r-- | s/d_writeback.cpp | 2 | ||||
-rw-r--r-- | util/goodies.h | 2 | ||||
-rw-r--r-- | util/net/httpclient.cpp | 1 | ||||
-rw-r--r-- | util/net/message.cpp | 14 | ||||
-rw-r--r-- | util/net/message.h | 207 | ||||
-rw-r--r-- | util/net/message_server_port.cpp | 1 | ||||
-rw-r--r-- | util/net/miniwebserver.h | 1 | ||||
-rw-r--r-- | util/net/sock.h | 16 |
14 files changed, 58 insertions, 266 deletions
diff --git a/SConstruct b/SConstruct index a430a09f0d6..5431db5197d 100644 --- a/SConstruct +++ b/SConstruct @@ -325,7 +325,7 @@ coreDbFiles = [ "db/commands.cpp" ] coreServerFiles = [ "util/net/message_server_port.cpp" , "client/parallel.cpp" , "db/common.cpp", "util/net/miniwebserver.cpp" , "db/dbwebserver.cpp" , - "db/matcher.cpp" , "db/dbcommands_generic.cpp" ] + "db/matcher.cpp" , "db/dbcommands_generic.cpp" , "db/dbmessage.cpp" ] mmapFiles = [ "util/mmap.cpp" ] diff --git a/client/dbclient.h b/client/dbclient.h index c55a397dff3..d461bbfbec0 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -22,6 +22,7 @@ #include "../pch.h" #include "../util/net/message.h" +#include "../util/net/message_port.h" #include "../db/jsobj.h" #include "../db/json.h" #include <stack> diff --git a/db/dbmessage.h b/db/dbmessage.h index 7a72e84f9af..a2dc113c2b9 100644 --- a/db/dbmessage.h +++ b/db/dbmessage.h @@ -1,3 +1,5 @@ +// dbmessage.h + /** * Copyright (C) 2008 10gen Inc. * @@ -21,6 +23,7 @@ #include "namespace-inl.h" #include "../util/net/message.h" #include "../client/constants.h" +#include "instance.h" namespace mongo { @@ -251,69 +254,21 @@ namespace mongo { } }; -} // namespace mongo - -#include "../client/dbclient.h" - -namespace mongo { - - inline void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, Message& requestMsg, - void *data, int size, - int nReturned, int startingFrom = 0, - long long cursorId = 0 - ) { - BufBuilder b(32768); - b.skip(sizeof(QueryResult)); - b.appendBuf(data, size); - QueryResult *qr = (QueryResult *) b.buf(); - qr->_resultFlags() = queryResultFlags; - qr->len = b.len(); - qr->setOperation(opReply); - qr->cursorId = cursorId; - qr->startingFrom = startingFrom; - qr->nReturned = nReturned; - b.decouple(); - Message resp(qr, true); - p->reply(requestMsg, resp, requestMsg.header()->id); - } + void replyToQuery(int queryResultFlags, + AbstractMessagingPort* p, Message& requestMsg, + void *data, int size, + int nReturned, int startingFrom = 0, + long long cursorId = 0 + ); -} // namespace mongo - -//#include "bsonobj.h" - -#include "instance.h" - -namespace mongo { /* object reply helper. */ - inline void replyToQuery(int queryResultFlags, - AbstractMessagingPort* p, Message& requestMsg, - BSONObj& responseObj) { - replyToQuery(queryResultFlags, - p, requestMsg, - (void *) responseObj.objdata(), responseObj.objsize(), 1); - } + void replyToQuery(int queryResultFlags, + AbstractMessagingPort* p, Message& requestMsg, + BSONObj& responseObj); /* helper to do a reply using a DbResponse object */ - inline void replyToQuery(int queryResultFlags, Message &m, DbResponse &dbresponse, BSONObj obj) { - BufBuilder b; - b.skip(sizeof(QueryResult)); - b.appendBuf((void*) obj.objdata(), obj.objsize()); - QueryResult* msgdata = (QueryResult *) b.buf(); - b.decouple(); - QueryResult *qr = msgdata; - qr->_resultFlags() = queryResultFlags; - qr->len = b.len(); - qr->setOperation(opReply); - qr->cursorId = 0; - qr->startingFrom = 0; - qr->nReturned = 1; - Message *resp = new Message(); - resp->setData(msgdata, true); // transport will free - dbresponse.response = resp; - dbresponse.responseTo = m.header()->id; - } + void replyToQuery(int queryResultFlags, Message &m, DbResponse &dbresponse, BSONObj obj); string debugString( Message& m ); diff --git a/db/dur_journal.cpp b/db/dur_journal.cpp index 1695a56932b..de959614485 100644 --- a/db/dur_journal.cpp +++ b/db/dur_journal.cpp @@ -25,7 +25,7 @@ #include "../util/logfile.h" #include "../util/timer.h" #include "../util/alignedbuilder.h" -#include "../util/net/message.h" // getelapsedtimemillis +#include "../util/net/message_port.h" // getelapsedtimemillis #include "../util/concurrency/race.h" #include <boost/static_assert.hpp> #undef assert diff --git a/db/queryoptimizer.h b/db/queryoptimizer.h index 616c457a239..3b2bbdf53a0 100644 --- a/db/queryoptimizer.h +++ b/db/queryoptimizer.h @@ -23,12 +23,14 @@ #include "queryutil.h" #include "matcher.h" #include "../util/net/message.h" +#include "../util/net/message_port.h" #include <queue> namespace mongo { class IndexDetails; class IndexType; + class ElapsedTracker; /** A plan for executing a query using the given index spec and FieldRangeSet. */ class QueryPlan : boost::noncopyable { diff --git a/db/record.cpp b/db/record.cpp index d8cf7c7ef6e..e540fccdc34 100644 --- a/db/record.cpp +++ b/db/record.cpp @@ -3,7 +3,7 @@ #include "pch.h" #include "pdfile.h" #include "../util/processinfo.h" -#include "../util/net/message.h" +#include "../util/net/message_port.h" namespace mongo { diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp index fe1b9fb29df..ff0891330c8 100644 --- a/s/d_writeback.cpp +++ b/s/d_writeback.cpp @@ -20,7 +20,7 @@ #include "../db/commands.h" #include "../util/queue.h" -#include "../util/net/message.h" +#include "../util/net/message_port.h" #include "d_writeback.h" diff --git a/util/goodies.h b/util/goodies.h index d996f061b79..852fef32809 100644 --- a/util/goodies.h +++ b/util/goodies.h @@ -493,6 +493,8 @@ namespace mongo { T* _p; }; + + /** Hmmmm */ using namespace boost; diff --git a/util/net/httpclient.cpp b/util/net/httpclient.cpp index c8e14eb033f..01927022c31 100644 --- a/util/net/httpclient.cpp +++ b/util/net/httpclient.cpp @@ -19,6 +19,7 @@ #include "httpclient.h" #include "sock.h" #include "message.h" +#include "message_port.h" #include "../..//bson/util/builder.h" namespace mongo { diff --git a/util/net/message.cpp b/util/net/message.cpp index 2bac961b3a5..851107ce4f5 100644 --- a/util/net/message.cpp +++ b/util/net/message.cpp @@ -25,6 +25,7 @@ #include <time.h> #include "message.h" +#include "message_port.h" #include "../goodies.h" #include "../background.h" #include "../time_support.h" @@ -48,6 +49,19 @@ namespace mongo { + + void Message::send( MessagingPort &p, const char *context ) { + if ( empty() ) { + return; + } + if ( _buf != 0 ) { + p.send( (char*)_buf, _buf->len, context ); + } + else { + p.send( _data, context ); + } + } + bool objcheck = false; void checkTicketNumbers(); diff --git a/util/net/message.h b/util/net/message.h index 8ecf1d447e1..d0186b5f36c 100644 --- a/util/net/message.h +++ b/util/net/message.h @@ -1,4 +1,4 @@ -// Message.h +// message.h /* Copyright 2009 10gen Inc. * @@ -29,148 +29,6 @@ namespace mongo { typedef AtomicUInt MSGID; - class Listener : boost::noncopyable { - public: - Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { } - virtual ~Listener() { - if ( _timeTracker == this ) - _timeTracker = 0; - } - void initAndListen(); // never returns unless error (start a thread) - - /* spawn a thread, etc., then return */ - virtual void accepted(int sock, const SockAddr& from); - virtual void accepted(MessagingPort *mp) { - assert(!"You must overwrite one of the accepted methods"); - } - - const int _port; - - /** - * @return a rough estimate of elapsed time since the server started - */ - long long getMyElapsedTimeMillis() const { return _elapsedTime; } - - void setAsTimeTracker() { - _timeTracker = this; - } - - static const Listener* getTimeTracker() { - return _timeTracker; - } - - static long long getElapsedTimeMillis() { - if ( _timeTracker ) - return _timeTracker->getMyElapsedTimeMillis(); - - // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever. - return 0; - } - - private: - string _ip; - bool _logConnect; - long long _elapsedTime; - - static const Listener* _timeTracker; - - virtual bool useUnixSockets() const { return false; } - }; - - class AbstractMessagingPort : boost::noncopyable { - public: - AbstractMessagingPort() : tag(0) {} - virtual ~AbstractMessagingPort() { } - virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available - virtual void reply(Message& received, Message& response) = 0; - - virtual HostAndPort remote() const = 0; - virtual unsigned remotePort() const = 0; - - private: - - public: - // TODO make this private with some helpers - - /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ - unsigned tag; - - }; - - class MessagingPort : public AbstractMessagingPort { - public: - MessagingPort(int sock, const SockAddr& farEnd); - - // in some cases the timeout will actually be 2x this value - eg we do a partial send, - // then the timeout fires, then we try to send again, then the timeout fires again with - // no data sent, then we detect that the other side is down - MessagingPort(double so_timeout = 0, int logLevel = 0 ); - - virtual ~MessagingPort(); - - void shutdown(); - - bool connect(SockAddr& farEnd); - - /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. - also, the Message data will go out of scope on the subsequent recv call. - */ - bool recv(Message& m); - void reply(Message& received, Message& response, MSGID responseTo); - void reply(Message& received, Message& response); - bool call(Message& toSend, Message& response); - - void say(Message& toSend, int responseTo = -1); - - /** - * this is used for doing 'async' queries - * instead of doing call( to , from ) - * you would do - * say( to ) - * recv( from ) - * Note: if you fail to call recv and someone else uses this port, - * horrible things will happend - */ - bool recv( const Message& sent , Message& response ); - - void piggyBack( Message& toSend , int responseTo = -1 ); - - virtual unsigned remotePort() const; - virtual HostAndPort remote() const; - - // send len or throw SocketException - void send( const char * data , int len, const char *context ); - void send( const vector< pair< char *, int > > &data, const char *context ); - - // recv len or throw SocketException - void recv( char * data , int len ); - - int unsafe_recv( char *buf, int max ); - - void clearCounters() { _bytesIn = 0; _bytesOut = 0; } - long long getBytesIn() const { return _bytesIn; } - long long getBytesOut() const { return _bytesOut; } - private: - int sock; - PiggyBackData * piggyBackData; - - long long _bytesIn; - long long _bytesOut; - - // this is the parsed version of farEnd - // mutable because its initialized only on call to remote() - mutable HostAndPort _farEndParsed; - - public: - SockAddr farEnd; - double _timeout; - int _logLevel; // passed to log() when logging errors - - static void closeAllSockets(unsigned tagMask = 0xffffffff); - - friend class PiggyBackData; - }; - enum Operations { opReply = 1, /* reply. responseTo is set. */ dbMsg = 1000, /* generic msg command followed by a string */ @@ -430,17 +288,8 @@ namespace mongo { return _freeIt; } - void send( MessagingPort &p, const char *context ) { - if ( empty() ) { - return; - } - if ( _buf != 0 ) { - p.send( (char*)_buf, _buf->len, context ); - } - else { - p.send( _data, context ); - } - } + void send( MessagingPort &p, const char *context ); + private: void _setData( MsgData *d, bool freeIt ) { @@ -455,59 +304,9 @@ namespace mongo { bool _freeIt; }; - class SocketException : public DBException { - public: - const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type; - - SocketException( Type t , string server , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ } - virtual ~SocketException() throw() {} - - bool shouldPrint() const { return _type != CLOSED; } - virtual string toString() const; - - private: - string _server; - string _extra; - }; MSGID nextMessageId(); extern TicketHolder connTicketHolder; - class ElapsedTracker { - public: - ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks ) - : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) { - _last = Listener::getElapsedTimeMillis(); - } - - /** - * call this for every iteration - * returns true if one of the triggers has gone off - */ - bool ping() { - if ( ( ++_pings % _h ) == 0 ) { - _last = Listener::getElapsedTimeMillis(); - return true; - } - - long long now = Listener::getElapsedTimeMillis(); - if ( now - _last > _ms ) { - _last = now; - return true; - } - - return false; - } - - private: - int _h; - int _ms; - - unsigned long long _pings; - - long long _last; - - }; - } // namespace mongo diff --git a/util/net/message_server_port.cpp b/util/net/message_server_port.cpp index a2538c90d6d..723d9ea83eb 100644 --- a/util/net/message_server_port.cpp +++ b/util/net/message_server_port.cpp @@ -20,6 +20,7 @@ #ifndef USE_ASIO #include "message.h" +#include "message_port.h" #include "message_server.h" #include "../../db/cmdline.h" diff --git a/util/net/miniwebserver.h b/util/net/miniwebserver.h index f7351d9f743..56f3dbac978 100644 --- a/util/net/miniwebserver.h +++ b/util/net/miniwebserver.h @@ -19,6 +19,7 @@ #include "../../pch.h" #include "message.h" +#include "message_port.h" #include "../../db/jsobj.h" namespace mongo { diff --git a/util/net/sock.h b/util/net/sock.h index 506d7fbb63c..34bcfb424ea 100644 --- a/util/net/sock.h +++ b/util/net/sock.h @@ -298,4 +298,20 @@ namespace mongo { static ListeningSockets* _instance; }; + class SocketException : public DBException { + public: + const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type; + + SocketException( Type t , string server , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ } + virtual ~SocketException() throw() {} + + bool shouldPrint() const { return _type != CLOSED; } + virtual string toString() const; + + private: + string _server; + string _extra; + }; + + } // namespace mongo |