diff options
Diffstat (limited to 'src/mongo/util/net/message.h')
-rw-r--r-- | src/mongo/util/net/message.h | 702 |
1 files changed, 358 insertions, 344 deletions
diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index b3573026f84..d83492e519c 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -44,55 +44,65 @@ namespace mongo { - /** - * Maximum accepted message size on the wire protocol. - */ - const size_t MaxMessageSizeBytes = 48 * 1000 * 1000; - - class Message; - class MessagingPort; - class PiggyBackData; - - typedef uint32_t MSGID; - - enum Operations { - opReply = 1, /* reply. responseTo is set. */ - dbMsg = 1000, /* generic msg command followed by a std::string */ - dbUpdate = 2001, /* update object */ - dbInsert = 2002, - //dbGetByOID = 2003, - dbQuery = 2004, - dbGetMore = 2005, - dbDelete = 2006, - dbKillCursors = 2007, - dbCommand = 2008, - dbCommandReply = 2009, - }; - - bool doesOpGetAResponse( int op ); - - inline const char * opToString( int op ) { - switch ( op ) { - case 0: return "none"; - case opReply: return "reply"; - case dbMsg: return "msg"; - case dbUpdate: return "update"; - case dbInsert: return "insert"; - case dbQuery: return "query"; - case dbGetMore: return "getmore"; - case dbDelete: return "remove"; - case dbKillCursors: return "killcursors"; - case dbCommand: return "command"; - case dbCommandReply: return "commandReply"; +/** + * Maximum accepted message size on the wire protocol. + */ +const size_t MaxMessageSizeBytes = 48 * 1000 * 1000; + +class Message; +class MessagingPort; +class PiggyBackData; + +typedef uint32_t MSGID; + +enum Operations { + opReply = 1, /* reply. responseTo is set. */ + dbMsg = 1000, /* generic msg command followed by a std::string */ + dbUpdate = 2001, /* update object */ + dbInsert = 2002, + // dbGetByOID = 2003, + dbQuery = 2004, + dbGetMore = 2005, + dbDelete = 2006, + dbKillCursors = 2007, + dbCommand = 2008, + dbCommandReply = 2009, +}; + +bool doesOpGetAResponse(int op); + +inline const char* opToString(int op) { + switch (op) { + case 0: + return "none"; + case opReply: + return "reply"; + case dbMsg: + return "msg"; + case dbUpdate: + return "update"; + case dbInsert: + return "insert"; + case dbQuery: + return "query"; + case dbGetMore: + return "getmore"; + case dbDelete: + return "remove"; + case dbKillCursors: + return "killcursors"; + case dbCommand: + return "command"; + case dbCommandReply: + return "commandReply"; default: - massert( 16141, str::stream() << "cannot translate opcode " << op, !op ); + massert(16141, str::stream() << "cannot translate opcode " << op, !op); return ""; - } } +} - inline bool opIsWrite( int op ) { - switch ( op ) { - +inline bool opIsWrite(int op) { + switch (op) { case 0: case opReply: case dbMsg: @@ -110,383 +120,387 @@ namespace mongo { PRINT(op); verify(0); return ""; - } - } +} - namespace MSGHEADER { +namespace MSGHEADER { #pragma pack(1) - /* see http://dochub.mongodb.org/core/mongowireprotocol - */ - struct Layout { - int32_t messageLength; // total message size, including this - int32_t requestID; // identifier for this message - int32_t responseTo; // requestID from the original request - // (used in responses from db) - int32_t opCode; - }; +/* see http://dochub.mongodb.org/core/mongowireprotocol +*/ +struct Layout { + int32_t messageLength; // total message size, including this + int32_t requestID; // identifier for this message + int32_t responseTo; // requestID from the original request + // (used in responses from db) + int32_t opCode; +}; #pragma pack() - class ConstView { - public: - typedef ConstDataView view_type; +class ConstView { +public: + typedef ConstDataView view_type; - ConstView(const char* data) : _data(data) { } + ConstView(const char* data) : _data(data) {} - const char* view2ptr() const { - return data().view(); - } + const char* view2ptr() const { + return data().view(); + } - int32_t getMessageLength() const { - return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength)); - } + int32_t getMessageLength() const { + return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength)); + } - int32_t getRequestID() const { - return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID)); - } + int32_t getRequestID() const { + return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID)); + } - int32_t getResponseTo() const { - return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo)); - } + int32_t getResponseTo() const { + return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo)); + } - int32_t getOpCode() const { - return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode)); - } + int32_t getOpCode() const { + return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode)); + } - protected: - const view_type& data() const { - return _data; - } +protected: + const view_type& data() const { + return _data; + } - private: - view_type _data; - }; +private: + view_type _data; +}; - class View : public ConstView { - public: - typedef DataView view_type; +class View : public ConstView { +public: + typedef DataView view_type; - View(char* data) : ConstView(data) {} + View(char* data) : ConstView(data) {} - using ConstView::view2ptr; - char* view2ptr() { - return data().view(); - } + using ConstView::view2ptr; + char* view2ptr() { + return data().view(); + } - void setMessageLength(int32_t value) { - data().write(tagLittleEndian(value), offsetof(Layout, messageLength)); - } + void setMessageLength(int32_t value) { + data().write(tagLittleEndian(value), offsetof(Layout, messageLength)); + } - void setRequestID(int32_t value) { - data().write(tagLittleEndian(value), offsetof(Layout, requestID)); - } + void setRequestID(int32_t value) { + data().write(tagLittleEndian(value), offsetof(Layout, requestID)); + } - void setResponseTo(int32_t value) { - data().write(tagLittleEndian(value), offsetof(Layout, responseTo)); - } + void setResponseTo(int32_t value) { + data().write(tagLittleEndian(value), offsetof(Layout, responseTo)); + } - void setOpCode(int32_t value) { - data().write(tagLittleEndian(value), offsetof(Layout, opCode)); - } + void setOpCode(int32_t value) { + data().write(tagLittleEndian(value), offsetof(Layout, opCode)); + } - private: - view_type data() const { - return const_cast<char *>(ConstView::view2ptr()); - } - }; +private: + view_type data() const { + return const_cast<char*>(ConstView::view2ptr()); + } +}; - class Value : public EncodedValueStorage<Layout, ConstView, View> { - public: - Value() { - BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); - } +class Value : public EncodedValueStorage<Layout, ConstView, View> { +public: + Value() { + BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); + } - Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} - }; + Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} +}; - } // namespace MSGHEADER +} // namespace MSGHEADER - namespace MsgData { +namespace MsgData { #pragma pack(1) - struct Layout { - MSGHEADER::Layout header; - char data[4]; - }; +struct Layout { + MSGHEADER::Layout header; + char data[4]; +}; #pragma pack() - class ConstView { - public: - ConstView(const char* storage) : _storage(storage) { } +class ConstView { +public: + ConstView(const char* storage) : _storage(storage) {} - const char* view2ptr() const { - return storage().view(); - } - - int32_t getLen() const { - return header().getMessageLength(); - } + const char* view2ptr() const { + return storage().view(); + } - MSGID getId() const { - return header().getRequestID(); - } + int32_t getLen() const { + return header().getMessageLength(); + } - MSGID getResponseTo() const { - return header().getResponseTo(); - } + MSGID getId() const { + return header().getRequestID(); + } - int32_t getOperation() const { - return header().getOpCode(); - } + MSGID getResponseTo() const { + return header().getResponseTo(); + } - const char* data() const { - return storage().view(offsetof(Layout, data)); - } + int32_t getOperation() const { + return header().getOpCode(); + } - bool valid() const { - if ( getLen() <= 0 || getLen() > ( 4 * BSONObjMaxInternalSize ) ) - return false; - if ( getOperation() < 0 || getOperation() > 30000 ) - return false; - return true; - } + const char* data() const { + return storage().view(offsetof(Layout, data)); + } - int64_t getCursor() const { - verify( getResponseTo() > 0 ); - verify( getOperation() == opReply ); - return ConstDataView(data() + sizeof(int32_t)).read<LittleEndian<int64_t>>(); - } + bool valid() const { + if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize)) + return false; + if (getOperation() < 0 || getOperation() > 30000) + return false; + return true; + } - int dataLen() const; // len without header + int64_t getCursor() const { + verify(getResponseTo() > 0); + verify(getOperation() == opReply); + return ConstDataView(data() + sizeof(int32_t)).read<LittleEndian<int64_t>>(); + } - protected: - const ConstDataView& storage() const { - return _storage; - } + int dataLen() const; // len without header - MSGHEADER::ConstView header() const { - return storage().view(offsetof(Layout, header)); - } +protected: + const ConstDataView& storage() const { + return _storage; + } - private: - ConstDataView _storage; - }; + MSGHEADER::ConstView header() const { + return storage().view(offsetof(Layout, header)); + } - class View : public ConstView { - public: - View(char* storage) : ConstView(storage) {} +private: + ConstDataView _storage; +}; - using ConstView::view2ptr; - char* view2ptr() { - return storage().view(); - } +class View : public ConstView { +public: + View(char* storage) : ConstView(storage) {} - void setLen(int value) { - return header().setMessageLength(value); - } + using ConstView::view2ptr; + char* view2ptr() { + return storage().view(); + } - void setId(MSGID value) { - return header().setRequestID(value); - } + void setLen(int value) { + return header().setMessageLength(value); + } - void setResponseTo(MSGID value) { - return header().setResponseTo(value); - } + void setId(MSGID value) { + return header().setRequestID(value); + } - void setOperation(int value) { - return header().setOpCode(value); - } + void setResponseTo(MSGID value) { + return header().setResponseTo(value); + } - using ConstView::data; - char* data() { - return storage().view(offsetof(Layout, data)); - } + void setOperation(int value) { + return header().setOpCode(value); + } - private: - DataView storage() const { - return const_cast<char *>(ConstView::view2ptr()); - } + using ConstView::data; + char* data() { + return storage().view(offsetof(Layout, data)); + } - MSGHEADER::View header() const { - return storage().view(offsetof(Layout, header)); - } - }; +private: + DataView storage() const { + return const_cast<char*>(ConstView::view2ptr()); + } - class Value : public EncodedValueStorage<Layout, ConstView, View> { - public: - Value() { - BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); - } + MSGHEADER::View header() const { + return storage().view(offsetof(Layout, header)); + } +}; - Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} - }; +class Value : public EncodedValueStorage<Layout, ConstView, View> { +public: + Value() { + BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); + } - const int MsgDataHeaderSize = sizeof(Value) - 4; - inline int ConstView::dataLen() const { - return getLen() - MsgDataHeaderSize; - } - } // namespace MsgData - - class Message { - public: - // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit). - Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {} - Message( void * data , bool freeIt ) : - _buf( 0 ), _data( 0 ), _freeIt( false ) { - _setData( reinterpret_cast< char* >( data ), freeIt ); - }; - Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) { - *this = r; - } - ~Message() { - reset(); - } + Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} +}; + +const int MsgDataHeaderSize = sizeof(Value) - 4; +inline int ConstView::dataLen() const { + return getLen() - MsgDataHeaderSize; +} +} // namespace MsgData + +class Message { +public: + // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit). + Message() : _buf(0), _data(0), _freeIt(false) {} + Message(void* data, bool freeIt) : _buf(0), _data(0), _freeIt(false) { + _setData(reinterpret_cast<char*>(data), freeIt); + }; + Message(Message& r) : _buf(0), _data(0), _freeIt(false) { + *this = r; + } + ~Message() { + reset(); + } - SockAddr _from; + SockAddr _from; - MsgData::View header() const { - verify( !empty() ); - return _buf ? _buf : _data[ 0 ].first; - } + MsgData::View header() const { + verify(!empty()); + return _buf ? _buf : _data[0].first; + } - int operation() const { return header().getOperation(); } + int operation() const { + return header().getOperation(); + } - MsgData::View singleData() const { - massert( 13273, "single data buffer expected", _buf ); - return header(); - } + MsgData::View singleData() const { + massert(13273, "single data buffer expected", _buf); + return header(); + } - bool empty() const { return !_buf && _data.empty(); } + bool empty() const { + return !_buf && _data.empty(); + } - int size() const { - int res = 0; - if ( _buf ) { - res = MsgData::ConstView(_buf).getLen(); - } - else { - for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) { - res += it->second; - } + int size() const { + int res = 0; + if (_buf) { + res = MsgData::ConstView(_buf).getLen(); + } else { + for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) { + res += it->second; } - return res; } + return res; + } - int dataSize() const { return size() - sizeof(MSGHEADER::Value); } - - // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy - // can get rid of this if we make response handling smarter - void concat() { - if ( _buf || empty() ) { - return; - } + int dataSize() const { + return size() - sizeof(MSGHEADER::Value); + } - verify( _freeIt ); - int totalSize = 0; - for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin(); - i != _data.end(); ++i) { - totalSize += i->second; - } - char *buf = (char*)mongoMalloc( totalSize ); - char *p = buf; - for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin(); - i != _data.end(); ++i) { - memcpy( p, i->first, i->second ); - p += i->second; - } - reset(); - _setData( buf, true ); + // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy + // can get rid of this if we make response handling smarter + void concat() { + if (_buf || empty()) { + return; } - // vector swap() so this is fast - Message& operator=(Message& r) { - verify( empty() ); - verify( r._freeIt ); - _buf = r._buf; - r._buf = 0; - if ( r._data.size() > 0 ) { - _data.swap( r._data ); - } - r._freeIt = false; - _freeIt = true; - return *this; + verify(_freeIt); + int totalSize = 0; + for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); i != _data.end(); + ++i) { + totalSize += i->second; + } + char* buf = (char*)mongoMalloc(totalSize); + char* p = buf; + for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); i != _data.end(); + ++i) { + memcpy(p, i->first, i->second); + p += i->second; } + reset(); + _setData(buf, true); + } - void reset() { - if ( _freeIt ) { - if ( _buf ) { - free( _buf ); - } - for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin(); - i != _data.end(); ++i) { - free(i->first); - } - } - _buf = 0; - _data.clear(); - _freeIt = false; + // vector swap() so this is fast + Message& operator=(Message& r) { + verify(empty()); + verify(r._freeIt); + _buf = r._buf; + r._buf = 0; + if (r._data.size() > 0) { + _data.swap(r._data); } + r._freeIt = false; + _freeIt = true; + return *this; + } - // use to add a buffer - // assumes message will free everything - void appendData(char *d, int size) { - if ( size <= 0 ) { - return; - } - if ( empty() ) { - MsgData::View md = d; - md.setLen(size); // can be updated later if more buffers added - _setData( md.view2ptr(), true ); - return; + void reset() { + if (_freeIt) { + if (_buf) { + free(_buf); } - verify( _freeIt ); - if ( _buf ) { - _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen())); - _buf = 0; + for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); + i != _data.end(); + ++i) { + free(i->first); } - _data.push_back(std::make_pair(d, size)); - header().setLen(header().getLen() + size); } + _buf = 0; + _data.clear(); + _freeIt = false; + } - // use to set first buffer if empty - void setData(char* d, bool freeIt) { - verify( empty() ); - _setData( d, freeIt ); + // use to add a buffer + // assumes message will free everything + void appendData(char* d, int size) { + if (size <= 0) { + return; } - void setData(int operation, const char *msgtxt) { - setData(operation, msgtxt, strlen(msgtxt)+1); + if (empty()) { + MsgData::View md = d; + md.setLen(size); // can be updated later if more buffers added + _setData(md.view2ptr(), true); + return; } - void setData(int operation, const char *msgdata, size_t len) { - verify( empty() ); - size_t dataLen = len + sizeof(MsgData::Value) - 4; - MsgData::View d = reinterpret_cast<char *>(mongoMalloc(dataLen)); - memcpy(d.data(), msgdata, len); - d.setLen(dataLen); - d.setOperation(operation); - _setData( d.view2ptr(), true ); + verify(_freeIt); + if (_buf) { + _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen())); + _buf = 0; } + _data.push_back(std::make_pair(d, size)); + header().setLen(header().getLen() + size); + } - bool doIFreeIt() { - return _freeIt; - } + // use to set first buffer if empty + void setData(char* d, bool freeIt) { + verify(empty()); + _setData(d, freeIt); + } + void setData(int operation, const char* msgtxt) { + setData(operation, msgtxt, strlen(msgtxt) + 1); + } + void setData(int operation, const char* msgdata, size_t len) { + verify(empty()); + size_t dataLen = len + sizeof(MsgData::Value) - 4; + MsgData::View d = reinterpret_cast<char*>(mongoMalloc(dataLen)); + memcpy(d.data(), msgdata, len); + d.setLen(dataLen); + d.setOperation(operation); + _setData(d.view2ptr(), true); + } - void send( MessagingPort &p, const char *context ); - - std::string toString() const; + bool doIFreeIt() { + return _freeIt; + } - private: - void _setData( char* d, bool freeIt ) { - _freeIt = freeIt; - _buf = d; - } - // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data - char* _buf; - // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead - typedef std::vector< std::pair< char*, int > > MsgVec; - MsgVec _data; - bool _freeIt; - }; + void send(MessagingPort& p, const char* context); + + std::string toString() const; + +private: + void _setData(char* d, bool freeIt) { + _freeIt = freeIt; + _buf = d; + } + // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data + char* _buf; + // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead + typedef std::vector<std::pair<char*, int>> MsgVec; + MsgVec _data; + bool _freeIt; +}; - MSGID nextMessageId(); +MSGID nextMessageId(); -} // namespace mongo +} // namespace mongo |