// message.h
/* Copyright 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/cstdint.h"
#include "mongo/base/data_type_endian.h"
#include "mongo/base/data_view.h"
#include "mongo/base/encoded_value_storage.h"
#include "mongo/util/allocator.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/sock.h"
#include "mongo/util/print.h"
namespace mongo {
/**
* Maximum accepted message size on the wire protocol.
*/
const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;
class Message;
class MessagingPort;
class PiggyBackData;
typedef uint32_t MSGID;
enum Operations {
opReply = 1, /* reply. responseTo is set. */
dbMsg = 1000, /* generic msg command followed by a std::string */
dbUpdate = 2001, /* update object */
dbInsert = 2002,
//dbGetByOID = 2003,
dbQuery = 2004,
dbGetMore = 2005,
dbDelete = 2006,
dbKillCursors = 2007,
dbCommand = 2008,
dbCommandReply = 2009,
};
bool doesOpGetAResponse( int op );
inline const char * opToString( int op ) {
switch ( op ) {
case 0: return "none";
case opReply: return "reply";
case dbMsg: return "msg";
case dbUpdate: return "update";
case dbInsert: return "insert";
case dbQuery: return "query";
case dbGetMore: return "getmore";
case dbDelete: return "remove";
case dbKillCursors: return "killcursors";
case dbCommand: return "command";
case dbCommandReply: return "commandReply";
default:
massert( 16141, str::stream() << "cannot translate opcode " << op, !op );
return "";
}
}
inline bool opIsWrite( int op ) {
switch ( op ) {
case 0:
case opReply:
case dbMsg:
case dbQuery:
case dbGetMore:
case dbKillCursors:
return false;
case dbUpdate:
case dbInsert:
case dbDelete:
return true;
default:
PRINT(op);
verify(0);
return "";
}
}
namespace MSGHEADER {
#pragma pack(1)
/* see http://dochub.mongodb.org/core/mongowireprotocol
*/
struct Layout {
int32_t messageLength; // total message size, including this
int32_t requestID; // identifier for this message
int32_t responseTo; // requestID from the original request
// (used in responses from db)
int32_t opCode;
};
#pragma pack()
class ConstView {
public:
typedef ConstDataView view_type;
ConstView(const char* data) : _data(data) { }
const char* view2ptr() const {
return data().view();
}
int32_t getMessageLength() const {
return data().read>(offsetof(Layout, messageLength));
}
int32_t getRequestID() const {
return data().read>(offsetof(Layout, requestID));
}
int32_t getResponseTo() const {
return data().read>(offsetof(Layout, responseTo));
}
int32_t getOpCode() const {
return data().read>(offsetof(Layout, opCode));
}
protected:
const view_type& data() const {
return _data;
}
private:
view_type _data;
};
class View : public ConstView {
public:
typedef DataView view_type;
View(char* data) : ConstView(data) {}
using ConstView::view2ptr;
char* view2ptr() {
return data().view();
}
void setMessageLength(int32_t value) {
data().write(tagLittleEndian(value), offsetof(Layout, messageLength));
}
void setRequestID(int32_t value) {
data().write(tagLittleEndian(value), offsetof(Layout, requestID));
}
void setResponseTo(int32_t value) {
data().write(tagLittleEndian(value), offsetof(Layout, responseTo));
}
void setOpCode(int32_t value) {
data().write(tagLittleEndian(value), offsetof(Layout, opCode));
}
private:
view_type data() const {
return const_cast(ConstView::view2ptr());
}
};
class Value : public EncodedValueStorage {
public:
Value() {
BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
}
Value(ZeroInitTag_t zit) : EncodedValueStorage(zit) {}
};
} // namespace MSGHEADER
namespace MsgData {
#pragma pack(1)
struct Layout {
MSGHEADER::Layout header;
char data[4];
};
#pragma pack()
class ConstView {
public:
ConstView(const char* storage) : _storage(storage) { }
const char* view2ptr() const {
return storage().view();
}
int32_t getLen() const {
return header().getMessageLength();
}
MSGID getId() const {
return header().getRequestID();
}
MSGID getResponseTo() const {
return header().getResponseTo();
}
int32_t getOperation() const {
return header().getOpCode();
}
const char* data() const {
return storage().view(offsetof(Layout, data));
}
bool valid() const {
if ( getLen() <= 0 || getLen() > ( 4 * BSONObjMaxInternalSize ) )
return false;
if ( getOperation() < 0 || getOperation() > 30000 )
return false;
return true;
}
int64_t getCursor() const {
verify( getResponseTo() > 0 );
verify( getOperation() == opReply );
return ConstDataView(data() + sizeof(int32_t)).read>();
}
int dataLen() const; // len without header
protected:
const ConstDataView& storage() const {
return _storage;
}
MSGHEADER::ConstView header() const {
return storage().view(offsetof(Layout, header));
}
private:
ConstDataView _storage;
};
class View : public ConstView {
public:
View(char* storage) : ConstView(storage) {}
using ConstView::view2ptr;
char* view2ptr() {
return storage().view();
}
void setLen(int value) {
return header().setMessageLength(value);
}
void setId(MSGID value) {
return header().setRequestID(value);
}
void setResponseTo(MSGID value) {
return header().setResponseTo(value);
}
void setOperation(int value) {
return header().setOpCode(value);
}
using ConstView::data;
char* data() {
return storage().view(offsetof(Layout, data));
}
private:
DataView storage() const {
return const_cast(ConstView::view2ptr());
}
MSGHEADER::View header() const {
return storage().view(offsetof(Layout, header));
}
};
class Value : public EncodedValueStorage {
public:
Value() {
BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
}
Value(ZeroInitTag_t zit) : EncodedValueStorage(zit) {}
};
const int MsgDataHeaderSize = sizeof(Value) - 4;
inline int ConstView::dataLen() const {
return getLen() - MsgDataHeaderSize;
}
} // namespace MsgData
class Message {
public:
// we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
Message( void * data , bool freeIt ) :
_buf( 0 ), _data( 0 ), _freeIt( false ) {
_setData( reinterpret_cast< char* >( data ), freeIt );
};
Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
*this = r;
}
~Message() {
reset();
}
SockAddr _from;
MsgData::View header() const {
verify( !empty() );
return _buf ? _buf : _data[ 0 ].first;
}
int operation() const { return header().getOperation(); }
MsgData::View singleData() const {
massert( 13273, "single data buffer expected", _buf );
return header();
}
bool empty() const { return !_buf && _data.empty(); }
int size() const {
int res = 0;
if ( _buf ) {
res = MsgData::ConstView(_buf).getLen();
}
else {
for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
res += it->second;
}
}
return res;
}
int dataSize() const { return size() - sizeof(MSGHEADER::Value); }
// concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
// can get rid of this if we make response handling smarter
void concat() {
if ( _buf || empty() ) {
return;
}
verify( _freeIt );
int totalSize = 0;
for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
i != _data.end(); ++i) {
totalSize += i->second;
}
char *buf = (char*)mongoMalloc( totalSize );
char *p = buf;
for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
i != _data.end(); ++i) {
memcpy( p, i->first, i->second );
p += i->second;
}
reset();
_setData( buf, true );
}
// vector swap() so this is fast
Message& operator=(Message& r) {
verify( empty() );
verify( r._freeIt );
_buf = r._buf;
r._buf = 0;
if ( r._data.size() > 0 ) {
_data.swap( r._data );
}
r._freeIt = false;
_freeIt = true;
return *this;
}
void reset() {
if ( _freeIt ) {
if ( _buf ) {
free( _buf );
}
for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
i != _data.end(); ++i) {
free(i->first);
}
}
_buf = 0;
_data.clear();
_freeIt = false;
}
// use to add a buffer
// assumes message will free everything
void appendData(char *d, int size) {
if ( size <= 0 ) {
return;
}
if ( empty() ) {
MsgData::View md = d;
md.setLen(size); // can be updated later if more buffers added
_setData( md.view2ptr(), true );
return;
}
verify( _freeIt );
if ( _buf ) {
_data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen()));
_buf = 0;
}
_data.push_back(std::make_pair(d, size));
header().setLen(header().getLen() + size);
}
// use to set first buffer if empty
void setData(char* d, bool freeIt) {
verify( empty() );
_setData( d, freeIt );
}
void setData(int operation, const char *msgtxt) {
setData(operation, msgtxt, strlen(msgtxt)+1);
}
void setData(int operation, const char *msgdata, size_t len) {
verify( empty() );
size_t dataLen = len + sizeof(MsgData::Value) - 4;
MsgData::View d = reinterpret_cast(mongoMalloc(dataLen));
memcpy(d.data(), msgdata, len);
d.setLen(dataLen);
d.setOperation(operation);
_setData( d.view2ptr(), true );
}
bool doIFreeIt() {
return _freeIt;
}
void send( MessagingPort &p, const char *context );
std::string toString() const;
private:
void _setData( char* d, bool freeIt ) {
_freeIt = freeIt;
_buf = d;
}
// if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
char* _buf;
// byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
typedef std::vector< std::pair< char*, int > > MsgVec;
MsgVec _data;
bool _freeIt;
};
MSGID nextMessageId();
} // namespace mongo