// dbmessage.cpp

/*    Copyright 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/db/dbmessage.h" #include "mongo/db/operation_context.h" #include "mongo/platform/strnlen.h" #include "mongo/rpc/object_check.h" #include "mongo/transport/session.h" namespace mongo { using std::string; using std::stringstream; string Message::toString() const { stringstream ss; ss << "op: " << networkOpToString(operation()) << " len: " << size(); if (operation() >= 2000 && operation() < 2100) { DbMessage d(*this); ss << " ns: " << d.getns(); switch (operation()) { case dbUpdate: { int flags = d.pullInt(); BSONObj q = d.nextJsObj(); BSONObj o = d.nextJsObj(); ss << " flags: " << flags << " query: " << q << " update: " << o; break; } case dbInsert: ss << d.nextJsObj(); break; case dbDelete: { int flags = d.pullInt(); BSONObj q = d.nextJsObj(); ss << " flags: " << flags << " query: " << q; break; } default: ss << " CANNOT HANDLE YET"; } } return ss.str(); } DbMessage::DbMessage(const Message& msg) : _msg(msg), _nsStart(NULL), _mark(NULL), _nsLen(0) { // for received messages, Message has only one buffer _theEnd = _msg.singleData().data() + _msg.singleData().dataLen(); _nextjsobj = _msg.singleData().data(); _reserved = readAndAdvance(); // Read packet for NS if (messageShouldHaveNs()) { // Limit = buffer size of message - // (first int4 in message which is either flags or a zero constant) size_t limit = _msg.singleData().dataLen() - sizeof(int); _nsStart = _nextjsobj; _nsLen = strnlen(_nsStart, limit); // Validate there is room for a null byte in the buffer // Strings can be zero length uassert(18633, "Failed to parse ns string", _nsLen < limit); _nextjsobj += _nsLen + 1; // skip namespace + null } } const char* DbMessage::getns() const { verify(messageShouldHaveNs()); return _nsStart; } int DbMessage::getQueryNToReturn() const { verify(messageShouldHaveNs()); const char* p = _nsStart + _nsLen + 1; checkRead(p, 2); return ConstDataView(p).read>(sizeof(int32_t)); } int DbMessage::pullInt() { return readAndAdvance(); } 
long long DbMessage::pullInt64() { return readAndAdvance(); } const char* DbMessage::getArray(size_t count) const { checkRead(_nextjsobj, count); return _nextjsobj; } BSONObj DbMessage::nextJsObj() { uassert(ErrorCodes::InvalidBSON, "Client Error: Remaining data too small for BSON object", _nextjsobj != NULL && _theEnd - _nextjsobj >= 5); if (serverGlobalParams.objcheck) { Status status = validateBSON( _nextjsobj, _theEnd - _nextjsobj, Validator::enabledBSONVersion()); uassert(ErrorCodes::InvalidBSON, str::stream() << "Client Error: bad object in message: " << status.reason(), status.isOK()); } BSONObj js(_nextjsobj); verify(js.objsize() >= 5); verify(js.objsize() <= (_theEnd - _nextjsobj)); _nextjsobj += js.objsize(); if (_nextjsobj >= _theEnd) _nextjsobj = NULL; return js; } void DbMessage::markReset(const char* toMark = NULL) { if (toMark == NULL) { toMark = _mark; } verify(toMark); _nextjsobj = toMark; } template void DbMessage::checkRead(const char* start, size_t count) const { if ((_theEnd - start) < static_cast(sizeof(T) * count)) { uassert(18634, "Not enough data to read", false); } } template T DbMessage::read() const { checkRead(_nextjsobj, 1); return ConstDataView(_nextjsobj).read>(); } template T DbMessage::readAndAdvance() { T t = read(); _nextjsobj += sizeof(T); return t; } OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { _buffer.skip(sizeof(QueryResult::Value)); } void OpQueryReplyBuilder::send(const transport::SessionHandle& session, int queryResultFlags, const Message& requestMsg, int nReturned, int startingFrom, long long cursorId) { Message response; putInMessage(&response, queryResultFlags, nReturned, startingFrom, cursorId); response.header().setId(nextMessageId()); response.header().setResponseToMsgId(requestMsg.header().getId()); uassertStatusOK(session->sinkMessage(response).wait()); } void OpQueryReplyBuilder::sendCommandReply(const transport::SessionHandle& session, const Message& requestMsg) { send(session, /*queryFlags*/ 0, 
requestMsg, /*nReturned*/ 1); } void OpQueryReplyBuilder::putInMessage( Message* out, int queryResultFlags, int nReturned, int startingFrom, long long cursorId) { QueryResult::View qr = _buffer.buf(); qr.setResultFlags(queryResultFlags); qr.msgdata().setLen(_buffer.len()); qr.msgdata().setOperation(opReply); qr.setCursorId(cursorId); qr.setStartingFrom(startingFrom); qr.setNReturned(nReturned); out->setData(_buffer.release()); // transport will free } void replyToQuery(int queryResultFlags, const transport::SessionHandle& session, const Message& requestMsg, const void* data, int size, int nReturned, int startingFrom, long long cursorId) { OpQueryReplyBuilder reply; reply.bufBuilderForResults().appendBuf(data, size); reply.send(session, queryResultFlags, requestMsg, nReturned, startingFrom, cursorId); } void replyToQuery(int queryResultFlags, const transport::SessionHandle& session, const Message& requestMsg, const BSONObj& responseObj) { replyToQuery(queryResultFlags, session, requestMsg, (void*)responseObj.objdata(), responseObj.objsize(), 1); } void replyToQuery(int queryResultFlags, const Message& m, DbResponse& dbresponse, BSONObj obj) { Message resp; replyToQuery(queryResultFlags, resp, obj); dbresponse.response = std::move(resp); dbresponse.responseToMsgId = m.header().getId(); } void replyToQuery(int queryResultFlags, Message& response, const BSONObj& resultObj) { OpQueryReplyBuilder reply; resultObj.appendSelfToBufBuilder(reply.bufBuilderForResults()); reply.putInMessage(&response, queryResultFlags, /*nReturned*/ 1); } }