summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2011-06-26 17:56:43 -0400
committerEliot Horowitz <eliot@10gen.com>2011-06-26 17:56:43 -0400
commit0594b8a6ab5ba1561e1fe760e755f2a055beb699 (patch)
tree5b402f673234f0a2a955cbdb8b2f72cbfcfa9b72
parentddde89b8cfb567480c7db6b2832eb6280974ea2a (diff)
downloadmongo-0594b8a6ab5ba1561e1fe760e755f2a055beb699.tar.gz
net cleaning: starting to split up message.h
-rw-r--r--SConstruct2
-rw-r--r--client/dbclient.h1
-rw-r--r--db/dbmessage.h71
-rw-r--r--db/dur_journal.cpp2
-rw-r--r--db/queryoptimizer.h2
-rw-r--r--db/record.cpp2
-rw-r--r--s/d_writeback.cpp2
-rw-r--r--util/goodies.h2
-rw-r--r--util/net/httpclient.cpp1
-rw-r--r--util/net/message.cpp14
-rw-r--r--util/net/message.h207
-rw-r--r--util/net/message_server_port.cpp1
-rw-r--r--util/net/miniwebserver.h1
-rw-r--r--util/net/sock.h16
14 files changed, 58 insertions, 266 deletions
diff --git a/SConstruct b/SConstruct
index a430a09f0d6..5431db5197d 100644
--- a/SConstruct
+++ b/SConstruct
@@ -325,7 +325,7 @@ coreDbFiles = [ "db/commands.cpp" ]
coreServerFiles = [ "util/net/message_server_port.cpp" ,
"client/parallel.cpp" , "db/common.cpp",
"util/net/miniwebserver.cpp" , "db/dbwebserver.cpp" ,
- "db/matcher.cpp" , "db/dbcommands_generic.cpp" ]
+ "db/matcher.cpp" , "db/dbcommands_generic.cpp" , "db/dbmessage.cpp" ]
mmapFiles = [ "util/mmap.cpp" ]
diff --git a/client/dbclient.h b/client/dbclient.h
index c55a397dff3..d461bbfbec0 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -22,6 +22,7 @@
#include "../pch.h"
#include "../util/net/message.h"
+#include "../util/net/message_port.h"
#include "../db/jsobj.h"
#include "../db/json.h"
#include <stack>
diff --git a/db/dbmessage.h b/db/dbmessage.h
index 7a72e84f9af..a2dc113c2b9 100644
--- a/db/dbmessage.h
+++ b/db/dbmessage.h
@@ -1,3 +1,5 @@
+// dbmessage.h
+
/**
* Copyright (C) 2008 10gen Inc.
*
@@ -21,6 +23,7 @@
#include "namespace-inl.h"
#include "../util/net/message.h"
#include "../client/constants.h"
+#include "instance.h"
namespace mongo {
@@ -251,69 +254,21 @@ namespace mongo {
}
};
-} // namespace mongo
-
-#include "../client/dbclient.h"
-
-namespace mongo {
-
- inline void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p, Message& requestMsg,
- void *data, int size,
- int nReturned, int startingFrom = 0,
- long long cursorId = 0
- ) {
- BufBuilder b(32768);
- b.skip(sizeof(QueryResult));
- b.appendBuf(data, size);
- QueryResult *qr = (QueryResult *) b.buf();
- qr->_resultFlags() = queryResultFlags;
- qr->len = b.len();
- qr->setOperation(opReply);
- qr->cursorId = cursorId;
- qr->startingFrom = startingFrom;
- qr->nReturned = nReturned;
- b.decouple();
- Message resp(qr, true);
- p->reply(requestMsg, resp, requestMsg.header()->id);
- }
+ void replyToQuery(int queryResultFlags,
+ AbstractMessagingPort* p, Message& requestMsg,
+ void *data, int size,
+ int nReturned, int startingFrom = 0,
+ long long cursorId = 0
+ );
-} // namespace mongo
-
-//#include "bsonobj.h"
-
-#include "instance.h"
-
-namespace mongo {
/* object reply helper. */
- inline void replyToQuery(int queryResultFlags,
- AbstractMessagingPort* p, Message& requestMsg,
- BSONObj& responseObj) {
- replyToQuery(queryResultFlags,
- p, requestMsg,
- (void *) responseObj.objdata(), responseObj.objsize(), 1);
- }
+ void replyToQuery(int queryResultFlags,
+ AbstractMessagingPort* p, Message& requestMsg,
+ BSONObj& responseObj);
/* helper to do a reply using a DbResponse object */
- inline void replyToQuery(int queryResultFlags, Message &m, DbResponse &dbresponse, BSONObj obj) {
- BufBuilder b;
- b.skip(sizeof(QueryResult));
- b.appendBuf((void*) obj.objdata(), obj.objsize());
- QueryResult* msgdata = (QueryResult *) b.buf();
- b.decouple();
- QueryResult *qr = msgdata;
- qr->_resultFlags() = queryResultFlags;
- qr->len = b.len();
- qr->setOperation(opReply);
- qr->cursorId = 0;
- qr->startingFrom = 0;
- qr->nReturned = 1;
- Message *resp = new Message();
- resp->setData(msgdata, true); // transport will free
- dbresponse.response = resp;
- dbresponse.responseTo = m.header()->id;
- }
+ void replyToQuery(int queryResultFlags, Message &m, DbResponse &dbresponse, BSONObj obj);
string debugString( Message& m );
diff --git a/db/dur_journal.cpp b/db/dur_journal.cpp
index 1695a56932b..de959614485 100644
--- a/db/dur_journal.cpp
+++ b/db/dur_journal.cpp
@@ -25,7 +25,7 @@
#include "../util/logfile.h"
#include "../util/timer.h"
#include "../util/alignedbuilder.h"
-#include "../util/net/message.h" // getelapsedtimemillis
+#include "../util/net/message_port.h" // getelapsedtimemillis
#include "../util/concurrency/race.h"
#include <boost/static_assert.hpp>
#undef assert
diff --git a/db/queryoptimizer.h b/db/queryoptimizer.h
index 616c457a239..3b2bbdf53a0 100644
--- a/db/queryoptimizer.h
+++ b/db/queryoptimizer.h
@@ -23,12 +23,14 @@
#include "queryutil.h"
#include "matcher.h"
#include "../util/net/message.h"
+#include "../util/net/message_port.h"
#include <queue>
namespace mongo {
class IndexDetails;
class IndexType;
+ class ElapsedTracker;
/** A plan for executing a query using the given index spec and FieldRangeSet. */
class QueryPlan : boost::noncopyable {
diff --git a/db/record.cpp b/db/record.cpp
index d8cf7c7ef6e..e540fccdc34 100644
--- a/db/record.cpp
+++ b/db/record.cpp
@@ -3,7 +3,7 @@
#include "pch.h"
#include "pdfile.h"
#include "../util/processinfo.h"
-#include "../util/net/message.h"
+#include "../util/net/message_port.h"
namespace mongo {
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp
index fe1b9fb29df..ff0891330c8 100644
--- a/s/d_writeback.cpp
+++ b/s/d_writeback.cpp
@@ -20,7 +20,7 @@
#include "../db/commands.h"
#include "../util/queue.h"
-#include "../util/net/message.h"
+#include "../util/net/message_port.h"
#include "d_writeback.h"
diff --git a/util/goodies.h b/util/goodies.h
index d996f061b79..852fef32809 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -493,6 +493,8 @@ namespace mongo {
T* _p;
};
+
+
/** Hmmmm */
using namespace boost;
diff --git a/util/net/httpclient.cpp b/util/net/httpclient.cpp
index c8e14eb033f..01927022c31 100644
--- a/util/net/httpclient.cpp
+++ b/util/net/httpclient.cpp
@@ -19,6 +19,7 @@
#include "httpclient.h"
#include "sock.h"
#include "message.h"
+#include "message_port.h"
#include "../..//bson/util/builder.h"
namespace mongo {
diff --git a/util/net/message.cpp b/util/net/message.cpp
index 2bac961b3a5..851107ce4f5 100644
--- a/util/net/message.cpp
+++ b/util/net/message.cpp
@@ -25,6 +25,7 @@
#include <time.h>
#include "message.h"
+#include "message_port.h"
#include "../goodies.h"
#include "../background.h"
#include "../time_support.h"
@@ -48,6 +49,19 @@
namespace mongo {
+
+ void Message::send( MessagingPort &p, const char *context ) {
+ if ( empty() ) {
+ return;
+ }
+ if ( _buf != 0 ) {
+ p.send( (char*)_buf, _buf->len, context );
+ }
+ else {
+ p.send( _data, context );
+ }
+ }
+
bool objcheck = false;
void checkTicketNumbers();
diff --git a/util/net/message.h b/util/net/message.h
index 8ecf1d447e1..d0186b5f36c 100644
--- a/util/net/message.h
+++ b/util/net/message.h
@@ -1,4 +1,4 @@
-// Message.h
+// message.h
/* Copyright 2009 10gen Inc.
*
@@ -29,148 +29,6 @@ namespace mongo {
typedef AtomicUInt MSGID;
- class Listener : boost::noncopyable {
- public:
- Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { }
- virtual ~Listener() {
- if ( _timeTracker == this )
- _timeTracker = 0;
- }
- void initAndListen(); // never returns unless error (start a thread)
-
- /* spawn a thread, etc., then return */
- virtual void accepted(int sock, const SockAddr& from);
- virtual void accepted(MessagingPort *mp) {
- assert(!"You must overwrite one of the accepted methods");
- }
-
- const int _port;
-
- /**
- * @return a rough estimate of elapsed time since the server started
- */
- long long getMyElapsedTimeMillis() const { return _elapsedTime; }
-
- void setAsTimeTracker() {
- _timeTracker = this;
- }
-
- static const Listener* getTimeTracker() {
- return _timeTracker;
- }
-
- static long long getElapsedTimeMillis() {
- if ( _timeTracker )
- return _timeTracker->getMyElapsedTimeMillis();
-
- // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever.
- return 0;
- }
-
- private:
- string _ip;
- bool _logConnect;
- long long _elapsedTime;
-
- static const Listener* _timeTracker;
-
- virtual bool useUnixSockets() const { return false; }
- };
-
- class AbstractMessagingPort : boost::noncopyable {
- public:
- AbstractMessagingPort() : tag(0) {}
- virtual ~AbstractMessagingPort() { }
- virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
- virtual void reply(Message& received, Message& response) = 0;
-
- virtual HostAndPort remote() const = 0;
- virtual unsigned remotePort() const = 0;
-
- private:
-
- public:
- // TODO make this private with some helpers
-
- /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */
- unsigned tag;
-
- };
-
- class MessagingPort : public AbstractMessagingPort {
- public:
- MessagingPort(int sock, const SockAddr& farEnd);
-
- // in some cases the timeout will actually be 2x this value - eg we do a partial send,
- // then the timeout fires, then we try to send again, then the timeout fires again with
- // no data sent, then we detect that the other side is down
- MessagingPort(double so_timeout = 0, int logLevel = 0 );
-
- virtual ~MessagingPort();
-
- void shutdown();
-
- bool connect(SockAddr& farEnd);
-
- /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
- also, the Message data will go out of scope on the subsequent recv call.
- */
- bool recv(Message& m);
- void reply(Message& received, Message& response, MSGID responseTo);
- void reply(Message& received, Message& response);
- bool call(Message& toSend, Message& response);
-
- void say(Message& toSend, int responseTo = -1);
-
- /**
- * this is used for doing 'async' queries
- * instead of doing call( to , from )
- * you would do
- * say( to )
- * recv( from )
- * Note: if you fail to call recv and someone else uses this port,
- * horrible things will happend
- */
- bool recv( const Message& sent , Message& response );
-
- void piggyBack( Message& toSend , int responseTo = -1 );
-
- virtual unsigned remotePort() const;
- virtual HostAndPort remote() const;
-
- // send len or throw SocketException
- void send( const char * data , int len, const char *context );
- void send( const vector< pair< char *, int > > &data, const char *context );
-
- // recv len or throw SocketException
- void recv( char * data , int len );
-
- int unsafe_recv( char *buf, int max );
-
- void clearCounters() { _bytesIn = 0; _bytesOut = 0; }
- long long getBytesIn() const { return _bytesIn; }
- long long getBytesOut() const { return _bytesOut; }
- private:
- int sock;
- PiggyBackData * piggyBackData;
-
- long long _bytesIn;
- long long _bytesOut;
-
- // this is the parsed version of farEnd
- // mutable because its initialized only on call to remote()
- mutable HostAndPort _farEndParsed;
-
- public:
- SockAddr farEnd;
- double _timeout;
- int _logLevel; // passed to log() when logging errors
-
- static void closeAllSockets(unsigned tagMask = 0xffffffff);
-
- friend class PiggyBackData;
- };
-
enum Operations {
opReply = 1, /* reply. responseTo is set. */
dbMsg = 1000, /* generic msg command followed by a string */
@@ -430,17 +288,8 @@ namespace mongo {
return _freeIt;
}
- void send( MessagingPort &p, const char *context ) {
- if ( empty() ) {
- return;
- }
- if ( _buf != 0 ) {
- p.send( (char*)_buf, _buf->len, context );
- }
- else {
- p.send( _data, context );
- }
- }
+ void send( MessagingPort &p, const char *context );
+
private:
void _setData( MsgData *d, bool freeIt ) {
@@ -455,59 +304,9 @@ namespace mongo {
bool _freeIt;
};
- class SocketException : public DBException {
- public:
- const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type;
-
- SocketException( Type t , string server , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ }
- virtual ~SocketException() throw() {}
-
- bool shouldPrint() const { return _type != CLOSED; }
- virtual string toString() const;
-
- private:
- string _server;
- string _extra;
- };
MSGID nextMessageId();
extern TicketHolder connTicketHolder;
- class ElapsedTracker {
- public:
- ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks )
- : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) {
- _last = Listener::getElapsedTimeMillis();
- }
-
- /**
- * call this for every iteration
- * returns true if one of the triggers has gone off
- */
- bool ping() {
- if ( ( ++_pings % _h ) == 0 ) {
- _last = Listener::getElapsedTimeMillis();
- return true;
- }
-
- long long now = Listener::getElapsedTimeMillis();
- if ( now - _last > _ms ) {
- _last = now;
- return true;
- }
-
- return false;
- }
-
- private:
- int _h;
- int _ms;
-
- unsigned long long _pings;
-
- long long _last;
-
- };
-
} // namespace mongo
diff --git a/util/net/message_server_port.cpp b/util/net/message_server_port.cpp
index a2538c90d6d..723d9ea83eb 100644
--- a/util/net/message_server_port.cpp
+++ b/util/net/message_server_port.cpp
@@ -20,6 +20,7 @@
#ifndef USE_ASIO
#include "message.h"
+#include "message_port.h"
#include "message_server.h"
#include "../../db/cmdline.h"
diff --git a/util/net/miniwebserver.h b/util/net/miniwebserver.h
index f7351d9f743..56f3dbac978 100644
--- a/util/net/miniwebserver.h
+++ b/util/net/miniwebserver.h
@@ -19,6 +19,7 @@
#include "../../pch.h"
#include "message.h"
+#include "message_port.h"
#include "../../db/jsobj.h"
namespace mongo {
diff --git a/util/net/sock.h b/util/net/sock.h
index 506d7fbb63c..34bcfb424ea 100644
--- a/util/net/sock.h
+++ b/util/net/sock.h
@@ -298,4 +298,20 @@ namespace mongo {
static ListeningSockets* _instance;
};
+ class SocketException : public DBException {
+ public:
+ const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type;
+
+ SocketException( Type t , string server , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ }
+ virtual ~SocketException() throw() {}
+
+ bool shouldPrint() const { return _type != CLOSED; }
+ virtual string toString() const;
+
+ private:
+ string _server;
+ string _extra;
+ };
+
+
} // namespace mongo