summaryrefslogtreecommitdiff
path: root/grid/message.h
diff options
context:
space:
mode:
Diffstat (limited to 'grid/message.h')
-rw-r--r--grid/message.h195
1 files changed, 106 insertions, 89 deletions
diff --git a/grid/message.h b/grid/message.h
index 9a644cb0bf4..d158989077b 100644
--- a/grid/message.h
+++ b/grid/message.h
@@ -2,16 +2,16 @@
/**
* Copyright (C) 2008 10gen Inc.
-*
+*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
-*
+*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
-*
+*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
@@ -21,129 +21,146 @@
#include "../util/sock.h"
class Message;
-class MessagingPort;
+class MessagingPort;
typedef WrappingInt MSGID;
const int DBPort = 27017;
-class Listener {
+class Listener {
public:
- Listener(int p) : port(p) { }
- void listen(); // never returns (start a thread)
+ Listener(int p) : port(p) { }
+ void listen(); // never returns (start a thread)
- /* spawn a thread, etc., then return */
- virtual void accepted(MessagingPort *mp) = 0;
+ /* spawn a thread, etc., then return */
+ virtual void accepted(MessagingPort *mp) = 0;
private:
- int port;
+ int port;
};
-class AbstractMessagingPort {
+class AbstractMessagingPort {
public:
- virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
- virtual void reply(Message& received, Message& response) = 0;
+ virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
+ virtual void reply(Message& received, Message& response) = 0;
};
class MessagingPort : public AbstractMessagingPort {
public:
- MessagingPort(int sock, SockAddr& farEnd);
- MessagingPort();
- ~MessagingPort();
+ MessagingPort(int sock, SockAddr& farEnd);
+ MessagingPort();
+ ~MessagingPort();
- void shutdown();
+ void shutdown();
- bool connect(SockAddr& farEnd);
+ bool connect(SockAddr& farEnd);
- /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
- also, the Message data will go out of scope on the subsequent recv call.
- */
- bool recv(Message& m);
- void reply(Message& received, Message& response, MSGID responseTo);
- void reply(Message& received, Message& response);
- bool call(Message& toSend, Message& response);
- void say(Message& toSend, int responseTo = -1);
+ /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
+ also, the Message data will go out of scope on the subsequent recv call.
+ */
+ bool recv(Message& m);
+ void reply(Message& received, Message& response, MSGID responseTo);
+ void reply(Message& received, Message& response);
+ bool call(Message& toSend, Message& response);
+ void say(Message& toSend, int responseTo = -1);
private:
- int sock;
+ int sock;
public:
- SockAddr farEnd;
+ SockAddr farEnd;
};
#pragma pack(push)
#pragma pack(1)
-enum Operations {
- opReply = 1, /* reply. responseTo is set. */
- dbMsg = 1000, /* generic msg command followed by a string */
- dbUpdate = 2001, /* update object */
- dbInsert = 2002,
- //dbGetByOID = 2003,
- dbQuery = 2004,
- dbGetMore = 2005,
- dbDelete = 2006,
- dbKillCursors = 2007
+enum Operations {
+ opReply = 1, /* reply. responseTo is set. */
+ dbMsg = 1000, /* generic msg command followed by a string */
+ dbUpdate = 2001, /* update object */
+ dbInsert = 2002,
+ //dbGetByOID = 2003,
+ dbQuery = 2004,
+ dbGetMore = 2005,
+ dbDelete = 2006,
+ dbKillCursors = 2007
};
struct MsgData {
- int len; /* len of the msg, including this field */
- MSGID id; /* request/reply id's match... */
- MSGID responseTo; /* id of the message we are responding to */
- int _operation;
- int operation() const { return _operation; }
- void setOperation(int o) { _operation = o; }
+ int len; /* len of the msg, including this field */
+ MSGID id; /* request/reply id's match... */
+ MSGID responseTo; /* id of the message we are responding to */
+ int _operation;
+ int operation() const {
+ return _operation;
+ }
+ void setOperation(int o) {
+ _operation = o;
+ }
char _data[4];
- int& dataAsInt() { return *((int *) _data); }
+ int& dataAsInt() {
+ return *((int *) _data);
+ }
- int dataLen(); // len without header
+ int dataLen(); // len without header
};
const int MsgDataHeaderSize = sizeof(MsgData) - 4;
-inline int MsgData::dataLen() { return len - MsgDataHeaderSize; }
+inline int MsgData::dataLen() {
+ return len - MsgDataHeaderSize;
+}
#pragma pack(pop)
class Message {
public:
- Message() { data = 0; freeIt = false; }
- Message( void * _data , bool _freeIt ){ data = (MsgData*)_data; freeIt = _freeIt; };
- ~Message() { reset(); }
-
- SockAddr from;
- MsgData *data;
-
- Message& operator=(Message& r) {
- assert( data == 0 );
- data = r.data;
- assert( r.freeIt );
- r.freeIt = false;
- r.data = 0;
- freeIt = true;
- return *this;
- }
-
- void reset() {
- if( freeIt && data )
- free(data);
- data = 0; freeIt = false;
- }
-
- void setData(MsgData *d, bool _freeIt) {
- assert( data == 0 );
- freeIt = _freeIt;
- data = d;
- }
- void setData(int operation, const char *msgtxt) {
- setData(operation, msgtxt, strlen(msgtxt)+1);
- }
- void setData(int operation, const char *msgdata, int len) {
- assert(data == 0);
- int dataLen = len + sizeof(MsgData) - 4;
- MsgData *d = (MsgData *) malloc(dataLen);
- memcpy(d->_data, msgdata, len);
- d->len = fixEndian(dataLen);
- d->setOperation(operation);
- freeIt= true;
- data = d;
- }
+ Message() {
+ data = 0;
+ freeIt = false;
+ }
+ Message( void * _data , bool _freeIt ) {
+ data = (MsgData*)_data;
+ freeIt = _freeIt;
+ };
+ ~Message() {
+ reset();
+ }
+
+ SockAddr from;
+ MsgData *data;
+
+ Message& operator=(Message& r) {
+ assert( data == 0 );
+ data = r.data;
+ assert( r.freeIt );
+ r.freeIt = false;
+ r.data = 0;
+ freeIt = true;
+ return *this;
+ }
+
+ void reset() {
+ if ( freeIt && data )
+ free(data);
+ data = 0;
+ freeIt = false;
+ }
+
+ void setData(MsgData *d, bool _freeIt) {
+ assert( data == 0 );
+ freeIt = _freeIt;
+ data = d;
+ }
+ void setData(int operation, const char *msgtxt) {
+ setData(operation, msgtxt, strlen(msgtxt)+1);
+ }
+ void setData(int operation, const char *msgdata, int len) {
+ assert(data == 0);
+ int dataLen = len + sizeof(MsgData) - 4;
+ MsgData *d = (MsgData *) malloc(dataLen);
+ memcpy(d->_data, msgdata, len);
+ d->len = fixEndian(dataLen);
+ d->setOperation(operation);
+ freeIt= true;
+ data = d;
+ }
private:
- bool freeIt;
+ bool freeIt;
};