summaryrefslogtreecommitdiff
path: root/src/mongo/rpc/message.h
diff options
context:
space:
mode:
authorAndrew Morrow <acm@mongodb.com>2018-04-18 18:28:13 -0400
committerAndrew Morrow <acm@mongodb.com>2018-05-05 12:29:55 -0400
commit51dce76324173089098e8c9fd09e46b98b32adc4 (patch)
tree1093a4866d1fc3e7da1cba2fa2ae4f56b536f033 /src/mongo/rpc/message.h
parent30994f3bacb6e814ae015d83693c549a3b924ccc (diff)
downloadmongo-51dce76324173089098e8c9fd09e46b98b32adc4.tar.gz
SERVER-34805 Refactor the network libraries and move messages types to rpc/protocol
Diffstat (limited to 'src/mongo/rpc/message.h')
-rw-r--r--src/mongo/rpc/message.h478
1 files changed, 478 insertions, 0 deletions
diff --git a/src/mongo/rpc/message.h b/src/mongo/rpc/message.h
new file mode 100644
index 00000000000..a2fe2ec8c9d
--- /dev/null
+++ b/src/mongo/rpc/message.h
@@ -0,0 +1,478 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+#include "mongo/base/data_type_endian.h"
+#include "mongo/base/data_view.h"
+#include "mongo/base/encoded_value_storage.h"
+#include "mongo/base/static_assert.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+/**
+ * Maximum accepted message size on the wire protocol.
+ */
+const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;
+
+enum NetworkOp : int32_t {
+ opInvalid = 0,
+ opReply = 1, /* reply. responseTo is set. */
+ dbUpdate = 2001, /* update object */
+ dbInsert = 2002,
+ // dbGetByOID = 2003,
+ dbQuery = 2004,
+ dbGetMore = 2005,
+ dbDelete = 2006,
+ dbKillCursors = 2007,
+ // dbCommand_DEPRECATED = 2008, //
+ // dbCommandReply_DEPRECATED = 2009, //
+ dbCommand = 2010,
+ dbCommandReply = 2011,
+ dbCompressed = 2012,
+ dbMsg = 2013,
+};
+
+inline bool isSupportedRequestNetworkOp(NetworkOp op) {
+ switch (op) {
+ case dbUpdate:
+ case dbInsert:
+ case dbQuery:
+ case dbGetMore:
+ case dbDelete:
+ case dbKillCursors:
+ case dbCommand:
+ case dbCompressed:
+ case dbMsg:
+ return true;
+ case dbCommandReply:
+ case opReply:
+ default:
+ return false;
+ }
+}
+
+enum class LogicalOp {
+ opInvalid,
+ opUpdate,
+ opInsert,
+ opQuery,
+ opGetMore,
+ opDelete,
+ opKillCursors,
+ opCommand,
+ opCompressed,
+};
+
+inline LogicalOp networkOpToLogicalOp(NetworkOp networkOp) {
+ switch (networkOp) {
+ case dbUpdate:
+ return LogicalOp::opUpdate;
+ case dbInsert:
+ return LogicalOp::opInsert;
+ case dbQuery:
+ return LogicalOp::opQuery;
+ case dbGetMore:
+ return LogicalOp::opGetMore;
+ case dbDelete:
+ return LogicalOp::opDelete;
+ case dbKillCursors:
+ return LogicalOp::opKillCursors;
+ case dbMsg:
+ case dbCommand:
+ return LogicalOp::opCommand;
+ case dbCompressed:
+ return LogicalOp::opCompressed;
+ default:
+ int op = int(networkOp);
+ massert(34348, str::stream() << "cannot translate opcode " << op, !op);
+ return LogicalOp::opInvalid;
+ }
+}
+
+inline const char* networkOpToString(NetworkOp networkOp) {
+ switch (networkOp) {
+ case opInvalid:
+ return "none";
+ case opReply:
+ return "reply";
+ case dbUpdate:
+ return "update";
+ case dbInsert:
+ return "insert";
+ case dbQuery:
+ return "query";
+ case dbGetMore:
+ return "getmore";
+ case dbDelete:
+ return "remove";
+ case dbKillCursors:
+ return "killcursors";
+ case dbCommand:
+ return "command";
+ case dbCommandReply:
+ return "commandReply";
+ case dbCompressed:
+ return "compressed";
+ case dbMsg:
+ return "msg";
+ default:
+ int op = static_cast<int>(networkOp);
+ massert(16141, str::stream() << "cannot translate opcode " << op, !op);
+ return "";
+ }
+}
+
+inline const char* logicalOpToString(LogicalOp logicalOp) {
+ switch (logicalOp) {
+ case LogicalOp::opInvalid:
+ return "none";
+ case LogicalOp::opUpdate:
+ return "update";
+ case LogicalOp::opInsert:
+ return "insert";
+ case LogicalOp::opQuery:
+ return "query";
+ case LogicalOp::opGetMore:
+ return "getmore";
+ case LogicalOp::opDelete:
+ return "remove";
+ case LogicalOp::opKillCursors:
+ return "killcursors";
+ case LogicalOp::opCommand:
+ return "command";
+ case LogicalOp::opCompressed:
+ return "compressed";
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+namespace MSGHEADER {
+
+#pragma pack(1)
+/**
+ * See http://dochub.mongodb.org/core/mongowireprotocol
+ */
+struct Layout {
+ int32_t messageLength; // total message size, including this
+ int32_t requestID; // identifier for this message
+ int32_t responseTo; // requestID from the original request
+ // (used in responses from db)
+ int32_t opCode;
+};
+#pragma pack()
+
+class ConstView {
+public:
+ typedef ConstDataView view_type;
+
+ ConstView(const char* data) : _data(data) {}
+
+ const char* view2ptr() const {
+ return data().view();
+ }
+
+ int32_t getMessageLength() const {
+ return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));
+ }
+
+ int32_t getRequestMsgId() const {
+ return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));
+ }
+
+ int32_t getResponseToMsgId() const {
+ return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));
+ }
+
+ int32_t getOpCode() const {
+ return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));
+ }
+
+protected:
+ const view_type& data() const {
+ return _data;
+ }
+
+private:
+ view_type _data;
+};
+
+class View : public ConstView {
+public:
+ typedef DataView view_type;
+
+ View(char* data) : ConstView(data) {}
+
+ using ConstView::view2ptr;
+ char* view2ptr() {
+ return data().view();
+ }
+
+ void setMessageLength(int32_t value) {
+ data().write(tagLittleEndian(value), offsetof(Layout, messageLength));
+ }
+
+ void setRequestMsgId(int32_t value) {
+ data().write(tagLittleEndian(value), offsetof(Layout, requestID));
+ }
+
+ void setResponseToMsgId(int32_t value) {
+ data().write(tagLittleEndian(value), offsetof(Layout, responseTo));
+ }
+
+ void setOpCode(int32_t value) {
+ data().write(tagLittleEndian(value), offsetof(Layout, opCode));
+ }
+
+private:
+ view_type data() const {
+ return const_cast<char*>(ConstView::view2ptr());
+ }
+};
+
+class Value : public EncodedValueStorage<Layout, ConstView, View> {
+public:
+ Value() {
+ MONGO_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
+ }
+
+ Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
+};
+
+} // namespace MSGHEADER
+
+namespace MsgData {
+
+#pragma pack(1)
+struct Layout {
+ MSGHEADER::Layout header;
+ char data[4];
+};
+#pragma pack()
+
+class ConstView {
+public:
+ ConstView(const char* storage) : _storage(storage) {}
+
+ const char* view2ptr() const {
+ return storage().view();
+ }
+
+ int32_t getLen() const {
+ return header().getMessageLength();
+ }
+
+ int32_t getId() const {
+ return header().getRequestMsgId();
+ }
+
+ int32_t getResponseToMsgId() const {
+ return header().getResponseToMsgId();
+ }
+
+ NetworkOp getNetworkOp() const {
+ return NetworkOp(header().getOpCode());
+ }
+
+ const char* data() const {
+ return storage().view(offsetof(Layout, data));
+ }
+
+ bool valid() const {
+ if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))
+ return false;
+ if (getNetworkOp() < 0 || getNetworkOp() > 30000)
+ return false;
+ return true;
+ }
+
+ int64_t getCursor() const {
+ verify(getResponseToMsgId() > 0);
+ verify(getNetworkOp() == opReply);
+ return ConstDataView(data() + sizeof(int32_t)).read<LittleEndian<int64_t>>();
+ }
+
+ int dataLen() const; // len without header
+
+protected:
+ const ConstDataView& storage() const {
+ return _storage;
+ }
+
+ MSGHEADER::ConstView header() const {
+ return storage().view(offsetof(Layout, header));
+ }
+
+private:
+ ConstDataView _storage;
+};
+
+class View : public ConstView {
+public:
+ View(char* storage) : ConstView(storage) {}
+
+ using ConstView::view2ptr;
+ char* view2ptr() {
+ return storage().view();
+ }
+
+ void setLen(int value) {
+ return header().setMessageLength(value);
+ }
+
+ void setId(int32_t value) {
+ return header().setRequestMsgId(value);
+ }
+
+ void setResponseToMsgId(int32_t value) {
+ return header().setResponseToMsgId(value);
+ }
+
+ void setOperation(int value) {
+ return header().setOpCode(value);
+ }
+
+ using ConstView::data;
+ char* data() {
+ return storage().view(offsetof(Layout, data));
+ }
+
+private:
+ DataView storage() const {
+ return const_cast<char*>(ConstView::view2ptr());
+ }
+
+ MSGHEADER::View header() const {
+ return storage().view(offsetof(Layout, header));
+ }
+};
+
+class Value : public EncodedValueStorage<Layout, ConstView, View> {
+public:
+ Value() {
+ MONGO_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
+ }
+
+ Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
+};
+
+const int MsgDataHeaderSize = sizeof(Value) - 4;
+
+inline int ConstView::dataLen() const {
+ return getLen() - MsgDataHeaderSize;
+}
+
+} // namespace MsgData
+
+class Message {
+public:
+ Message() = default;
+ explicit Message(SharedBuffer data) : _buf(std::move(data)) {}
+
+ MsgData::View header() const {
+ verify(!empty());
+ return _buf.get();
+ }
+
+ NetworkOp operation() const {
+ return header().getNetworkOp();
+ }
+
+ MsgData::View singleData() const {
+ massert(13273, "single data buffer expected", _buf);
+ return header();
+ }
+
+ bool empty() const {
+ return !_buf;
+ }
+
+ int size() const {
+ if (_buf) {
+ return MsgData::ConstView(_buf.get()).getLen();
+ }
+ return 0;
+ }
+
+ int dataSize() const {
+ return size() - sizeof(MSGHEADER::Value);
+ }
+
+ void reset() {
+ _buf = {};
+ }
+
+ // use to set first buffer if empty
+ void setData(SharedBuffer buf) {
+ verify(empty());
+ _buf = std::move(buf);
+ }
+ void setData(int operation, const char* msgtxt) {
+ setData(operation, msgtxt, strlen(msgtxt) + 1);
+ }
+ void setData(int operation, const char* msgdata, size_t len) {
+ verify(empty());
+ size_t dataLen = len + sizeof(MsgData::Value) - 4;
+ _buf = SharedBuffer::allocate(dataLen);
+ MsgData::View d = _buf.get();
+ if (len)
+ memcpy(d.data(), msgdata, len);
+ d.setLen(dataLen);
+ d.setOperation(operation);
+ }
+
+ char* buf() {
+ return _buf.get();
+ }
+
+ const char* buf() const {
+ return _buf.get();
+ }
+
+ SharedBuffer sharedBuffer() {
+ return _buf;
+ }
+
+ ConstSharedBuffer sharedBuffer() const {
+ return _buf;
+ }
+
+private:
+ SharedBuffer _buf;
+};
+
+/**
+ * Returns an always incrementing value to be used to assign to the next received network message.
+ */
+int32_t nextMessageId();
+
+} // namespace mongo