summaryrefslogtreecommitdiff
path: root/src/mongo/rpc/op_msg.h
diff options
context:
space:
mode:
authorAndrew Morrow <acm@mongodb.com>2018-04-18 18:28:13 -0400
committerAndrew Morrow <acm@mongodb.com>2018-05-05 12:29:55 -0400
commit51dce76324173089098e8c9fd09e46b98b32adc4 (patch)
tree1093a4866d1fc3e7da1cba2fa2ae4f56b536f033 /src/mongo/rpc/op_msg.h
parent30994f3bacb6e814ae015d83693c549a3b924ccc (diff)
downloadmongo-51dce76324173089098e8c9fd09e46b98b32adc4.tar.gz
SERVER-34805 Refactor the network libraries and move messages types to rpc/protocol
Diffstat (limited to 'src/mongo/rpc/op_msg.h')
-rw-r--r--src/mongo/rpc/op_msg.h313
1 files changed, 313 insertions, 0 deletions
diff --git a/src/mongo/rpc/op_msg.h b/src/mongo/rpc/op_msg.h
new file mode 100644
index 00000000000..ed7f1993d86
--- /dev/null
+++ b/src/mongo/rpc/op_msg.h
@@ -0,0 +1,313 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include "mongo/base/string_data.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/rpc/message.h"
+
+namespace mongo {
+
+struct OpMsg {
+ struct DocumentSequence {
+ std::string name;
+ std::vector<BSONObj> objs;
+ };
+
+ static constexpr uint32_t kChecksumPresent = 1 << 0;
+ static constexpr uint32_t kMoreToCome = 1 << 1;
+
+ /**
+ * Returns the unvalidated flags for the given message if it is an OP_MSG message.
+ * Returns 0 for other message kinds since they are the equivalent of no flags set.
+ * Throws if the message is too small to hold flags.
+ */
+ static uint32_t flags(const Message& message);
+ static bool isFlagSet(const Message& message, uint32_t flag) {
+ return flags(message) & flag;
+ }
+
+ /**
+ * Replaces the flags in message with the supplied flags.
+ * Only legal on an otherwise valid OP_MSG message.
+ */
+ static void replaceFlags(Message* message, uint32_t flags);
+
+ /**
+ * Adds flag to the list of set flags in message.
+ * Only legal on an otherwise valid OP_MSG message.
+ */
+ static void setFlag(Message* message, uint32_t flag) {
+ replaceFlags(message, flags(*message) | flag);
+ }
+
+ /**
+ * Parses and returns an OpMsg containing unowned BSON.
+ */
+ static OpMsg parse(const Message& message);
+
+ /**
+ * Parses and returns an OpMsg containing owned BSON.
+ */
+ static OpMsg parseOwned(const Message& message) {
+ auto msg = parse(message);
+ msg.shareOwnershipWith(message.sharedBuffer());
+ return msg;
+ }
+
+ Message serialize() const;
+
+ /**
+ * Makes all BSONObjs in this object share ownership with buffer.
+ */
+ void shareOwnershipWith(const ConstSharedBuffer& buffer);
+
+ /**
+ * Returns a pointer to the sequence with the given name or nullptr if there are none.
+ */
+ const DocumentSequence* getSequence(StringData name) const {
+ // Getting N sequences is technically O(N**2) but because there currently is at most 2
+ // sequences, this does either 1 or 2 comparisons. Consider making sequences a StringMap if
+ // there will be many sequences. This problem may also just go away with the IDL project.
+ auto it = std::find_if(
+ sequences.begin(), sequences.end(), [&](const auto& seq) { return seq.name == name; });
+ return it == sequences.end() ? nullptr : &*it;
+ }
+
+ BSONObj body;
+ std::vector<DocumentSequence> sequences;
+};
+
+/**
+ * An OpMsg that represents a request. This is a separate type from OpMsg only to provide better
+ * type-safety along with a place to hang request-specific methods.
+ */
+struct OpMsgRequest : public OpMsg {
+ // TODO in C++17 remove constructors so we can use aggregate initialization.
+ OpMsgRequest() = default;
+ explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}
+
+ static OpMsgRequest parse(const Message& message) {
+ return OpMsgRequest(OpMsg::parse(message));
+ }
+
+ static OpMsgRequest fromDBAndBody(StringData db,
+ BSONObj body,
+ const BSONObj& extraFields = {}) {
+ OpMsgRequest request;
+ request.body = ([&] {
+ BSONObjBuilder bodyBuilder(std::move(body));
+ bodyBuilder.appendElements(extraFields);
+ bodyBuilder.append("$db", db);
+ return bodyBuilder.obj();
+ }());
+ return request;
+ }
+
+ StringData getDatabase() const {
+ if (auto elem = body["$db"])
+ return elem.checkAndGetStringData();
+ uasserted(40571, "OP_MSG requests require a $db argument");
+ }
+
+ StringData getCommandName() const {
+ return body.firstElementFieldName();
+ }
+
+ // DO NOT ADD MEMBERS! Since this type is essentially a strong typedef (see the class comment),
+ // it should not hold more data than an OpMsg. It should be freely interconvertible with OpMsg
+ // without issues like slicing.
+};
+
+/**
+ * Builds an OP_MSG message in-place in a Message buffer.
+ *
+ * While the OP_MSG format imposes no ordering of sections, in order to efficiently support our
+ * usage patterns, this class requires that all document sequences (if any) are built before the
+ * body. This allows repeatedly appending fields to the body until right before it is ready to be
+ * sent.
+ */
+class OpMsgBuilder {
+ MONGO_DISALLOW_COPYING(OpMsgBuilder);
+
+public:
+ OpMsgBuilder() {
+ skipHeaderAndFlags();
+ }
+
+ /**
+ * See the documentation for DocSequenceBuilder below.
+ */
+ class DocSequenceBuilder;
+ DocSequenceBuilder beginDocSequence(StringData name);
+
+ /**
+ * Returns an empty builder for the body.
+ * It is an error to call this if a body has already been begun. You must destroy or call
+ * done() on the returned builder before calling any methods on this object.
+ */
+ BSONObjBuilder beginBody();
+ void setBody(const BSONObj& body) {
+ beginBody().appendElements(body);
+ }
+
+ /**
+ * Returns a builder that can be used to append new fields to the body.
+ * It is an error to call this if beginBody() hasn't been called yet. It is an error to append
+ * elements with field names that already exist in the body. You must destroy or call done() on
+ * the returned builder before calling any methods on this object.
+ *
+ * TODO decide if it is worth keeping the begin/resume distinction in the public API.
+ */
+ BSONObjBuilder resumeBody();
+ void appendElementsToBody(const BSONObj& body) {
+ resumeBody().appendElements(body);
+ }
+
+ /**
+ * Finish building and return a Message ready to give to the networking layer for transmission.
+ * It is illegal to call any methods on this object after calling this.
+ */
+ Message finish();
+
+ /**
+ * Reset this object to its initial empty state. All previously appended data is lost.
+ */
+ void reset() {
+ invariant(!_openBuilder);
+
+ _buf.reset();
+ skipHeaderAndFlags();
+ _bodyStart = 0;
+ _state = kEmpty;
+ _openBuilder = false;
+ }
+
+ /**
+ * Set to true in tests that need to be able to generate duplicate top-level fields to see how
+ * the server handles them. Is false by default, although the check only happens in debug
+ * builds.
+ */
+ static AtomicBool disableDupeFieldCheck_forTest;
+
+private:
+ friend class DocSequenceBuilder;
+
+ enum State {
+ kEmpty,
+ kDocSequence,
+ kBody,
+ kDone,
+ };
+
+ void finishDocumentStream(DocSequenceBuilder* docSequenceBuilder);
+
+ void skipHeaderAndFlags() {
+ _buf.skip(sizeof(MSGHEADER::Layout)); // This is filled in by finish().
+ _buf.appendNum(uint32_t(0)); // flags (currently always 0).
+ }
+
+ // When adding members, remember to update reset().
+ BufBuilder _buf;
+ int _bodyStart = 0;
+ State _state = kEmpty;
+ bool _openBuilder = false;
+};
+
+/**
+ * Builds a document sequence in an OpMsgBuilder.
+ *
+ * Example:
+ *
+ * auto docSeq = msgBuilder.beginDocSequence("some.sequence");
+ *
+ * docSeq.append(BSON("a" << 1)); // Copy an obj into the sequence
+ *
+ * auto bob = docSeq.appendBuilder(); // Build an obj in-place
+ * bob.append("a", 2);
+ * bob.doneFast();
+ *
+ * docSeq.done(); // Or just let it go out of scope.
+ */
+class OpMsgBuilder::DocSequenceBuilder {
+ MONGO_DISALLOW_COPYING(DocSequenceBuilder);
+
+public:
+ DocSequenceBuilder(DocSequenceBuilder&& other)
+ : _buf(other._buf), _msgBuilder(other._msgBuilder), _sizeOffset(other._sizeOffset) {
+ other._buf = nullptr;
+ }
+
+ ~DocSequenceBuilder() {
+ if (_buf)
+ done();
+ }
+
+ /**
+ * Indicates that the caller is done with this stream prior to destruction.
+ * Following this call, it is illegal to call any methods on this object.
+ */
+ void done() {
+ invariant(_buf);
+ _msgBuilder->finishDocumentStream(this);
+ _buf = nullptr;
+ }
+
+ /**
+ * Appends a single document to this sequence.
+ */
+ void append(const BSONObj& obj) {
+ _buf->appendBuf(obj.objdata(), obj.objsize());
+ }
+
+ /**
+ * Returns a BSONObjBuilder that appends a single document to this sequence in place.
+ * It is illegal to call any methods on this DocSequenceBuilder until the returned builder
+ * is destroyed or done()/doneFast() is called on it.
+ */
+ BSONObjBuilder appendBuilder() {
+ return BSONObjBuilder(*_buf);
+ }
+
+private:
+ friend OpMsgBuilder;
+
+ DocSequenceBuilder(OpMsgBuilder* msgBuilder, BufBuilder* buf, int sizeOffset)
+ : _buf(buf), _msgBuilder(msgBuilder), _sizeOffset(sizeOffset) {}
+
+ BufBuilder* _buf;
+ OpMsgBuilder* const _msgBuilder;
+ const int _sizeOffset;
+};
+
+} // namespace mongo