// dbmessage.h /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #pragma once #include "mongo/bson/bson_validate.h" #include "mongo/client/constants.h" #include "mongo/db/jsobj.h" #include "mongo/db/server_options.h" #include "mongo/util/net/message.h" #include "mongo/util/net/message_port.h" namespace mongo { /* db response format Query or GetMore: // see struct QueryResult int resultFlags; int64 cursorID; int startingFrom; int nReturned; list of marshalled JSObjects; */ /* db request message format unsigned opid; // arbitrary; will be echoed back byte operation; int options; then for: dbInsert: std::string collection; a series of JSObjects dbDelete: std::string collection; int flags=0; // 1=DeleteSingle JSObject query; dbUpdate: std::string collection; int flags; // 1=upsert JSObject query; JSObject objectToUpdate; objectToUpdate may include { $inc: } or { $set: ... }, see struct Mod. dbQuery: std::string collection; int nToSkip; int nToReturn; // how many you want back as the beginning of the cursor data (0=no limit) // greater than zero is simply a hint on how many objects to send back per "cursor batch". // a negative number indicates a hard limit. JSObject query; [JSObject fieldsToReturn] dbGetMore: std::string collection; // redundant, might use for security. int nToReturn; int64 cursorID; dbKillCursors=2007: int n; int64 cursorIDs[n]; Note that on Update, there is only one object, which is different from insert where you can pass a list of objects to insert in the db. Note that the update field layout is very similar to that of Query. 
*/ #pragma pack(1) struct QueryResult : public MsgData { long long cursorId; int startingFrom; int nReturned; const char *data() { return (char *) (((int *)&nReturned)+1); } int resultFlags() { return dataAsInt(); } int& _resultFlags() { return dataAsInt(); } void setResultFlagsToOk() { _resultFlags() = ResultFlag_AwaitCapable; } void initializeResultFlags() { _resultFlags() = 0; } }; #pragma pack() /* For the database/server protocol, these objects and functions encapsulate the various messages transmitted over the connection. See http://dochub.mongodb.org/core/mongowireprotocol */ class DbMessage { public: DbMessage(const Message& _m) : m(_m) , mark(0) { // for received messages, Message has only one buffer theEnd = _m.singleData()->_data + _m.header()->dataLen(); char *r = _m.singleData()->_data; reserved = (int *) r; data = r + 4; nextjsobj = data; } /** the 32 bit field before the ns * track all bit usage here as its cross op * 0: InsertOption_ContinueOnError * 1: fromWriteback */ int& reservedField() { return *reserved; } const char * getns() const { return data; } const char * afterNS() const { return data + strlen( data ) + 1; } int getInt( int num ) const { const int * foo = (const int*)afterNS(); return foo[num]; } int getQueryNToReturn() const { return getInt( 1 ); } /** * get an int64 at specified offsetBytes after ns */ long long getInt64( int offsetBytes ) const { const char * x = afterNS(); x += offsetBytes; const long long * ll = (const long long*)x; return ll[0]; } void resetPull() { nextjsobj = data; } int pullInt() const { return pullInt(); } int& pullInt() { if ( nextjsobj == data ) nextjsobj += strlen(data) + 1; // skip namespace int& i = *((int *)nextjsobj); nextjsobj += 4; return i; } long long pullInt64() const { return pullInt64(); } long long &pullInt64() { if ( nextjsobj == data ) nextjsobj += strlen(data) + 1; // skip namespace long long &i = *((long long *)nextjsobj); nextjsobj += 8; return i; } OID* getOID() const { return (OID *) 
(data + strlen(data) + 1); // skip namespace } void getQueryStuff(const char *&query, int& ntoreturn) { int *i = (int *) (data + strlen(data) + 1); ntoreturn = *i; i++; query = (const char *) i; } /* for insert and update msgs */ bool moreJSObjs() const { return nextjsobj != 0; } BSONObj nextJsObj() { if ( nextjsobj == data ) { nextjsobj += strlen(data) + 1; // skip namespace massert( 13066 , "Message contains no documents", theEnd > nextjsobj ); } massert( 10304, "Client Error: Remaining data too small for BSON object", theEnd - nextjsobj >= 5 ); if (serverGlobalParams.objcheck) { Status status = validateBSON( nextjsobj, theEnd - nextjsobj ); massert( 10307, str::stream() << "Client Error: bad object in message: " << status.reason(), status.isOK() ); } BSONObj js(nextjsobj); verify( js.objsize() >= 5 ); verify( js.objsize() < ( theEnd - data ) ); nextjsobj += js.objsize(); if ( nextjsobj >= theEnd ) nextjsobj = 0; return js; } const Message& msg() const { return m; } const char * markGet() { return nextjsobj; } void markSet() { mark = nextjsobj; } void markReset( const char * toMark = 0) { if( toMark == 0 ) toMark = mark; verify( toMark ); nextjsobj = toMark; } private: const Message& m; int* reserved; const char *data; const char *nextjsobj; const char *theEnd; const char * mark; }; /* a request to run a query, received from the database */ class QueryMessage { public: const char *ns; int ntoskip; int ntoreturn; int queryOptions; BSONObj query; BSONObj fields; /** * parses the message into the above fields * Warning: constructor mutates DbMessage. */ QueryMessage(DbMessage& d) { ns = d.getns(); ntoskip = d.pullInt(); ntoreturn = d.pullInt(); query = d.nextJsObj(); if ( d.moreJSObjs() ) { fields = d.nextJsObj(); } queryOptions = d.msg().header()->dataAsInt(); } }; /** * A response to a DbMessage. */ struct DbResponse { Message *response; MSGID responseTo; std::string exhaustNS; /* points to ns if exhaust mode. 
0=normal mode*/ DbResponse(Message *r, MSGID rt) : response(r), responseTo(rt){ } DbResponse() { response = 0; } ~DbResponse() { delete response; } }; void replyToQuery(int queryResultFlags, AbstractMessagingPort* p, Message& requestMsg, void *data, int size, int nReturned, int startingFrom = 0, long long cursorId = 0 ); /* object reply helper. */ void replyToQuery(int queryResultFlags, AbstractMessagingPort* p, Message& requestMsg, const BSONObj& responseObj); /* helper to do a reply using a DbResponse object */ void replyToQuery( int queryResultFlags, Message& m, DbResponse& dbresponse, BSONObj obj ); /** * Helper method for setting up a response object. * * @param queryResultFlags The flags to set to the response object. * @param response The object to be used for building the response. The internal buffer of * this object will contain the raw data from resultObj after a successful call. * @param resultObj The bson object that contains the reply data. */ void replyToQuery( int queryResultFlags, Message& response, const BSONObj& resultObj ); } // namespace mongo