diff options
Diffstat (limited to 'src/mongo')
24 files changed, 564 insertions, 330 deletions
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 53e5baaa3ee..0f0e78a8c90 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -987,9 +987,9 @@ namespace { return false; if ( ns ) { - QueryResult * res = (QueryResult*)response.singleData(); - if ( res->nReturned == 1 ) { - BSONObj x(res->data() ); + QueryResult::View res = response.singleData().view2ptr(); + if ( res.getNReturned() == 1 ) { + BSONObj x(res.data() ); if ( str::contains( ns , "$cmd" ) ) { if ( isNotMasterErrorString( x["errmsg"] ) ) isntMaster(); diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index c929a01a8e7..120c951e441 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -193,16 +193,16 @@ namespace mongo { void DBClientCursor::dataReceived( bool& retry, string& host ) { - QueryResult *qr = (QueryResult *) batch.m->singleData(); - resultFlags = qr->resultFlags(); + QueryResult::View qr = batch.m->singleData().view2ptr(); + resultFlags = qr.getResultFlags(); - if ( qr->resultFlags() & ResultFlag_ErrSet ) { + if ( qr.getResultFlags() & ResultFlag_ErrSet ) { wasError = true; } - if ( qr->resultFlags() & ResultFlag_CursorNotFound ) { + if ( qr.getResultFlags() & ResultFlag_CursorNotFound ) { // cursor id no longer valid at the server. - verify( qr->cursorId == 0 ); + verify( qr.getCursorId() == 0 ); cursorId = 0; // 0 indicates no longer valid (dead) if ( ! ( opts & QueryOption_CursorTailable ) ) throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); @@ -211,16 +211,16 @@ namespace mongo { if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { // only set initially: we don't want to kill it on end of data // if it's a tailable cursor - cursorId = qr->cursorId; + cursorId = qr.getCursorId(); } - batch.nReturned = qr->nReturned; + batch.nReturned = qr.getNReturned(); batch.pos = 0; - batch.data = qr->data(); + batch.data = qr.data(); _client->checkResponse( batch.data, batch.nReturned, &retry, &host ); // watches for "not master" - if( qr->resultFlags() & ResultFlag_ShardConfigStale ) { + if( qr.getResultFlags() & ResultFlag_ShardConfigStale ) { BSONObj error; verify( peekError( &error ) ); throw RecvStaleConfigException( (string)"stale config on lazy receive" + causedBy( getErrField( error ) ), error ); diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp index 2822117c9c7..ab9646eeb82 100644 --- a/src/mongo/db/catalog/collection_cursor_cache.cpp +++ b/src/mongo/db/catalog/collection_cursor_cache.cpp @@ -30,6 +30,7 @@ #include "mongo/db/catalog/collection_cursor_cache.h" +#include "mongo/base/data_cursor.h" #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" @@ -242,10 +243,11 @@ namespace mongo { } int CollectionCursorCache::eraseCursorGlobalIfAuthorized(OperationContext* txn, int n, - const long long* ids) { + const char* _ids) { + ConstDataCursor ids(_ids); int numDeleted = 0; for ( int i = 0; i < n; i++ ) { - if ( eraseCursorGlobalIfAuthorized(txn, ids[i] ) ) + if ( eraseCursorGlobalIfAuthorized(txn, ids.readLEAndAdvance<int64_t>())) numDeleted++; if ( inShutdown() ) break; diff --git a/src/mongo/db/catalog/collection_cursor_cache.h b/src/mongo/db/catalog/collection_cursor_cache.h index 2015b7049d3..77c0df16557 100644 --- a/src/mongo/db/catalog/collection_cursor_cache.h +++ b/src/mongo/db/catalog/collection_cursor_cache.h @@ -111,7 +111,7 @@ namespace mongo { // ---------------------- static int eraseCursorGlobalIfAuthorized(OperationContext* txn, int n, - const long long* ids); + const char* ids); static bool eraseCursorGlobalIfAuthorized(OperationContext* txn, CursorId id); static bool eraseCursorGlobal(OperationContext* txn, CursorId id); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index dd2b2b2ae82..531e8df648d 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -133,7 +133,7 @@ namespace mongo { } } mystartupdbcpp; - QueryResult* emptyMoreResult(long long); + QueryResult::View emptyMoreResult(long long); /* todo: make this a real test. the stuff in dbtests/ seem to do all dbdirectclient which exhaust doesn't support yet. */ @@ -198,17 +198,17 @@ namespace mongo { if ( dbresponse.response ) { port->reply(m, *dbresponse.response, dbresponse.responseTo); if( dbresponse.exhaustNS.size() > 0 ) { - MsgData *header = dbresponse.response->header(); - QueryResult *qr = (QueryResult *) header; - long long cursorid = qr->cursorId; + MsgData::View header = dbresponse.response->header(); + QueryResult::View qr = header.view2ptr(); + long long cursorid = qr.getCursorId(); if( cursorid ) { verify( dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0] ); string ns = dbresponse.exhaustNS; // before reset() free's it... m.reset(); BufBuilder b(512); b.appendNum((int) 0 /*size set later in appendData()*/); - b.appendNum(header->id); - b.appendNum(header->responseTo); + b.appendNum(header.getId()); + b.appendNum(header.getResponseTo()); b.appendNum((int) dbGetMore); b.appendNum((int) 0); b.appendStr(ns); diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index a3b06058547..1b0f3ae3a82 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -67,8 +67,8 @@ namespace mongo { DbMessage::DbMessage(const Message& msg) : _msg(msg), _nsStart(NULL), _mark(NULL), _nsLen(0) { // for received messages, Message has only one buffer - _theEnd = _msg.singleData()->_data + _msg.singleData()->dataLen(); - _nextjsobj = _msg.singleData()->_data; + _theEnd = _msg.singleData().data() + _msg.singleData().dataLen(); + _nextjsobj = _msg.singleData().data(); _reserved = readAndAdvance<int>(); @@ -77,7 +77,7 @@ namespace mongo { // Limit = buffer size of message - // (first int4 in message which is either flags or a zero constant) - size_t limit = _msg.singleData()->dataLen() - sizeof(int); + size_t limit = _msg.singleData().dataLen() - sizeof(int); _nsStart = _nextjsobj; _nsLen = strnlen(_nsStart, limit); @@ -100,20 +100,20 @@ namespace mongo { const char* p = _nsStart + _nsLen + 1; checkRead<int>(p, 2); - return ((reinterpret_cast<const int*>(p)))[1]; + return ConstDataView(p).readLE<int32_t>(sizeof(int32_t)); } int DbMessage::pullInt() { - return readAndAdvance<int>(); + return readAndAdvance<int32_t>(); } long long DbMessage::pullInt64() { - return readAndAdvance<long long>(); + return readAndAdvance<int64_t>(); } - const long long* DbMessage::getArray(size_t count) const { + const char* DbMessage::getArray(size_t count) const { checkRead<long long>(_nextjsobj, count); - return reinterpret_cast<const long long*>(_nextjsobj); + return _nextjsobj; } BSONObj DbMessage::nextJsObj() { @@ -158,7 +158,7 @@ namespace mongo { T DbMessage::read() const { checkRead<T>(_nextjsobj, 1); - return *(reinterpret_cast<const T*>(_nextjsobj)); + return ConstDataView(_nextjsobj).readLE<T>(); } template<typename T> T DbMessage::readAndAdvance() { @@ -174,18 +174,18 @@ namespace mongo { long long cursorId ) { BufBuilder b(32768); - b.skip(sizeof(QueryResult)); + b.skip(sizeof(QueryResult::Value)); b.appendBuf(data, size); - QueryResult *qr = (QueryResult *) b.buf(); - qr->_resultFlags() = queryResultFlags; - qr->len = b.len(); - qr->setOperation(opReply); - qr->cursorId = cursorId; - qr->startingFrom = startingFrom; - qr->nReturned = nReturned; + QueryResult::View qr = b.buf(); + qr.setResultFlags(queryResultFlags); + qr.msgdata().setLen(b.len()); + qr.msgdata().setOperation(opReply); + qr.setCursorId(cursorId); + qr.setStartingFrom(startingFrom); + qr.setNReturned(nReturned); b.decouple(); - Message resp(qr, true); - p->reply(requestMsg, resp, requestMsg.header()->id); + Message resp(qr.view2ptr(), true); + p->reply(requestMsg, resp, requestMsg.header().getId()); } void replyToQuery(int queryResultFlags, @@ -200,26 +200,26 @@ namespace mongo { Message *resp = new Message(); replyToQuery( queryResultFlags, *resp, obj ); dbresponse.response = resp; - dbresponse.responseTo = m.header()->id; + dbresponse.responseTo = m.header().getId(); } void replyToQuery( int queryResultFlags, Message& response, const BSONObj& resultObj ) { BufBuilder bufBuilder; - bufBuilder.skip( sizeof( QueryResult )); + bufBuilder.skip( sizeof( QueryResult::Value )); bufBuilder.appendBuf( reinterpret_cast< void *>( const_cast< char* >( resultObj.objdata() )), resultObj.objsize() ); - QueryResult* queryResult = reinterpret_cast< QueryResult* >( bufBuilder.buf() ); + QueryResult::View queryResult = bufBuilder.buf(); bufBuilder.decouple(); - queryResult->_resultFlags() = queryResultFlags; - queryResult->len = bufBuilder.len(); - queryResult->setOperation( opReply ); - queryResult->cursorId = 0; - queryResult->startingFrom = 0; - queryResult->nReturned = 1; + queryResult.setResultFlags(queryResultFlags); + queryResult.msgdata().setLen(bufBuilder.len()); + queryResult.msgdata().setOperation( opReply ); + queryResult.setCursorId(0); + queryResult.setStartingFrom(0); + queryResult.setNReturned(1); - response.setData( queryResult, true ); // transport will free + response.setData( queryResult.view2ptr(), true ); // transport will free } } diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 5131e2d30a2..1b15a58030e 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -91,31 +91,114 @@ namespace mongo { Note that the update field layout is very similar layout to Query. */ - + namespace QueryResult { #pragma pack(1) - struct QueryResult : public MsgData { - long long cursorId; - int startingFrom; - int nReturned; - const char *data() { - return (char *) (((int *)&nReturned)+1); - } - int resultFlags() { - return dataAsInt(); - } - int& _resultFlags() { - return dataAsInt(); - } - void setResultFlagsToOk() { - _resultFlags() = ResultFlag_AwaitCapable; - } - void initializeResultFlags() { - _resultFlags() = 0; - } - }; - + /* see http://dochub.mongodb.org/core/mongowireprotocol + */ + struct Layout { + MsgData::Layout msgdata; + int64_t cursorId; + int32_t startingFrom; + int32_t nReturned; + }; #pragma pack() + class ConstView { + public: + ConstView(const char* storage) : _storage(storage) { } + + const char* view2ptr() const { + return storage().view(); + } + + MsgData::ConstView msgdata() const { + return storage().view(offsetof(Layout, msgdata)); + } + + int64_t getCursorId() const { + return storage().readLE<int64_t>(offsetof(Layout, cursorId)); + } + + int32_t getStartingFrom() const { + return storage().readLE<int32_t>(offsetof(Layout, startingFrom)); + } + + int32_t getNReturned() const { + return storage().readLE<int32_t>(offsetof(Layout, nReturned)); + } + + const char* data() const { + return storage().view(sizeof(Layout)); + } + + protected: + const ConstDataView& storage() const { + return _storage; + } + + private: + ConstDataView _storage; + }; + + class View : public ConstView { + public: + View(char* data) : ConstView(data) {} + + using ConstView::view2ptr; + char* view2ptr() { + return storage().view(); + } + + using ConstView::msgdata; + MsgData::View msgdata() { + return storage().view(offsetof(Layout, msgdata)); + } + + void setCursorId(int64_t value) { + return storage().writeLE(value, offsetof(Layout, cursorId)); + } + + void setStartingFrom(int32_t value) { + return storage().writeLE(value, offsetof(Layout, startingFrom)); + } + + void setNReturned(int32_t value) { + return storage().writeLE(value, offsetof(Layout, nReturned)); + } + + int32_t getResultFlags() { + return DataView(msgdata().data()).readLE<int32_t>(); + } + + void setResultFlags(int32_t value) { + return DataView(msgdata().data()).writeLE(value); + } + + void setResultFlagsToOk() { + setResultFlags(ResultFlag_AwaitCapable); + } + + void initializeResultFlags() { + setResultFlags(0); + } + + private: + DataView storage() const { + return const_cast<char*>(ConstView::view2ptr()); + } + }; + + class Value : public EncodedValueStorage<Layout, ConstView, View> { + public: + Value() { + BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); + } + + Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} + }; + + } // namespace QueryResult + /* For the database/server protocol, these objects and functions encapsulate the various messages transmitted over the connection. @@ -147,7 +230,7 @@ namespace mongo { int pullInt(); long long pullInt64(); - const long long* getArray(size_t count) const; + const char* getArray(size_t count) const; /* for insert and update msgs */ bool moreJSObjs() const { @@ -215,7 +298,7 @@ namespace mongo { if ( d.moreJSObjs() ) { fields = d.nextJsObj(); } - queryOptions = d.msg().header()->dataAsInt(); + queryOptions = DataView(d.msg().header().data()).readLE<int32_t>(); } }; diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 2835194ad04..30c69daca76 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -87,8 +87,17 @@ namespace mongo { MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kCommands); // for diaglog - inline void opread(Message& m) { if( _diaglog.getLevel() & 2 ) _diaglog.readop((char *) m.singleData(), m.header()->len); } - inline void opwrite(Message& m) { if( _diaglog.getLevel() & 1 ) _diaglog.writeop((char *) m.singleData(), m.header()->len); } + inline void opread(Message& m) { + if (_diaglog.getLevel() & 2) { + _diaglog.readop(m.singleData().view2ptr(), m.header().getLen()); + } + } + + inline void opwrite(Message& m) { + if (_diaglog.getLevel() & 1) { + _diaglog.writeop(m.singleData().view2ptr(), m.header().getLen()); + } + } void receivedKillCursors(OperationContext* txn, Message& m); void receivedUpdate(OperationContext* txn, Message& m, CurOp& op); @@ -229,7 +238,7 @@ namespace mongo { static bool receivedQuery(OperationContext* txn, Client& c, DbResponse& dbresponse, Message& m ) { bool ok = true; - MSGID responseTo = m.header()->id; + MSGID responseTo = m.header().getId(); DbMessage d(m); QueryMessage q(d); @@ -286,26 +295,26 @@ namespace mongo { } BufBuilder b; - b.skip(sizeof(QueryResult)); + b.skip(sizeof(QueryResult::Value)); b.appendBuf((void*) errObj.objdata(), errObj.objsize()); // todo: call replyToQuery() from here instead of this!!! see dbmessage.h - QueryResult * msgdata = (QueryResult *) b.buf(); + QueryResult::View msgdata = b.buf(); b.decouple(); - QueryResult *qr = msgdata; - qr->_resultFlags() = ResultFlag_ErrSet; - if( scex ) qr->_resultFlags() |= ResultFlag_ShardConfigStale; - qr->len = b.len(); - qr->setOperation(opReply); - qr->cursorId = 0; - qr->startingFrom = 0; - qr->nReturned = 1; + QueryResult::View qr = msgdata; + qr.setResultFlags(ResultFlag_ErrSet); + if( scex ) qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale); + qr.msgdata().setLen(b.len()); + qr.msgdata().setOperation(opReply); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); resp.reset( new Message() ); - resp->setData( msgdata, true ); + resp->setData( msgdata.view2ptr(), true ); } - op.debug().responseLength = resp->header()->dataLen(); + op.debug().responseLength = resp->header().dataLen(); dbresponse.response = resp.release(); dbresponse.responseTo = responseTo; @@ -445,7 +454,7 @@ namespace mongo { resp->setData( opReply , "i am fine - dbMsg deprecated"); dbresponse.response = resp; - dbresponse.responseTo = m.header()->id; + dbresponse.responseTo = m.header().getId(); } else { try { @@ -537,7 +546,7 @@ namespace mongo { verify( n < 30000 ); } - const long long* cursorArray = dbmessage.getArray(n); + const char* cursorArray = dbmessage.getArray(n); int found = CollectionCursorCache::eraseCursorGlobalIfAuthorized(txn, n, cursorArray); @@ -556,11 +565,11 @@ namespace mongo { BSONObj query = d.nextJsObj(); verify( d.moreJSObjs() ); - verify( query.objsize() < m.header()->dataLen() ); + verify( query.objsize() < m.header().dataLen() ); BSONObj toupdate = d.nextJsObj(); uassert( 10055 , "update object too large", toupdate.objsize() <= BSONObjMaxUserSize); - verify( toupdate.objsize() < m.header()->dataLen() ); - verify( query.objsize() + toupdate.objsize() < m.header()->dataLen() ); + verify( toupdate.objsize() < m.header().dataLen() ); + verify( query.objsize() + toupdate.objsize() < m.header().dataLen() ); bool upsert = flags & UpdateOption_Upsert; bool multi = flags & UpdateOption_Multi; bool broadcast = flags & UpdateOption_Broadcast; @@ -640,7 +649,7 @@ namespace mongo { op.debug().ndeleted = n; } - QueryResult* emptyMoreResult(long long); + QueryResult::View emptyMoreResult(long long); bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop ) { bool ok = true; @@ -659,7 +668,7 @@ namespace mongo { scoped_ptr<Timer> timer; int pass = 0; bool exhaust = false; - QueryResult* msgdata = 0; + QueryResult::View msgdata = 0; OpTime last; while( 1 ) { bool isCursorAuthorized = false; @@ -709,7 +718,7 @@ namespace mongo { break; } - if (msgdata == 0) { + if (msgdata.view2ptr() == 0) { // this should only happen with QueryOption_AwaitData exhaust = false; massert(13073, "shutting down", !inShutdown() ); @@ -746,18 +755,18 @@ namespace mongo { curop.debug().exceptionInfo = ex->getInfo(); replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj); - curop.debug().responseLength = dbresponse.response->header()->dataLen(); + curop.debug().responseLength = dbresponse.response->header().dataLen(); curop.debug().nreturned = 1; return ok; } Message *resp = new Message(); - resp->setData(msgdata, true); - curop.debug().responseLength = resp->header()->dataLen(); - curop.debug().nreturned = msgdata->nReturned; + resp->setData(msgdata.view2ptr(), true); + curop.debug().responseLength = resp->header().dataLen(); + curop.debug().nreturned = msgdata.getNReturned(); dbresponse.response = resp; - dbresponse.responseTo = m.header()->id; + dbresponse.responseTo = m.header().getId(); if( exhaust ) { curop.debug().exhaust = true; diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp index c480bca6f09..8c5ae0a3fb7 100644 --- a/src/mongo/db/query/new_find.cpp +++ b/src/mongo/db/query/new_find.cpp @@ -148,7 +148,7 @@ namespace mongo { * when this method returns an empty result, incrementing pass on each call. * Thus, pass == 0 indicates this is the first "attempt" before any 'awaiting'. */ - QueryResult* newGetMore(OperationContext* txn, + QueryResult::View newGetMore(OperationContext* txn, const char* ns, int ntoreturn, long long cursorid, @@ -193,9 +193,11 @@ namespace mongo { int numResults = 0; int startingResult = 0; - const int InitialBufSize = 512 + sizeof(QueryResult) + MaxBytesToReturnToClientAtOnce; + const int InitialBufSize = + 512 + sizeof(QueryResult::Value) + MaxBytesToReturnToClientAtOnce; + BufBuilder bb(InitialBufSize); - bb.skip(sizeof(QueryResult)); + bb.skip(sizeof(QueryResult::Value)); if (NULL == cc) { cursorid = 0; @@ -345,13 +347,13 @@ namespace mongo { } } - QueryResult* qr = reinterpret_cast<QueryResult*>(bb.buf()); - qr->len = bb.len(); - qr->setOperation(opReply); - qr->_resultFlags() = resultFlags; - qr->cursorId = cursorid; - qr->startingFrom = startingResult; - qr->nReturned = numResults; + QueryResult::View qr = bb.buf(); + qr.msgdata().setLen(bb.len()); + qr.msgdata().setOperation(opReply); + qr.setResultFlags(resultFlags); + qr.setCursorId(cursorid); + qr.setStartingFrom(startingResult); + qr.setNReturned(numResults); bb.decouple(); QLOG() << "getMore returned " << numResults << " results\n"; return qr; @@ -461,7 +463,7 @@ namespace mongo { curop.markCommand(); BufBuilder bb; - bb.skip(sizeof(QueryResult)); + bb.skip(sizeof(QueryResult::Value)); BSONObjBuilder cmdResBuf; if (!runCommands(txn, ns, q.query, curop, bb, cmdResBuf, false, q.queryOptions)) { @@ -472,16 +474,16 @@ namespace mongo { // TODO: Does this get overwritten/do we really need to set this twice? curop.debug().query = q.query; - QueryResult* qr = reinterpret_cast<QueryResult*>(bb.buf()); + QueryResult::View qr = bb.buf(); bb.decouple(); - qr->setResultFlagsToOk(); - qr->len = bb.len(); + qr.setResultFlagsToOk(); + qr.msgdata().setLen(bb.len()); curop.debug().responseLength = bb.len(); - qr->setOperation(opReply); - qr->cursorId = 0; - qr->startingFrom = 0; - qr->nReturned = 1; - result.setData(qr, true); + qr.msgdata().setOperation(opReply); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); + result.setData(qr.view2ptr(), true); return ""; } @@ -524,7 +526,7 @@ namespace mongo { } BufBuilder bb; - bb.skip(sizeof(QueryResult)); + bb.skip(sizeof(QueryResult::Value)); PlanExecutor* rawExec; // Takes ownership of 'cq'. @@ -550,16 +552,16 @@ namespace mongo { curop.debug().query = q.query; // Set query result fields. - QueryResult* qr = reinterpret_cast<QueryResult*>(bb.buf()); + QueryResult::View qr = bb.buf(); bb.decouple(); - qr->setResultFlagsToOk(); - qr->len = bb.len(); + qr.setResultFlagsToOk(); + qr.msgdata().setLen(bb.len()); curop.debug().responseLength = bb.len(); - qr->setOperation(opReply); - qr->cursorId = 0; - qr->startingFrom = 0; - qr->nReturned = 1; - result.setData(qr, true); + qr.msgdata().setOperation(opReply); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); + result.setData(qr.view2ptr(), true); return ""; } @@ -636,7 +638,7 @@ namespace mongo { // this buffer should contain either requested documents per query or // explain information, but not both BufBuilder bb(32768); - bb.skip(sizeof(QueryResult)); + bb.skip(sizeof(QueryResult::Value)); // How many results have we obtained from the executor? int numResults = 0; @@ -839,13 +841,13 @@ namespace mongo { bb.decouple(); // Fill out the output buffer's header. - QueryResult* qr = static_cast<QueryResult*>(result.header()); - qr->cursorId = ccId; + QueryResult::View qr = result.header().view2ptr(); + qr.setCursorId(ccId); curop.debug().cursorid = (0 == ccId ? -1 : ccId); - qr->setResultFlagsToOk(); - qr->setOperation(opReply); - qr->startingFrom = 0; - qr->nReturned = numResults; + qr.setResultFlagsToOk(); + qr.msgdata().setOperation(opReply); + qr.setStartingFrom(0); + qr.setNReturned(numResults); // Set debug information for consumption by the profiler. curop.debug().ntoskip = pq.getSkip(); diff --git a/src/mongo/db/query/new_find.h b/src/mongo/db/query/new_find.h index c136cee8981..99232c1088b 100644 --- a/src/mongo/db/query/new_find.h +++ b/src/mongo/db/query/new_find.h @@ -43,7 +43,7 @@ namespace mongo { /** * Called from the getMore entry point in ops/query.cpp. */ - QueryResult* newGetMore(OperationContext* txn, + QueryResult::View newGetMore(OperationContext* txn, const char* ns, int ntoreturn, long long cursorid, diff --git a/src/mongo/dbtests/config_server_fixture.h b/src/mongo/dbtests/config_server_fixture.h index b97eee7cd1d..9f61d8ba30c 100644 --- a/src/mongo/dbtests/config_server_fixture.h +++ b/src/mongo/dbtests/config_server_fixture.h @@ -52,21 +52,21 @@ namespace mongo { // This is tailored to act as a dummy response for write commands. BufBuilder bb; - bb.skip(sizeof(QueryResult)); + bb.skip(sizeof(QueryResult::Value)); BSONObj cmdResult(BSON("ok" << 1)); bb.appendBuf(cmdResult.objdata(), cmdResult.objsize()); - QueryResult* qr = reinterpret_cast<QueryResult*>(bb.buf()); + QueryResult::View qr = bb.buf(); bb.decouple(); - qr->setResultFlagsToOk(); - qr->len = bb.len(); - qr->setOperation(opReply); - qr->cursorId = 0; - qr->startingFrom = 0; - qr->nReturned = 1; - m.setData(qr, true); + qr.setResultFlagsToOk(); + qr.msgdata().setLen(bb.len()); + qr.msgdata().setOperation(opReply); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); + m.setData(qr.view2ptr(), true); return true; } diff --git a/src/mongo/s/cursors.cpp b/src/mongo/s/cursors.cpp index 6f2b588e3fd..39d14ed9c8d 100644 --- a/src/mongo/s/cursors.cpp +++ b/src/mongo/s/cursors.cpp @@ -34,6 +34,7 @@ #include <string> #include <vector> +#include "mongo/base/data_cursor.h" #include "mongo/db/audit.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" @@ -346,11 +347,12 @@ namespace mongo { m.dataSize() == 8 + ( 8 * n ) ); - const long long* cursors = dbmessage.getArray(n); + ConstDataCursor cursors(dbmessage.getArray(n)); + ClientBasic* client = ClientBasic::getCurrent(); AuthorizationSession* authSession = client->getAuthorizationSession(); for ( int i=0; i<n; i++ ) { - long long id = cursors[i]; + long long id = cursors.readLEAndAdvance<int64_t>(); LOG(_myLogLevel) << "CursorCache::gotKillCursors id: " << id << endl; if ( ! id ) { diff --git a/src/mongo/s/d_logic.cpp b/src/mongo/s/d_logic.cpp index e183e97d523..0c036f65c3d 100644 --- a/src/mongo/s/d_logic.cpp +++ b/src/mongo/s/d_logic.cpp @@ -85,7 +85,7 @@ namespace mongo { if( getsAResponse ){ verify( dbresponse ); BufBuilder b( 32768 ); - b.skip( sizeof( QueryResult ) ); + b.skip( sizeof( QueryResult::Value ) ); { BSONObjBuilder bob; @@ -99,20 +99,20 @@ namespace mongo { b.appendBuf( obj.objdata() , obj.objsize() ); } - QueryResult *qr = (QueryResult*)b.buf(); - qr->_resultFlags() = ResultFlag_ErrSet | ResultFlag_ShardConfigStale; - qr->len = b.len(); - qr->setOperation( opReply ); - qr->cursorId = 0; - qr->startingFrom = 0; - qr->nReturned = 1; + QueryResult::View qr = b.buf(); + qr.setResultFlags(ResultFlag_ErrSet | ResultFlag_ShardConfigStale); + qr.msgdata().setLen(b.len()); + qr.msgdata().setOperation( opReply ); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); b.decouple(); Message * resp = new Message(); - resp->setData( qr , true ); + resp->setData( qr.view2ptr() , true ); dbresponse->response = resp; - dbresponse->responseTo = m.header()->id; + dbresponse->responseTo = m.header().getId(); return true; } @@ -139,8 +139,10 @@ namespace mongo { wanted.addToBSON( b ); received.addToBSON( b, "yourVersion" ); - b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); - LOG(2) << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl; + b.appendBinData( "msg" , m.header().getLen() , bdtCustom , m.singleData().view2ptr() ); + + LOG(2) << "writing back msg with len: " << m.header().getLen() + << " op: " << m.operation() << endl; // we pass the builder to queueWriteBack so that it can select the writebackId // this is important so that the id is guaranteed to be ascending diff --git a/src/mongo/s/dbclient_multi_command.cpp b/src/mongo/s/dbclient_multi_command.cpp index a6251f311cf..09bdacfe63a 100644 --- a/src/mongo/s/dbclient_multi_command.cpp +++ b/src/mongo/s/dbclient_multi_command.cpp @@ -146,8 +146,8 @@ namespace mongo { } // A query result is returned from commands - QueryResult* recvdQuery = reinterpret_cast<QueryResult*>( toRecv->singleData() ); - *result = BSONObj( recvdQuery->data() ); + QueryResult::View recvdQuery = toRecv->singleData().view2ptr(); + *result = BSONObj( recvdQuery.data() ); } void DBClientMultiCommand::sendAll() { diff --git a/src/mongo/s/request.cpp b/src/mongo/s/request.cpp index f3d09e191f9..06f355f82f1 100644 --- a/src/mongo/s/request.cpp +++ b/src/mongo/s/request.cpp @@ -53,7 +53,7 @@ namespace mongo { Request::Request( Message& m, AbstractMessagingPort* p ) : _m(m) , _d( m ) , _p(p) , _didInit(false) { - _id = _m.header()->id; + _id = _m.header().getId(); _txn.reset(new OperationContextNoop()); @@ -76,7 +76,7 @@ namespace mongo { // Deprecated, will move to the strategy itself void Request::reset() { - _m.header()->id = _id; + _m.header().setId(_id); _clientInfo->clearRequestInfo(); if ( !_d.messageShouldHaveNs()) { @@ -93,7 +93,7 @@ namespace mongo { int op = _m.operation(); verify( op > dbMsg ); - int msgId = (int)(_m.header()->id); + int msgId = (int)(_m.header().getId()); Timer t; LOG(3) << "Request::process begin ns: " << getns() @@ -146,7 +146,7 @@ namespace mongo { void Request::reply( Message & response , const string& fromServer ) { verify( _didInit ); - long long cursor =response.header()->getCursor(); + long long cursor = response.header().getCursor(); if ( cursor ) { if ( fromServer.size() ) { cursorCache.storeRef(fromServer, cursor, getns()); diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 1cd1eb97914..324d94224e1 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -150,7 +150,7 @@ namespace mongo { << " for " << r.getns() << causedBy( ex ) << endl; if ( r.expectResponse() ) { - m.header()->id = r.id(); + m.header().setId(r.id()); replyToQuery( ResultFlag_ErrSet, p , m , buildErrReply( ex ) ); } @@ -164,7 +164,7 @@ namespace mongo { << " for " << r.getns() << causedBy( ex ) << endl; if ( r.expectResponse() ) { - m.header()->id = r.id(); + m.header().setId(r.id()); replyToQuery( ResultFlag_ErrSet, p , m , buildErrReply( ex ) ); } diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 63fe466cdc2..7fa0147e1c4 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -105,8 +105,8 @@ namespace mongo { uassert( 10200 , "mongos: error calling db", ok ); { - QueryResult *qr = (QueryResult *) response.singleData(); - if ( qr->resultFlags() & ResultFlag_ShardConfigStale ) { + QueryResult::View qr = response.singleData().view2ptr(); + if ( qr.getResultFlags() & ResultFlag_ShardConfigStale ) { dbcon.done(); // Version is zero b/c this is deprecated codepath throw RecvStaleConfigException( r.getns(), @@ -498,7 +498,7 @@ namespace mongo { bool ok = conn->callRead( r.m() , response); uassert( 10204 , "dbgrid: getmore: error calling db", ok); - bool hasMore = (response.singleData()->getCursor() != 0); + bool hasMore = (response.singleData().getCursor() != 0); if ( !hasMore ) { cursorCache.removeRef( id ); diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 663260aea5f..7181258505c 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -80,7 +80,7 @@ public: } sleepmillis(mongoBridgeGlobalParams.delay); - int oldId = m.header()->id; + int oldId = m.header().getId(); if ( m.operation() == dbQuery || m.operation() == dbMsg || m.operation() == dbGetMore ) { bool exhaust = false; if ( m.operation() == dbQuery ) { @@ -96,9 +96,9 @@ public: mp_.reply( m, response, oldId ); while ( exhaust ) { - MsgData *header = response.header(); - QueryResult *qr = (QueryResult *) header; - if ( qr->cursorId ) { + MsgData::View header = response.header(); + QueryResult::View qr = header.view2ptr(); + if ( qr.getCursorId() ) { response.reset(); dest.port().recv( response ); mp_.reply( m, response ); // m argument is ignored anyway diff --git a/src/mongo/tools/sniffer.cpp b/src/mongo/tools/sniffer.cpp index 9e39a3b1d15..c4da46703fb 100644 --- a/src/mongo/tools/sniffer.cpp +++ b/src/mongo/tools/sniffer.cpp @@ -71,12 +71,10 @@ using namespace std; using mongo::Message; -using mongo::MsgData; using mongo::DbMessage; using mongo::BSONObj; using mongo::BufBuilder; using mongo::DBClientConnection; -using mongo::QueryResult; using mongo::MemoryMappedFile; #define SNAP_LEN 65535 @@ -222,17 +220,17 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa Message m; if ( bytesRemainingInMessage[ c ] == 0 ) { - m.setData( (MsgData*)payload , false ); - if ( !m.header()->valid() ) { + m.setData( const_cast<char *>(reinterpret_cast<const char *>(payload)) , false ); + if ( !m.header().valid() ) { cerr << "Invalid message start, skipping packet." << endl; return; } - if ( size_payload > m.header()->len ) { + if ( size_payload > m.header().getLen() ) { cerr << "Multiple messages in packet, skipping packet." << endl; return; } - if ( size_payload < m.header()->len ) { - bytesRemainingInMessage[ c ] = m.header()->len - size_payload; + if ( size_payload < m.header().getLen() ) { + bytesRemainingInMessage[ c ] = m.header().getLen() - size_payload; messageBuilder[ c ].reset( new BufBuilder() ); messageBuilder[ c ]->appendBuf( (void*)payload, size_payload ); return; @@ -249,7 +247,7 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa } if ( bytesRemainingInMessage[ c ] > 0 ) return; - m.setData( (MsgData*)messageBuilder[ c ]->buf(), true ); + m.setData( messageBuilder[ c ]->buf(), true ); messageBuilder[ c ]->decouple(); messageBuilder[ c ].reset(); } @@ -260,8 +258,8 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa << ( serverPorts.count( ntohs( tcp->th_dport ) ) ? " -->> " : " <<-- " ) << inet_ntoa(ip->ip_dst) << ":" << ntohs( tcp->th_dport ) << " " << d.getns() - << " " << m.header()->len << " bytes " - << " id:" << hex << m.header()->id << dec << "\t" << m.header()->id; + << " " << m.header().getLen() << " bytes " + << " id:" << hex << m.header().getId() << dec << "\t" << m.header().getId(); processMessage( c , m ); } @@ -283,16 +281,21 @@ void processMessage( Connection& c , Message& m ) { AuditingDbMessage d(m); if ( m.operation() == mongo::opReply ) - out() << " - " << (unsigned)m.header()->responseTo; + out() << " - " << (unsigned)m.header().getResponseTo(); out() << '\n'; try { switch( m.operation() ) { case mongo::opReply: { - mongo::QueryResult* r = (mongo::QueryResult*)m.singleData(); - out() << "\treply" << " n:" << r->nReturned << " cursorId: " << r->cursorId << endl; - if ( r->nReturned ) { - mongo::BSONObj o( r->data() ); + mongo::QueryResult::View r = m.singleData().view2ptr(); + + out() << "\treply" + << " n:" << r.getNReturned() + << " cursorId: " << r.getCursorId() + << endl; + + if ( r.getNReturned() ) { + mongo::BSONObj o( r.data() ); out() << "\t" << o << endl; } break; @@ -369,10 +372,10 @@ void processMessage( Connection& c , Message& m ) { } Message response; conn->port().call( m, response ); - QueryResult *qr = (QueryResult *) response.singleData(); - if ( !( qr->resultFlags() & mongo::ResultFlag_CursorNotFound ) ) { - if ( qr->cursorId != 0 ) { - lastCursor[ c ] = qr->cursorId; + mongo::QueryResult::View qr = response.singleData().view2ptr(); + if ( !( qr.getResultFlags() & mongo::ResultFlag_CursorNotFound ) ) { + if ( qr.getCursorId() != 0 ) { + lastCursor[ c ] = qr.getCursorId(); return; } } @@ -385,16 +388,16 @@ void processMessage( Connection& c , Message& m ) { else { Connection r = c.reverse(); long long myCursor = lastCursor[ r ]; - QueryResult *qr = (QueryResult *) m.singleData(); - long long yourCursor = qr->cursorId; - if ( ( qr->resultFlags() & mongo::ResultFlag_CursorNotFound ) ) + mongo::QueryResult::View qr = m.singleData().view2ptr(); + long long yourCursor = qr.getCursorId(); + if ( ( qr.getResultFlags() & mongo::ResultFlag_CursorNotFound ) ) yourCursor = 0; if ( myCursor && !yourCursor ) cerr << "Expected valid cursor in sniffed response, found none" << endl; if ( !myCursor && yourCursor ) cerr << "Sniffed valid cursor when none expected" << endl; if ( myCursor && yourCursor ) { - mapCursor[ r ][ qr->cursorId ] = lastCursor[ r ]; + mapCursor[ r ][ qr.getCursorId() ] = lastCursor[ r ]; lastCursor[ r ] = 0; } } @@ -417,7 +420,7 @@ void processDiagLog( const char * file ) { long read = 0; while ( read < length ) { Message m(pos,false); - int len = m.header()->len; + int len = m.header().getLen(); DbMessage d(m); cout << len << " " << d.getns() << endl; diff --git a/src/mongo/util/goodies.h b/src/mongo/util/goodies.h index 74e48bd65b7..4be632afed7 100644 --- a/src/mongo/util/goodies.h +++ b/src/mongo/util/goodies.h @@ -33,7 +33,6 @@ #include <iostream> #include <sstream> -#include <boost/detail/endian.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/scoped_array.hpp> #include <boost/scoped_ptr.hpp> @@ -108,26 +107,6 @@ namespace mongo { return strcmp(p + a - b, suffix) == 0; } - inline unsigned long swapEndian(unsigned long x) { - return - ((x & 0xff) << 24) | - ((x & 0xff00) << 8) | - ((x & 0xff0000) >> 8) | - ((x & 0xff000000) >> 24); - } - -#if defined(BOOST_LITTLE_ENDIAN) - inline unsigned long fixEndian(unsigned long x) { - return x; - } -#elif defined(BOOST_BIG_ENDIAN) - inline unsigned long fixEndian(unsigned long x) { - return swapEndian(x); - } -#else -#error no boost endian header defined -#endif - #if !defined(_WIN32) typedef int HANDLE; inline void strcpy_s(char *dst, unsigned len, const char *src) { diff --git a/src/mongo/util/net/message.cpp b/src/mongo/util/net/message.cpp index e3b7eb1f139..ce038190601 100644 --- a/src/mongo/util/net/message.cpp +++ b/src/mongo/util/net/message.cpp @@ -46,7 +46,7 @@ namespace mongo { return; } if ( _buf != 0 ) { - p.send( (char*)_buf, _buf->len, context ); + p.send( _buf, MsgData::ConstView(_buf).getLen(), context ); } else { p.send( _data, context ); diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index 80205e1d1f5..3c7bffa7654 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -33,6 +33,8 @@ #include "mongo/platform/atomic_word.h" #include "mongo/platform/cstdint.h" +#include "mongo/base/data_view.h" +#include "mongo/base/encoded_value_storage.h" #include "mongo/util/goodies.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" @@ -106,77 +108,225 @@ namespace mongo { } + namespace MSGHEADER { #pragma pack(1) - /* see http://dochub.mongodb.org/core/mongowireprotocol - */ - struct MSGHEADER { - int messageLength; // total message size, including this - int requestID; // identifier for this message - int responseTo; // requestID from the original request - // (used in responses from db) - int opCode; - }; + /* see http://dochub.mongodb.org/core/mongowireprotocol + */ + struct Layout { + int32_t messageLength; // total message size, including this + int32_t requestID; // identifier for this message + int32_t responseTo; // requestID from the original request + // (used in responses from db) + int32_t opCode; + }; #pragma pack() -#pragma pack(1) - /* todo merge this with MSGHEADER (or inherit from it). */ - class MsgData { - friend class Message; - friend class DbMessage; - friend class MessagingPort; - public: - int len; /* len of the msg, including this field */ - MSGID id; /* request/reply id's match... */ - MSGID responseTo; /* id of the message we are responding to */ - short _operation; - char _flags; - char _version; - - int operation() const { - return _operation; - } - void setOperation(int o) { - _flags = 0; - _version = 0; - _operation = o; - } + class ConstView { + public: + typedef ConstDataView view_type; - int& dataAsInt() { - return *((int *) _data); - } + ConstView(const char* data) : _data(data) { } - bool valid() { - if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) ) - return false; - if ( _operation < 0 || _operation > 30000 ) - return false; - return true; - } + const char* view2ptr() const { + return data().view(); + } - long long getCursor() { - verify( responseTo > 0 ); - verify( _operation == opReply ); - long long * l = (long long *)(_data + 4); - return l[0]; - } + int32_t getMessageLength() const { + return data().readLE<int32_t>(offsetof(Layout, messageLength)); + } - int dataLen(); // len without header - private: - char _data[4]; //must be last member - }; - const int MsgDataHeaderSize = sizeof(MsgData) - 4; - inline int MsgData::dataLen() { - return len - MsgDataHeaderSize; - } + int32_t getRequestID() const { + return data().readLE<int32_t>(offsetof(Layout, requestID)); + } + + int32_t getResponseTo() const { + return data().readLE<int32_t>(offsetof(Layout, responseTo)); + } + + int32_t getOpCode() const { + return data().readLE<int32_t>(offsetof(Layout, opCode)); + } + + protected: + const view_type& data() const { + return _data; + } + + private: + view_type _data; + }; + + class View : public ConstView { + public: + typedef DataView view_type; + + View(char* data) : ConstView(data) {} + + using ConstView::view2ptr; + char* view2ptr() { + return data().view(); + } + + void setMessageLength(int32_t value) { + return data().writeLE(value, offsetof(Layout, messageLength)); + } + + void setRequestID(int32_t value) { + return data().writeLE(value, offsetof(Layout, requestID)); + } + + void setResponseTo(int32_t value) { + return data().writeLE(value, offsetof(Layout, responseTo)); + } + + void setOpCode(int32_t value) { + return data().writeLE(value, offsetof(Layout, opCode)); + } + + private: + view_type data() const { + return const_cast<char *>(ConstView::view2ptr()); + } + }; + + class Value : public EncodedValueStorage<Layout, ConstView, View> { + public: + Value() { + BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); + } + + Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} + }; + + } // namespace MSGHEADER + + namespace MsgData { +#pragma pack(1) + struct Layout { + MSGHEADER::Layout header; + char data[4]; + }; #pragma pack() + class ConstView { + public: + ConstView(const char* storage) : _storage(storage) { } + + const char* view2ptr() const { + return storage().view(); + } + + int32_t getLen() const { + return header().getMessageLength(); + } + + MSGID getId() const { + return header().getRequestID(); + } + + MSGID getResponseTo() const { + return header().getResponseTo(); + } + + int32_t getOperation() const { + return header().getOpCode(); + } + + const char* data() const { + return storage().view(offsetof(Layout, data)); + } + + bool valid() const { + if ( getLen() <= 0 || getLen() > ( 4 * BSONObjMaxInternalSize ) ) + return false; + if ( getOperation() < 0 || getOperation() > 30000 ) + return false; + return true; + } + + int64_t getCursor() const { + verify( getResponseTo() > 0 ); + verify( getOperation() == opReply ); + return ConstDataView(data() + sizeof(int32_t)).readLE<int64_t>(); + } + + int dataLen() const; // len without header + + protected: + const ConstDataView& storage() const { + return _storage; + } + + MSGHEADER::ConstView header() const { + return storage().view(offsetof(Layout, header)); + } + + private: + ConstDataView _storage; + }; + + class View : public ConstView { + public: + View(char* storage) : ConstView(storage) {} + + using ConstView::view2ptr; + char* view2ptr() { + return storage().view(); + } + + void setLen(int value) { + return header().setMessageLength(value); + } + + void setId(MSGID value) { + return header().setRequestID(value); + } + + void setResponseTo(MSGID value) { + return header().setResponseTo(value); + } + + void setOperation(int value) { + return header().setOpCode(value); + } + + using ConstView::data; + char* data() { + return storage().view(offsetof(Layout, data)); + } + + private: + DataView storage() const { + return const_cast<char *>(ConstView::view2ptr()); + } + + MSGHEADER::View header() const { + return storage().view(offsetof(Layout, header)); + } + }; + + class Value : public EncodedValueStorage<Layout, ConstView, View> { + public: + Value() { + BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout)); + } + + Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {} + }; + + const int MsgDataHeaderSize = sizeof(Value) - 4; + inline int ConstView::dataLen() const { + return getLen() - MsgDataHeaderSize; + } + } // namespace MsgData + class Message { public: // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit). Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {} Message( void * data , bool freeIt ) : _buf( 0 ), _data( 0 ), _freeIt( false ) { - _setData( reinterpret_cast< MsgData* >( data ), freeIt ); + _setData( reinterpret_cast< char* >( data ), freeIt ); }; Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) { *this = r; @@ -187,13 +337,14 @@ namespace mongo { SockAddr _from; - MsgData *header() const { + MsgData::View header() const { verify( !empty() ); - return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first ); + return _buf ? _buf : _data[ 0 ].first; } - int operation() const { return header()->operation(); } - MsgData *singleData() const { + int operation() const { return header().getOperation(); } + + MsgData::View singleData() const { massert( 13273, "single data buffer expected", _buf ); return header(); } @@ -203,7 +354,7 @@ namespace mongo { int size() const { int res = 0; if ( _buf ) { - res = _buf->len; + res = MsgData::ConstView(_buf).getLen(); } else { for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) { @@ -213,7 +364,7 @@ namespace mongo { return res; } - int dataSize() const { return size() - sizeof(MSGHEADER); } + int dataSize() const { return size() - sizeof(MSGHEADER::Value); } // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy // can get rid of this if we make response handling smarter @@ -236,7 +387,7 @@ namespace mongo { p += i->second; } reset(); - _setData( (MsgData*)buf, true ); + _setData( buf, true ); } // vector swap() so this is fast @@ -275,22 +426,22 @@ namespace mongo { return; } if ( empty() ) { - MsgData *md = (MsgData*)d; - md->len = size; // can be updated later if more buffers added - _setData( md, true ); + MsgData::View md = d; + md.setLen(size); // can be updated later if more buffers added + _setData( md.view2ptr(), true ); return; } verify( _freeIt ); if ( _buf ) { - _data.push_back(std::make_pair((char*)_buf, _buf->len)); + _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen())); _buf = 0; } _data.push_back(std::make_pair(d, size)); - header()->len += size; + header().setLen(header().getLen() + size); } // use to set first buffer if empty - void setData(MsgData *d, bool freeIt) { + void setData(char* d, bool freeIt) { verify( empty() ); _setData( d, freeIt ); } @@ -299,12 +450,12 @@ namespace mongo { } void setData(int operation, const char *msgdata, size_t len) { verify( empty() ); - size_t dataLen = len + sizeof(MsgData) - 4; - MsgData *d = (MsgData *) malloc(dataLen); - memcpy(d->_data, msgdata, len); - d->len = fixEndian(dataLen); - d->setOperation(operation); - _setData( d, true ); + size_t dataLen = len + sizeof(MsgData::Value) - 4; + MsgData::View d = reinterpret_cast<char *>(malloc(dataLen)); + memcpy(d.data(), msgdata, len); + d.setLen(dataLen); + d.setOperation(operation); + _setData( d.view2ptr(), true ); } bool doIFreeIt() { @@ -316,12 +467,12 @@ namespace mongo { std::string toString() const; private: - void _setData( MsgData *d, bool freeIt ) { + void _setData( char* d, bool freeIt ) { _freeIt = freeIt; _buf = d; } // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data - MsgData * _buf; + char* _buf; // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead typedef std::vector< std::pair< char*, int > > MsgVec; MsgVec _data; diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp index 2c4d723d79f..8dc67ab2896 100644 --- a/src/mongo/util/net/message_port.cpp +++ b/src/mongo/util/net/message_port.cpp @@ -82,13 +82,13 @@ namespace mongo { } void append( Message& m ) { - verify( m.header()->len <= 1300 ); + verify( m.header().getLen() <= 1300 ); - if ( len() + m.header()->len > 1300 ) + if ( len() + m.header().getLen() > 1300 ) flush(); - memcpy( _cur , m.singleData() , m.header()->len ); - _cur += m.header()->len; + memcpy( _cur , m.singleData().view2ptr() , m.header().getLen() ); + _cur += m.header().getLen(); } void flush() { @@ -173,10 +173,10 @@ namespace mongo { try { again: //mmm( log() << "* recv() sock:" << this->sock << endl; ) - MSGHEADER header; - int headerLen = sizeof(MSGHEADER); + MSGHEADER::Value header; + int headerLen = sizeof(MSGHEADER::Value); psock->recv( (char *)&header, headerLen ); - int len = header.messageLength; + int len = header.constView().getMessageLength(); if ( len == 542393671 ) { // an http GET @@ -198,12 +198,14 @@ again: // If responseTo is not 0 or -1 for first packet assume SSL else if (psock->isAwaitingHandshake()) { #ifndef MONGO_SSL - if (header.responseTo != 0 && header.responseTo != -1) { + if (header.constView().getResponseTo() != 0 + && header.constView().getResponseTo() != -1) { uasserted(17133, "SSL handshake requested, SSL feature not available in this build"); } #else - if (header.responseTo != 0 && header.responseTo != -1) { + if (header.constView().getResponseTo() != 0 + && header.constView().getResponseTo() != -1) { uassert(17132, "SSL handshake received but server is started without SSL support", sslGlobalParams.sslMode.load() != SSLGlobalParams::SSLMode_disabled); @@ -216,27 +218,27 @@ again: sslGlobalParams.sslMode.load() != SSLGlobalParams::SSLMode_requireSSL); #endif // MONGO_SSL } - if ( static_cast<size_t>(len) < sizeof(MSGHEADER) || + if ( static_cast<size_t>(len) < sizeof(MSGHEADER::Value) || static_cast<size_t>(len) > MaxMessageSizeBytes ) { LOG(0) << "recv(): message len " << len << " is invalid. " - << "Min " << sizeof(MSGHEADER) << " Max: " << MaxMessageSizeBytes; + << "Min " << sizeof(MSGHEADER::Value) << " Max: " << MaxMessageSizeBytes; return false; } psock->setHandshakeReceived(); int z = (len+1023)&0xfffffc00; verify(z>=len); - MsgData *md = (MsgData *) malloc(z); - ScopeGuard guard = MakeGuard(free, md); - verify(md); + MsgData::View md = reinterpret_cast<char *>(malloc(z)); + ScopeGuard guard = MakeGuard(free, md.view2ptr()); + verify(md.view2ptr()); - memcpy(md, &header, headerLen); + memcpy(md.view2ptr(), &header, headerLen); int left = len - headerLen; - psock->recv( (char *)&md->_data, left ); + psock->recv( md.data(), left ); guard.Dismiss(); - m.setData(md, true); + m.setData(md.view2ptr(), true); return true; } @@ -251,7 +253,7 @@ again: } void MessagingPort::reply(Message& received, Message& response) { - say(/*received.from, */response, received.header()->id); + say(/*received.from, */response, received.header().getId()); } void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { @@ -272,13 +274,15 @@ again: return false; } //log() << "got response: " << response.data->responseTo << endl; - if ( response.header()->responseTo == toSend.header()->id ) + if ( response.header().getResponseTo() == toSend.header().getId() ) break; - error() << "MessagingPort::call() wrong id got:" << std::hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << '\n' + error() << "MessagingPort::call() wrong id got:" + << std::hex << (unsigned)response.header().getResponseTo() + << " expect:" << (unsigned)toSend.header().getId() << '\n' << std::dec << " toSend op: " << (unsigned)toSend.operation() << '\n' - << " response msgid:" << (unsigned)response.header()->id << '\n' - << " response len: " << (unsigned)response.header()->len << '\n' + << " response msgid:" << (unsigned)response.header().getId() << '\n' + << " response len: " << (unsigned)response.header().getLen() << '\n' << " response op: " << response.operation() << '\n' << " remote: " << psock->remoteString(); verify(false); @@ -291,12 +295,12 @@ again: void MessagingPort::say(Message& toSend, int responseTo) { verify( !toSend.empty() ); mmm( log() << "* say() thr:" << GetCurrentThreadId() << endl; ) - toSend.header()->id = nextMessageId(); - toSend.header()->responseTo = responseTo; + toSend.header().setId(nextMessageId()); + toSend.header().setResponseTo(responseTo); if ( piggyBackData && piggyBackData->len() ) { mmm( log() << "* have piggy back" << endl; ) - if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) { + if ( ( piggyBackData->len() + toSend.header().getLen() ) > 1300 ) { // won't fit in a packet - so just send it off piggyBackData->flush(); } @@ -312,15 +316,15 @@ again: void MessagingPort::piggyBack( Message& toSend , int responseTo ) { - if ( toSend.header()->len > 1300 ) { + if ( toSend.header().getLen() > 1300 ) { // not worth saving because its almost an entire packet say( toSend ); return; } // we're going to be storing this, so need to set it up - toSend.header()->id = nextMessageId(); - toSend.header()->responseTo = responseTo; + toSend.header().setId(nextMessageId()); + toSend.header().setResponseTo(responseTo); if ( ! piggyBackData ) piggyBackData = new PiggyBackData( this ); diff --git a/src/mongo/util/util.cpp b/src/mongo/util/util.cpp index ae004ee41b4..f510aa6c31a 100644 --- a/src/mongo/util/util.cpp +++ b/src/mongo/util/util.cpp @@ -86,9 +86,6 @@ namespace mongo { verify( endsWith("abcde", "de") ); verify( !endsWith("abcde", "dasdfasdfashkfde") ); - - verify( swapEndian(0x01020304) == 0x04030201 ); - } } utilTest; |