summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2016-06-06 14:08:29 -0400
committerMathias Stearn <mathias@10gen.com>2016-06-22 16:04:36 -0400
commite508ddcb51eec941ae50d9c2efb06b601811dc19 (patch)
treecab51e8665c29d8220b64e7f69a8bbc592ca5339
parent40f20eca105a5e06a72df583ac654f946e9b058e (diff)
downloadmongo-e508ddcb51eec941ae50d9c2efb06b601811dc19.tar.gz
SERVER-24418 Make Message and BufBuilder use SharedBuffer for memory management
This makes it possible to get owned BSONObj out of a Message without copying. Hooked up to the following places: - Anything using the Fetcher (including oplog fetching on secondaries) - Anything using DBClientInterface::findOne() - Anything using CursorResponse (including Sharded queries) As a simplification, Messages no longer support non-contiguous buffers, or non-owning buffers. The former wasn't used by anything, and the latter was only used by mongosniff only for messages that fit in a single packet.
-rw-r--r--src/mongo/bson/bson_obj_test.cpp14
-rw-r--r--src/mongo/bson/bsonobj.cpp6
-rw-r--r--src/mongo/bson/bsonobj.h39
-rw-r--r--src/mongo/bson/bsonobjbuilder.h24
-rw-r--r--src/mongo/bson/util/builder.h132
-rw-r--r--src/mongo/client/dbclient.cpp2
-rw-r--r--src/mongo/client/dbclientcursor.h5
-rw-r--r--src/mongo/client/fetcher.cpp7
-rw-r--r--src/mongo/db/db.cpp8
-rw-r--r--src/mongo/db/dbdirectclient.cpp3
-rw-r--r--src/mongo/db/dbmessage.cpp3
-rw-r--r--src/mongo/db/instance.cpp10
-rw-r--r--src/mongo/db/query/cursor_response.cpp8
-rw-r--r--src/mongo/db/query/find.cpp28
-rw-r--r--src/mongo/db/query/find.h13
-rw-r--r--src/mongo/executor/async_mock_stream_factory.cpp3
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp2
-rw-r--r--src/mongo/executor/network_interface_asio_test.cpp4
-rw-r--r--src/mongo/executor/remote_command_response.h26
-rw-r--r--src/mongo/rpc/command_reply.cpp2
-rw-r--r--src/mongo/rpc/command_reply_builder.cpp3
-rw-r--r--src/mongo/rpc/command_request_builder.cpp3
-rw-r--r--src/mongo/rpc/legacy_reply_builder.cpp3
-rw-r--r--src/mongo/rpc/legacy_request_builder.cpp3
-rw-r--r--src/mongo/s/query/router_exec_stage.h4
-rw-r--r--src/mongo/tools/sniffer.cpp15
-rw-r--r--src/mongo/transport/service_entry_point_mock.cpp3
-rw-r--r--src/mongo/transport/service_entry_point_mock.h2
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp3
-rw-r--r--src/mongo/util/duration.cpp24
-rw-r--r--src/mongo/util/net/asio_message_port.cpp12
-rw-r--r--src/mongo/util/net/message.h147
-rw-r--r--src/mongo/util/net/message_port.cpp11
-rw-r--r--src/mongo/util/shared_buffer.h92
34 files changed, 328 insertions, 336 deletions
diff --git a/src/mongo/bson/bson_obj_test.cpp b/src/mongo/bson/bson_obj_test.cpp
index 843dfb13829..f59f74ebd82 100644
--- a/src/mongo/bson/bson_obj_test.cpp
+++ b/src/mongo/bson/bson_obj_test.cpp
@@ -540,5 +540,19 @@ TEST(BSONObj, getFieldsWithDuplicates) {
ASSERT_EQUALS(fields[1].str(), "3");
}
+TEST(BSONObj, ShareOwnershipWith) {
+ BSONObj obj;
+ {
+ BSONObj tmp = BSON("sub" << BSON("a" << 1));
+ obj = tmp["sub"].Obj();
+ obj.shareOwnershipWith(tmp);
+ ASSERT(obj.isOwned());
+ }
+
+ // Now that tmp is out of scope, if obj didn't retain ownership, it would be accessing free'd
+ // memory which should error on ASAN and debug builds.
+ ASSERT(obj.isOwned());
+ ASSERT_EQ(obj, BSON("a" << 1));
+}
} // unnamed namespace
diff --git a/src/mongo/bson/bsonobj.cpp b/src/mongo/bson/bsonobj.cpp
index d9ba4143607..30b98e08525 100644
--- a/src/mongo/bson/bsonobj.cpp
+++ b/src/mongo/bson/bsonobj.cpp
@@ -76,9 +76,9 @@ void BSONObj::_assertInvalid() const {
}
BSONObj BSONObj::copy() const {
- char* storage = static_cast<char*>(mongoMalloc(sizeof(Holder) + objsize()));
- memcpy(storage + sizeof(Holder), objdata(), objsize());
- return BSONObj::takeOwnership(storage);
+ auto storage = SharedBuffer::allocate(objsize());
+ memcpy(storage.get(), objdata(), objsize());
+ return BSONObj(std::move(storage));
}
BSONObj BSONObj::getOwned() const {
diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h
index acd103edf2e..d7a801eb966 100644
--- a/src/mongo/bson/bsonobj.h
+++ b/src/mongo/bson/bsonobj.h
@@ -114,7 +114,7 @@ public:
init(bsonData);
}
- explicit BSONObj(SharedBuffer ownedBuffer)
+ explicit BSONObj(ConstSharedBuffer ownedBuffer)
: _objdata(ownedBuffer.get() ? ownedBuffer.get() : BSONObj().objdata()),
_ownedBuffer(std::move(ownedBuffer)) {}
@@ -175,7 +175,27 @@ public:
@return true if this is in owned mode
*/
bool isOwned() const {
- return _ownedBuffer.get() != 0;
+ return bool(_ownedBuffer);
+ }
+
+ /**
+ * Share ownership with another object.
+ *
+ * It is the callers responsibility to ensure that the other object is owned and contains the
+ * data this BSONObj is viewing. This can happen if this is a subobject or sibling object
+ * contained in a larger buffer.
+ */
+ void shareOwnershipWith(ConstSharedBuffer buffer) {
+ invariant(buffer);
+ _ownedBuffer = buffer;
+ }
+ void shareOwnershipWith(const BSONObj& other) {
+ shareOwnershipWith(other.sharedBuffer());
+ }
+
+ ConstSharedBuffer sharedBuffer() const {
+ invariant(isOwned());
+ return _ownedBuffer;
}
/** If the data buffer is under the control of this BSONObj, return it.
@@ -540,19 +560,6 @@ public:
template <typename T>
bool coerceVector(std::vector<T>* out) const;
- typedef SharedBuffer::Holder Holder;
-
- /** Given a pointer to a region of un-owned memory containing BSON data, prefixed by
- * sufficient space for a BSONObj::Holder object, return a BSONObj that owns the
- * memory.
- *
- * This class will call free(holderPrefixedData), so it must have been allocated in a way
- * that makes that valid.
- */
- static BSONObj takeOwnership(char* holderPrefixedData) {
- return BSONObj(SharedBuffer::takeOwnership(holderPrefixedData));
- }
-
/// members for Sorter
struct SorterDeserializeSettings {}; // unused
void serializeForSorter(BufBuilder& buf) const {
@@ -586,7 +593,7 @@ private:
Status _okForStorage(bool root, bool deep) const;
const char* _objdata;
- SharedBuffer _ownedBuffer;
+ ConstSharedBuffer _ownedBuffer;
};
std::ostream& operator<<(std::ostream& s, const BSONObj& o);
diff --git a/src/mongo/bson/bsonobjbuilder.h b/src/mongo/bson/bsonobjbuilder.h
index 10b8a93b80e..224deb1ee6d 100644
--- a/src/mongo/bson/bsonobjbuilder.h
+++ b/src/mongo/bson/bsonobjbuilder.h
@@ -65,15 +65,8 @@ class BSONObjBuilder {
public:
/** @param initsize this is just a hint as to the final size of the object */
BSONObjBuilder(int initsize = 512)
- : _b(_buf),
- _buf(sizeof(BSONObj::Holder) + initsize),
- _offset(sizeof(BSONObj::Holder)),
- _s(this),
- _tracker(0),
- _doneCalled(false) {
- // Skip over space for a holder object at the beginning of the buffer, followed by
- // space for the object length. The length is filled in by _done.
- _b.skip(sizeof(BSONObj::Holder));
+ : _b(_buf), _buf(initsize), _offset(0), _s(this), _tracker(0), _doneCalled(false) {
+ // Skip over space for the object length. The length is filled in by _done.
_b.skip(sizeof(int));
// Reserve space for the EOO byte. This means _done() can't fail.
@@ -119,13 +112,12 @@ public:
BSONObjBuilder(const BSONSizeTracker& tracker)
: _b(_buf),
- _buf(sizeof(BSONObj::Holder) + tracker.getSize()),
- _offset(sizeof(BSONObj::Holder)),
+ _buf(tracker.getSize()),
+ _offset(0),
_s(this),
_tracker(const_cast<BSONSizeTracker*>(&tracker)),
_doneCalled(false) {
// See the comments in the first constructor for details.
- _b.skip(sizeof(BSONObj::Holder));
_b.skip(sizeof(int));
// Reserve space for the EOO byte. This means _done() can't fail.
@@ -639,9 +631,7 @@ public:
BSONObj obj() {
massert(10335, "builder does not own memory", owned());
doneFast();
- char* buf = _b.buf();
- decouple();
- return BSONObj::takeOwnership(buf);
+ return BSONObj(_b.release());
}
/** Fetch the object we have built.
@@ -681,10 +671,6 @@ public:
_doneCalled = true;
}
- void decouple() {
- _b.decouple(); // post done() call version. be sure jsobj frees...
- }
-
void appendKeys(const BSONObj& keyPattern, const BSONObj& values);
static std::string numStr(int i) {
diff --git a/src/mongo/bson/util/builder.h b/src/mongo/bson/util/builder.h
index 53ad7d25ebe..2cce0d93ad5 100644
--- a/src/mongo/bson/util/builder.h
+++ b/src/mongo/bson/util/builder.h
@@ -39,12 +39,14 @@
#include "mongo/base/data_type_endian.h"
#include "mongo/base/data_view.h"
+#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/inline_decls.h"
#include "mongo/platform/decimal128.h"
#include "mongo/stdx/type_traits.h"
#include "mongo/util/allocator.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/shared_buffer.h"
namespace mongo {
@@ -69,63 +71,80 @@ const int BufferMaxSize = 64 * 1024 * 1024;
template <typename Allocator>
class StringBuilderImpl;
-class TrivialAllocator {
+class SharedBufferAllocator {
+ MONGO_DISALLOW_COPYING(SharedBufferAllocator);
+
public:
- void* Malloc(size_t sz) {
- return mongoMalloc(sz);
+ SharedBufferAllocator() = default;
+
+ void malloc(size_t sz) {
+ _buf = SharedBuffer::allocate(sz);
+ }
+ void realloc(size_t sz) {
+ _buf.realloc(sz);
+ }
+ void free() {
+ _buf = {};
}
- void* Realloc(void* p, size_t sz) {
- return mongoRealloc(p, sz);
+ SharedBuffer release() {
+ return std::move(_buf);
}
- void Free(void* p) {
- free(p);
+
+ char* get() const {
+ return _buf.get();
}
+
+private:
+ SharedBuffer _buf;
};
class StackAllocator {
+ MONGO_DISALLOW_COPYING(StackAllocator);
+
public:
+ StackAllocator() = default;
+
enum { SZ = 512 };
- void* Malloc(size_t sz) {
- if (sz <= SZ)
- return buf;
- return mongoMalloc(sz);
- }
- void* Realloc(void* p, size_t sz) {
- if (p == buf) {
- if (sz <= SZ)
- return buf;
- void* d = mongoMalloc(sz);
- if (d == 0)
- msgasserted(15912, "out of memory StackAllocator::Realloc");
- memcpy(d, p, SZ);
- return d;
+ void malloc(size_t sz) {
+ if (sz > SZ)
+ _ptr = mongoMalloc(sz);
+ }
+ void realloc(size_t sz) {
+ if (_ptr == _buf) {
+ if (sz > SZ) {
+ _ptr = mongoMalloc(sz);
+ memcpy(_ptr, _buf, SZ);
+ }
+ } else {
+ _ptr = mongoRealloc(_ptr, sz);
}
- return mongoRealloc(p, sz);
}
- void Free(void* p) {
- if (p != buf)
- free(p);
+ void free() {
+ if (_ptr != _buf)
+ ::free(_ptr);
+ _ptr = _buf;
+ }
+
+ // Not supported on this allocator.
+ void release() = delete;
+
+ char* get() const {
+ return static_cast<char*>(_ptr);
}
private:
- char buf[SZ];
+ char _buf[SZ];
+ void* _ptr = _buf;
};
-template <class Allocator>
+template <class BufferAllocator>
class _BufBuilder {
- // non-copyable, non-assignable
- _BufBuilder(const _BufBuilder&);
- _BufBuilder& operator=(const _BufBuilder&);
- Allocator al;
+ MONGO_DISALLOW_COPYING(_BufBuilder);
public:
_BufBuilder(int initsize = 512) : size(initsize) {
if (size > 0) {
- data = (char*)al.Malloc(size);
- if (data == 0)
- msgasserted(10000, "out of memory BufBuilder");
- } else {
- data = 0;
+ _buf.malloc(size);
}
l = 0;
reservedBytes = 0;
@@ -135,10 +154,7 @@ public:
}
void kill() {
- if (data) {
- al.Free(data);
- data = 0;
- }
+ _buf.free();
}
void reset() {
@@ -149,10 +165,8 @@ public:
l = 0;
reservedBytes = 0;
if (maxSize && size > maxSize) {
- al.Free(data);
- data = (char*)al.Malloc(maxSize);
- if (data == 0)
- msgasserted(15913, "out of memory BufBuilder::reset");
+ _buf.free();
+ _buf.malloc(maxSize);
size = maxSize;
}
}
@@ -167,15 +181,15 @@ public:
/* note this may be deallocated (realloced) if you keep writing. */
char* buf() {
- return data;
+ return _buf.get();
}
const char* buf() const {
- return data;
+ return _buf.get();
}
- /* assume ownership of the buffer - you must then free() it */
- void decouple() {
- data = 0;
+ /* assume ownership of the buffer */
+ SharedBuffer release() {
+ return _buf.release();
}
void appendUChar(unsigned char j) {
@@ -268,7 +282,7 @@ public:
grow_reallocate(minSize);
}
l = newLen;
- return data + oldlen;
+ return _buf.get() + oldlen;
}
/**
@@ -313,24 +327,22 @@ private:
ss << "BufBuilder attempted to grow() to " << a << " bytes, past the 64MB limit.";
msgasserted(13548, ss.str().c_str());
}
- data = (char*)al.Realloc(data, a);
- if (data == NULL)
- msgasserted(16070, "out of memory BufBuilder::grow_reallocate");
+ _buf.realloc(a);
size = a;
}
- char* data;
+ BufferAllocator _buf;
int l;
int size;
int reservedBytes; // eagerly grow_reallocate to keep this many bytes of spare room.
- friend class StringBuilderImpl<Allocator>;
+ friend class StringBuilderImpl<BufferAllocator>;
};
-typedef _BufBuilder<TrivialAllocator> BufBuilder;
+typedef _BufBuilder<SharedBufferAllocator> BufBuilder;
/** The StackBufBuilder builds smaller datasets on the stack instead of using malloc.
- this can be significantly faster for small bufs. However, you can not decouple() the
+ this can be significantly faster for small bufs. However, you can not release() the
buffer with StackBufBuilder.
While designed to be a variable on the stack, if you were to dynamically allocate one,
nothing bad would happen. In fact in some circumstances this might make sense, say,
@@ -339,7 +351,7 @@ typedef _BufBuilder<TrivialAllocator> BufBuilder;
class StackBufBuilder : public _BufBuilder<StackAllocator> {
public:
StackBufBuilder() : _BufBuilder<StackAllocator>(StackAllocator::SZ) {}
- void decouple(); // not allowed. not implemented.
+ void release() = delete; // not allowed. not implemented.
};
/** std::stringstream deals with locale so this is a lot faster than std::stringstream for UTF8 */
@@ -426,7 +438,7 @@ public:
}
std::string str() const {
- return std::string(_buf.data, _buf.l);
+ return std::string(_buf.buf(), _buf.l);
}
/** size of current std::string */
@@ -452,6 +464,6 @@ private:
}
};
-typedef StringBuilderImpl<TrivialAllocator> StringBuilder;
+typedef StringBuilderImpl<SharedBufferAllocator> StringBuilder;
typedef StringBuilderImpl<StackAllocator> StackStringBuilder;
} // namespace mongo
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index fc9f4fd36d6..49b782dd2de 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -712,7 +712,7 @@ void DBClientInterface::findN(vector<BSONObj>& out,
for (int i = 0; i < nToReturn; i++) {
if (!c->more())
break;
- out.push_back(c->nextSafe().copy());
+ out.push_back(c->nextSafeOwned());
}
}
diff --git a/src/mongo/client/dbclientcursor.h b/src/mongo/client/dbclientcursor.h
index 7363cdb1453..dd3320550af 100644
--- a/src/mongo/client/dbclientcursor.h
+++ b/src/mongo/client/dbclientcursor.h
@@ -97,6 +97,11 @@ public:
/** throws AssertionException if get back { $err : ... } */
BSONObj nextSafe();
+ BSONObj nextSafeOwned() {
+ BSONObj out = nextSafe();
+ out.shareOwnershipWith(batch.m.sharedBuffer());
+ return out;
+ }
/** peek ahead at items buffered for future next() calls.
never requests new data from the server. so peek only effective
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index 57fef47f068..b042a6dc0e9 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -62,6 +62,7 @@ const char* kNextBatchFieldName = "nextBatch";
Status parseCursorResponse(const BSONObj& obj,
const std::string& batchFieldName,
Fetcher::QueryResponse* batchData) {
+ invariant(obj.isOwned());
invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName);
invariant(batchData);
@@ -149,7 +150,11 @@ Status parseCursorResponse(const BSONObj& obj,
<< "' field: "
<< obj);
}
- batchData->documents.push_back(itemElement.Obj().getOwned());
+ batchData->documents.push_back(itemElement.Obj());
+ }
+
+ for (auto& doc : batchData->documents) {
+ doc.shareOwnershipWith(obj);
}
return Status::OK();
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 74f941a4dd0..0fafccb9649 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -202,7 +202,7 @@ public:
string ns = dbresponse.exhaustNS; // before reset() free's it...
m.reset();
BufBuilder b(512);
- b.appendNum((int)0 /*size set later in appendData()*/);
+ b.appendNum((int)0 /*size set later*/);
b.appendNum(header.getId());
b.appendNum(header.getResponseToMsgId());
b.appendNum((int)dbGetMore);
@@ -210,8 +210,10 @@ public:
b.appendStr(ns);
b.appendNum((int)0); // ntoreturn
b.appendNum(cursorid);
- m.appendData(b.buf(), b.len());
- b.decouple();
+
+ MsgData::View header = b.buf();
+ header.setLen(b.len());
+ m.setData(b.release());
DEV log() << "exhaust=true sending more";
continue; // this goes back to top loop
}
diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp
index 763b6e3dde2..06c06991722 100644
--- a/src/mongo/db/dbdirectclient.cpp
+++ b/src/mongo/db/dbdirectclient.cpp
@@ -126,9 +126,6 @@ bool DBDirectClient::call(Message& toSend, Message& response, bool assertOk, str
CurOp curOp(_txn);
assembleResponse(_txn, toSend, dbResponse, dummyHost);
verify(!dbResponse.response.empty());
-
- // can get rid of this if we make response handling smarter
- dbResponse.response.concat();
response = std::move(dbResponse.response);
return true;
diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp
index 61a4d1509ac..67a5548dbfb 100644
--- a/src/mongo/db/dbmessage.cpp
+++ b/src/mongo/db/dbmessage.cpp
@@ -198,8 +198,7 @@ void OpQueryReplyBuilder::putInMessage(
qr.setCursorId(cursorId);
qr.setStartingFrom(startingFrom);
qr.setNReturned(nReturned);
- _buffer.decouple();
- out->setData(qr.view2ptr(), true); // transport will free
+ out->setData(_buffer.release()); // transport will free
}
void replyToQuery(int queryResultFlags,
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 3961beae740..d3d1a1c34de 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -174,7 +174,6 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception,
// TODO: call replyToQuery() from here instead of this!!! see dbmessage.h
QueryResult::View msgdata = bb.buf();
- bb.decouple();
QueryResult::View qr = msgdata;
qr.setResultFlags(ResultFlag_ErrSet);
if (scex)
@@ -184,7 +183,7 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception,
qr.setCursorId(0);
qr.setStartingFrom(0);
qr.setNReturned(1);
- response->setData(msgdata.view2ptr(), true);
+ response->setData(bb.release());
}
/**
@@ -469,7 +468,6 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m,
}
bool exhaust = false;
- QueryResult::View msgdata = 0;
bool isCursorAuthorized = false;
try {
@@ -487,7 +485,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m,
sleepmillis(0);
}
- msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
+ dbresponse.response = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
} catch (AssertionException& e) {
if (isCursorAuthorized) {
// If a cursor with id 'cursorid' was authorized, it may have been advanced
@@ -510,9 +508,9 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m,
return false;
}
- dbresponse.response.setData(msgdata.view2ptr(), true);
curop.debug().responseLength = dbresponse.response.header().dataLen();
- curop.debug().nreturned = msgdata.getNReturned();
+ auto queryResult = QueryResult::ConstView(dbresponse.response.buf());
+ curop.debug().nreturned = queryResult.getNReturned();
dbresponse.responseToMsgId = m.header().getId();
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 33a1661ea0b..7514adfa175 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -166,10 +166,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
<< elt};
}
- batch.push_back(elt.Obj().getOwned());
+ batch.push_back(elt.Obj());
}
- return {{NamespaceString(fullns), cursorId, batch}};
+ for (auto& doc : batch) {
+ doc.shareOwnershipWith(cmdResponse);
+ }
+
+ return {{NamespaceString(fullns), cursorId, std::move(batch)}};
}
void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 6e501242cee..e5b6646dae6 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -223,12 +223,12 @@ void generateBatch(int ntoreturn,
/**
* Called by db/instance.cpp. This is the getMore entry point.
*/
-QueryResult::View getMore(OperationContext* txn,
- const char* ns,
- int ntoreturn,
- long long cursorid,
- bool* exhaust,
- bool* isCursorAuthorized) {
+Message getMore(OperationContext* txn,
+ const char* ns,
+ int ntoreturn,
+ long long cursorid,
+ bool* exhaust,
+ bool* isCursorAuthorized) {
invariant(ntoreturn >= 0);
CurOp& curOp = *CurOp::get(txn);
@@ -485,9 +485,8 @@ QueryResult::View getMore(OperationContext* txn,
qr.setCursorId(cursorid);
qr.setStartingFrom(startingResult);
qr.setNReturned(numResults);
- bb.decouple();
LOG(5) << "getMore returned " << numResults << " results\n";
- return qr;
+ return Message(bb.release());
}
std::string runQuery(OperationContext* txn,
@@ -543,7 +542,6 @@ std::string runQuery(OperationContext* txn,
// Set query result fields.
QueryResult::View qr = bb.buf();
- bb.decouple();
qr.setResultFlagsToOk();
qr.msgdata().setLen(bb.len());
curOp.debug().responseLength = bb.len();
@@ -551,7 +549,7 @@ std::string runQuery(OperationContext* txn,
qr.setCursorId(0);
qr.setStartingFrom(0);
qr.setNReturned(1);
- result.setData(qr.view2ptr(), true);
+ result.setData(bb.release());
return "";
}
@@ -686,18 +684,18 @@ std::string runQuery(OperationContext* txn,
endQueryOp(txn, collection, *exec, numResults, ccId);
}
- // Add the results from the query into the output buffer.
- result.appendData(bb.buf(), bb.len());
- bb.decouple();
-
// Fill out the output buffer's header.
- QueryResult::View queryResultView = result.header().view2ptr();
+ QueryResult::View queryResultView = bb.buf();
queryResultView.setCursorId(ccId);
queryResultView.setResultFlagsToOk();
+ queryResultView.msgdata().setLen(bb.len());
queryResultView.msgdata().setOperation(opReply);
queryResultView.setStartingFrom(0);
queryResultView.setNReturned(numResults);
+ // Add the results from the query into the output buffer.
+ result.setData(bb.release());
+
// curOp.debug().exhaust is set above.
return curOp.debug().exhaust ? nss.ns() : "";
}
diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h
index 4b968e58419..e6c8160b5cb 100644
--- a/src/mongo/db/query/find.h
+++ b/src/mongo/db/query/find.h
@@ -109,13 +109,14 @@ StatusWith<std::unique_ptr<PlanExecutor>> getOplogStartHack(OperationContext* tx
/**
* Called from the getMore entry point in ops/query.cpp.
+ * Returned buffer is the message to return to the client.
*/
-QueryResult::View getMore(OperationContext* txn,
- const char* ns,
- int ntoreturn,
- long long cursorid,
- bool* exhaust,
- bool* isCursorAuthorized);
+Message getMore(OperationContext* txn,
+ const char* ns,
+ int ntoreturn,
+ long long cursorid,
+ bool* exhaust,
+ bool* isCursorAuthorized);
/**
* Run the query 'q' and place the result in 'result'.
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
index c2fee263c60..ddfd61cf6f8 100644
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ b/src/mongo/executor/async_mock_stream_factory.cpp
@@ -280,7 +280,8 @@ void AsyncMockStreamFactory::MockStream::simulateServer(
WriteEvent write{this};
std::vector<uint8_t> messageData = popWrite();
- Message msg(messageData.data(), false);
+ Message msg(SharedBuffer::allocate(messageData.size()));
+ memcpy(msg.buf(), messageData.data(), messageData.size());
auto parsedRequest = rpc::makeRequest(&msg);
ASSERT(parsedRequest->getProtocol() == proto);
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index f6812fd9c5a..10cbcda7115 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -115,7 +115,7 @@ void asyncRecvMessageBody(AsyncStreamInterface& stream,
int z = (len + 1023) & 0xfffffc00;
invariant(z >= len);
- m->setData(reinterpret_cast<char*>(mongoMalloc(z)), true);
+ m->setData(SharedBuffer::allocate(z));
MsgData::View mdView = m->buf();
// copy header data into master buffer
diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp
index 2be37ef9779..bf96e029865 100644
--- a/src/mongo/executor/network_interface_asio_test.cpp
+++ b/src/mongo/executor/network_interface_asio_test.cpp
@@ -469,8 +469,8 @@ public:
// Get the appropriate message id
WriteEvent write{stream};
std::vector<uint8_t> messageData = stream->popWrite();
- Message msg(messageData.data(), false);
- messageId = msg.header().getId();
+ messageId =
+ MsgData::ConstView(reinterpret_cast<const char*>(messageData.data())).getId();
}
// Build a mock reply message
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index 3e997e448de..742daea76c1 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -52,16 +52,28 @@ struct RemoteCommandResponse {
RemoteCommandResponse() = default;
RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj, Milliseconds millis)
- : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) {}
-
- RemoteCommandResponse(Message message,
+ : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) {
+ // The buffer backing the default empty BSONObj has static duration so it is effectively
+ // owned.
+ invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
+ invariant(metadata.isOwned() || metadata.objdata() == BSONObj().objdata());
+ }
+
+ RemoteCommandResponse(Message messageArg,
BSONObj dataObj,
BSONObj metadataObj,
Milliseconds millis)
- : message(std::make_shared<const Message>(std::move(message))),
+ : message(std::make_shared<const Message>(std::move(messageArg))),
data(std::move(dataObj)),
metadata(std::move(metadataObj)),
- elapsedMillis(millis) {}
+ elapsedMillis(millis) {
+ if (!data.isOwned()) {
+ data.shareOwnershipWith(message->sharedBuffer());
+ }
+ if (!metadata.isOwned()) {
+ metadata.shareOwnershipWith(message->sharedBuffer());
+ }
+ }
RemoteCommandResponse(const rpc::ReplyInterface& rpcReply, Milliseconds millis);
@@ -71,8 +83,8 @@ struct RemoteCommandResponse {
bool operator!=(const RemoteCommandResponse& rhs) const;
std::shared_ptr<const Message> message; // May be null.
- BSONObj data; // Either owned or points into message.
- BSONObj metadata; // Either owned or points into message.
+ BSONObj data; // Always owned. May point into message.
+ BSONObj metadata; // Always owned. May point into message.
Milliseconds elapsedMillis = {};
};
diff --git a/src/mongo/rpc/command_reply.cpp b/src/mongo/rpc/command_reply.cpp
index b6d3a98586f..172e7ac1ebe 100644
--- a/src/mongo/rpc/command_reply.cpp
+++ b/src/mongo/rpc/command_reply.cpp
@@ -53,7 +53,9 @@ CommandReply::CommandReply(const Message* message) : _message(message) {
ConstDataRangeCursor cur(begin, messageEnd);
_commandReply = uassertStatusOK(cur.readAndAdvance<Validated<BSONObj>>()).val;
+ _commandReply.shareOwnershipWith(message->sharedBuffer());
_metadata = uassertStatusOK(cur.readAndAdvance<Validated<BSONObj>>()).val;
+ _metadata.shareOwnershipWith(message->sharedBuffer());
_outputDocs = DocumentRange(cur.data(), messageEnd);
}
diff --git a/src/mongo/rpc/command_reply_builder.cpp b/src/mongo/rpc/command_reply_builder.cpp
index 15b70123708..f5b48da87f9 100644
--- a/src/mongo/rpc/command_reply_builder.cpp
+++ b/src/mongo/rpc/command_reply_builder.cpp
@@ -110,8 +110,7 @@ Message CommandReplyBuilder::done() {
MsgData::View msg = _builder.buf();
msg.setLen(_builder.len());
msg.setOperation(dbCommandReply);
- _builder.decouple(); // release ownership from BufBuilder
- _message.setData(msg.view2ptr(), true); // transfer ownership to Message
+ _message.setData(_builder.release());
_state = State::kDone;
return std::move(_message);
}
diff --git a/src/mongo/rpc/command_request_builder.cpp b/src/mongo/rpc/command_request_builder.cpp
index 3945b6219a4..c78e493df05 100644
--- a/src/mongo/rpc/command_request_builder.cpp
+++ b/src/mongo/rpc/command_request_builder.cpp
@@ -101,8 +101,7 @@ Message CommandRequestBuilder::done() {
MsgData::View msg = _builder.buf();
msg.setLen(_builder.len());
msg.setOperation(dbCommand);
- _builder.decouple(); // release ownership from BufBuilder.
- _message.setData(msg.view2ptr(), true); // transfer ownership to Message.
+ _message.setData(_builder.release()); // transfer ownership to Message.
_state = State::kDone;
return std::move(_message);
}
diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp
index 393c3bb3519..5880121ae66 100644
--- a/src/mongo/rpc/legacy_reply_builder.cpp
+++ b/src/mongo/rpc/legacy_reply_builder.cpp
@@ -158,8 +158,7 @@ Message LegacyReplyBuilder::done() {
qr.setStartingFrom(0);
qr.setNReturned(1);
- _message.setData(qr.view2ptr(), true);
- _builder.decouple();
+ _message.setData(_builder.release());
_state = State::kDone;
return std::move(_message);
diff --git a/src/mongo/rpc/legacy_request_builder.cpp b/src/mongo/rpc/legacy_request_builder.cpp
index c2155537611..4e126f6117f 100644
--- a/src/mongo/rpc/legacy_request_builder.cpp
+++ b/src/mongo/rpc/legacy_request_builder.cpp
@@ -114,8 +114,7 @@ Message LegacyRequestBuilder::done() {
MsgData::View msg = _builder.buf();
msg.setLen(_builder.len());
msg.setOperation(dbQuery);
- _builder.decouple(); // release ownership from BufBuilder
- _message.setData(msg.view2ptr(), true); // transfer ownership to Message
+ _message.setData(_builder.release());
_state = State::kDone;
return std::move(_message);
}
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index deb4bf34f9f..1f2dbdf6c7c 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -58,6 +58,10 @@ public:
* Returns the next query result, or an error.
*
* If there are no more results, returns boost::none.
+ *
+ * All returned BSONObjs are owned. They may own a buffer larger than the object. If you are
+ * holding on to a subset of the returned results and need to minimize memory usage, call copy()
+ * on the BSONObjs.
*/
virtual StatusWith<boost::optional<BSONObj>> next() = 0;
diff --git a/src/mongo/tools/sniffer.cpp b/src/mongo/tools/sniffer.cpp
index 255d41c1394..b5081a361ed 100644
--- a/src/mongo/tools/sniffer.cpp
+++ b/src/mongo/tools/sniffer.cpp
@@ -82,6 +82,7 @@ using mongo::BufBuilder;
using mongo::DBClientConnection;
using mongo::MemoryMappedFile;
using std::string;
+namespace MsgData = mongo::MsgData;
#define SNAP_LEN 65535
@@ -176,6 +177,13 @@ map<Connection, map<long long, long long>> mapCursor;
void processMessage(Connection& c, Message& d);
+Message copyToMessage(const char* source) {
+ auto msgLen = MsgData::ConstView(source).getLen();
+ auto msgData = mongo::SharedBuffer::allocate(msgLen);
+ memcpy(msgData.get(), source, msgLen);
+ return Message(std::move(msgData));
+}
+
void got_packet(u_char* args, const struct pcap_pkthdr* header, const u_char* packet) {
const struct sniff_ip* ip = (struct sniff_ip*)(packet + captureHeaderSize);
int size_ip = IP_HL(ip) * 4;
@@ -225,7 +233,7 @@ void got_packet(u_char* args, const struct pcap_pkthdr* header, const u_char* pa
Message m;
if (bytesRemainingInMessage[c] == 0) {
- m.setData(const_cast<char*>(reinterpret_cast<const char*>(payload)), false);
+ m = copyToMessage(reinterpret_cast<const char*>(payload));
if (!m.header().valid()) {
cerr << "Invalid message start, skipping packet." << endl;
return;
@@ -251,8 +259,7 @@ void got_packet(u_char* args, const struct pcap_pkthdr* header, const u_char* pa
}
if (bytesRemainingInMessage[c] > 0)
return;
- m.setData(messageBuilder[c]->buf(), true);
- messageBuilder[c]->decouple();
+ m.setData(messageBuilder[c]->release());
messageBuilder[c].reset();
}
@@ -454,7 +461,7 @@ void processDiagLog(const char* file) {
long read = 0;
while (read < length) {
- Message m(pos, false);
+ Message m = copyToMessage(pos);
int len = m.header().getLen();
DbMessage d(m);
cout << len << " " << d.getns() << endl;
diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp
index 778bfcc3758..6cfe3606043 100644
--- a/src/mongo/transport/service_entry_point_mock.cpp
+++ b/src/mongo/transport/service_entry_point_mock.cpp
@@ -65,8 +65,7 @@ void setOkResponse(Message* m) {
// Set the message, transfer buffer ownership to Message
m->reset();
- m->setData(msg.view2ptr(), true);
- b.decouple();
+ m->setData(b.release());
}
} // namespace
diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h
index 582de6178d1..ceeca2a0985 100644
--- a/src/mongo/transport/service_entry_point_mock.h
+++ b/src/mongo/transport/service_entry_point_mock.h
@@ -28,6 +28,8 @@
#pragma once
+#include <vector>
+
#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
index d155d3b24b5..e4f93ae3344 100644
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ b/src/mongo/transport/service_entry_point_test_suite.cpp
@@ -86,8 +86,7 @@ void setPingCommand(Message* m) {
m->reset();
// Transfer buffer ownership to the Message.
- m->setData(msg.view2ptr(), true);
- b.decouple();
+ m->setData(b.release());
}
// Some default method implementations
diff --git a/src/mongo/util/duration.cpp b/src/mongo/util/duration.cpp
index e3a29e858dc..9d7137ab259 100644
--- a/src/mongo/util/duration.cpp
+++ b/src/mongo/util/duration.cpp
@@ -129,16 +129,16 @@ template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAl
template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAllocator>&, Seconds);
template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAllocator>&, Minutes);
template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAllocator>&, Hours);
-template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&,
- Nanoseconds);
-template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&,
- Microseconds);
-template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&,
- Milliseconds);
-template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&,
- Seconds);
-template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&,
- Minutes);
-template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&,
- Hours);
+template StringBuilderImpl<SharedBufferAllocator>& operator<<(
+ StringBuilderImpl<SharedBufferAllocator>&, Nanoseconds);
+template StringBuilderImpl<SharedBufferAllocator>& operator<<(
+ StringBuilderImpl<SharedBufferAllocator>&, Microseconds);
+template StringBuilderImpl<SharedBufferAllocator>& operator<<(
+ StringBuilderImpl<SharedBufferAllocator>&, Milliseconds);
+template StringBuilderImpl<SharedBufferAllocator>& operator<<(
+ StringBuilderImpl<SharedBufferAllocator>&, Seconds);
+template StringBuilderImpl<SharedBufferAllocator>& operator<<(
+ StringBuilderImpl<SharedBufferAllocator>&, Minutes);
+template StringBuilderImpl<SharedBufferAllocator>& operator<<(
+ StringBuilderImpl<SharedBufferAllocator>&, Hours);
} // namespace mongo
diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp
index 9ba852003dc..542bfab282c 100644
--- a/src/mongo/util/net/asio_message_port.cpp
+++ b/src/mongo/util/net/asio_message_port.cpp
@@ -303,8 +303,8 @@ bool ASIOMessagingPort::recv(Message& m) {
if (getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->shouldFail()) {
throw SocketException(SocketException::RECV_ERROR, "fail point set");
}
- MsgData::View md = reinterpret_cast<char*>(mongoMalloc(kInitialMessageSize));
- ScopeGuard guard = MakeGuard([&md]() { free(md.view2ptr()); });
+ SharedBuffer buf = SharedBuffer::allocate(kInitialMessageSize);
+ MsgData::View md = buf.get();
asio::error_code ec = _read(md.view2ptr(), kHeaderLen);
if (ec) {
@@ -378,7 +378,8 @@ bool ASIOMessagingPort::recv(Message& m) {
}
if (msgLen > kInitialMessageSize) {
- md = reinterpret_cast<char*>(mongoRealloc(md.view2ptr(), msgLen));
+ buf.realloc(msgLen);
+ md = buf.get();
}
ec = _read(md.data(), msgLen - kHeaderLen);
@@ -386,8 +387,7 @@ bool ASIOMessagingPort::recv(Message& m) {
throw asio::system_error(ec);
}
- guard.Dismiss();
- m.setData(md.view2ptr(), true);
+ m.setData(std::move(buf));
return true;
} catch (const asio::system_error& e) {
@@ -428,8 +428,6 @@ void ASIOMessagingPort::say(Message& toSend, int responseTo) {
auto buf = toSend.buf();
if (buf) {
send(buf, MsgData::ConstView(buf).getLen(), nullptr);
- } else {
- send(toSend.dataBuffers(), nullptr);
}
}
diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h
index 2c28aabe5cf..37a6c0bb66b 100644
--- a/src/mongo/util/net/message.h
+++ b/src/mongo/util/net/message.h
@@ -30,7 +30,6 @@
#pragma once
#include <cstdint>
-#include <vector>
#include "mongo/base/data_type_endian.h"
#include "mongo/base/data_view.h"
@@ -399,31 +398,13 @@ inline int ConstView::dataLen() const {
} // namespace MsgData
class Message {
- MONGO_DISALLOW_COPYING(Message);
-
public:
- using MsgVec = std::vector<std::pair<char*, int>>;
-
- // we assume here that a vector with initial size 0 does no allocation (0 is the default, but
- // wanted to make it explicit).
Message() = default;
-
- Message(void* data, bool freeIt) : _buf(reinterpret_cast<char*>(data)), _freeIt(freeIt) {}
-
- Message(Message&& r) : _buf(r._buf), _data(std::move(r._data)), _freeIt(r._freeIt) {
- r._buf = nullptr;
- r._freeIt = false;
- }
-
- ~Message() {
- reset();
- }
-
- SockAddr _from;
+ explicit Message(SharedBuffer data) : _buf(std::move(data)) {}
MsgData::View header() const {
verify(!empty());
- return _buf ? _buf : _data[0].first;
+ return _buf.get();
}
NetworkOp operation() const {
@@ -436,110 +417,28 @@ public:
}
bool empty() const {
- return !_buf && _data.empty();
+ return !_buf;
}
int size() const {
- int res = 0;
if (_buf) {
- res = MsgData::ConstView(_buf).getLen();
- } else {
- for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
- res += it->second;
- }
+ return MsgData::ConstView(_buf.get()).getLen();
}
- return res;
+ return 0;
}
int dataSize() const {
return size() - sizeof(MSGHEADER::Value);
}
- // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
- // can get rid of this if we make response handling smarter
- void concat() {
- if (_buf || empty()) {
- return;
- }
-
- verify(_freeIt);
- int totalSize = 0;
- for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); i != _data.end();
- ++i) {
- totalSize += i->second;
- }
- char* buf = (char*)mongoMalloc(totalSize);
- char* p = buf;
- for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); i != _data.end();
- ++i) {
- memcpy(p, i->first, i->second);
- p += i->second;
- }
- reset();
- _setData(buf, true);
- }
-
- Message& operator=(Message&& r) {
- // This implementation was originally written using an auto_ptr-style fake move. When
- // converting to a real move assignment, checking for self-assignment was the simplest way
- // to ensure correctness.
- if (&r == this)
- return *this;
-
- if (!empty()) {
- reset();
- }
-
- _buf = r._buf;
- _data = std::move(r._data);
- _freeIt = r._freeIt;
-
- r._buf = nullptr;
- r._freeIt = false;
- return *this;
- }
-
void reset() {
- if (_freeIt) {
- if (_buf) {
- std::free(_buf);
- }
- for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin();
- i != _data.end();
- ++i) {
- std::free(i->first);
- }
- }
- _buf = nullptr;
- _data.clear();
- _freeIt = false;
- }
-
- // use to add a buffer
- // assumes message will free everything
- void appendData(char* d, int size) {
- if (size <= 0) {
- return;
- }
- if (empty()) {
- MsgData::View md = d;
- md.setLen(size); // can be updated later if more buffers added
- _setData(md.view2ptr(), true);
- return;
- }
- verify(_freeIt);
- if (_buf) {
- _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen()));
- _buf = 0;
- }
- _data.push_back(std::make_pair(d, size));
- header().setLen(header().getLen() + size);
+ _buf = {};
}
// use to set first buffer if empty
- void setData(char* d, bool freeIt) {
+ void setData(SharedBuffer buf) {
verify(empty());
- _setData(d, freeIt);
+ _buf = std::move(buf);
}
void setData(int operation, const char* msgtxt) {
setData(operation, msgtxt, strlen(msgtxt) + 1);
@@ -547,39 +446,29 @@ public:
void setData(int operation, const char* msgdata, size_t len) {
verify(empty());
size_t dataLen = len + sizeof(MsgData::Value) - 4;
- MsgData::View d = reinterpret_cast<char*>(mongoMalloc(dataLen));
+ _buf = SharedBuffer::allocate(dataLen);
+ MsgData::View d = _buf.get();
if (len)
memcpy(d.data(), msgdata, len);
d.setLen(dataLen);
d.setOperation(operation);
- _setData(d.view2ptr(), true);
}
- bool doIFreeIt() {
- return _freeIt;
+ char* buf() {
+ return _buf.get();
}
- char* buf() {
+ std::string toString() const;
+
+ SharedBuffer sharedBuffer() {
return _buf;
}
-
- const MsgVec& dataBuffers() const {
- return _data;
+ ConstSharedBuffer sharedBuffer() const {
+ return _buf;
}
- std::string toString() const;
-
private:
- void _setData(char* d, bool freeIt) {
- _freeIt = freeIt;
- _buf = d;
- }
- // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
- char* _buf{nullptr};
- // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage
- // instead
- MsgVec _data{};
- bool _freeIt{false};
+ SharedBuffer _buf;
};
diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp
index 69e33377e59..5051962ed5d 100644
--- a/src/mongo/util/net/message_port.cpp
+++ b/src/mongo/util/net/message_port.cpp
@@ -184,17 +184,14 @@ bool MessagingPort::recv(Message& m) {
_psock->setHandshakeReceived();
int z = (len + 1023) & 0xfffffc00;
verify(z >= len);
- MsgData::View md = reinterpret_cast<char*>(mongoMalloc(z));
- ScopeGuard guard = MakeGuard(free, md.view2ptr());
- verify(md.view2ptr());
-
+ auto buf = SharedBuffer::allocate(z);
+ MsgData::View md = buf.get();
memcpy(md.view2ptr(), &header, headerLen);
int left = len - headerLen;
_psock->recv(md.data(), left);
- guard.Dismiss();
- m.setData(md.view2ptr(), true);
+ m.setData(std::move(buf));
return true;
} catch (const SocketException& e) {
@@ -234,8 +231,6 @@ void MessagingPort::say(Message& toSend, int responseTo) {
auto buf = toSend.buf();
if (buf) {
send(buf, MsgData::ConstView(buf).getLen(), "say");
- } else {
- send(toSend.dataBuffers(), "say");
}
}
diff --git a/src/mongo/util/shared_buffer.h b/src/mongo/util/shared_buffer.h
index 6cb8103ded4..09951dc80dc 100644
--- a/src/mongo/util/shared_buffer.h
+++ b/src/mongo/util/shared_buffer.h
@@ -32,9 +32,13 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/util/allocator.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
+/**
+ * A mutable, ref-counted buffer.
+ */
class SharedBuffer {
public:
SharedBuffer() = default;
@@ -44,30 +48,39 @@ public:
}
static SharedBuffer allocate(size_t bytes) {
- return takeOwnership(static_cast<char*>(mongoMalloc(sizeof(Holder) + bytes)));
+ return takeOwnership(mongoMalloc(sizeof(Holder) + bytes));
}
/**
- * Given a pointer to a region of un-owned data, prefixed by sufficient space for a
- * SharedBuffer::Holder object, return an SharedBuffer that owns the
- * memory.
+ * Resizes the buffer, copying the current contents.
*
- * This class will call free(holderPrefixedData), so it must have been allocated in a way
- * that makes that valid.
+ * Like ::realloc() this can be called on a null SharedBuffer.
+ *
+ * This method is illegal to call if any other SharedBuffer instances share this buffer since
+ * they wouldn't be updated and would still try to delete the original buffer.
*/
- static SharedBuffer takeOwnership(char* holderPrefixedData) {
- // Initialize the refcount to 1 so we don't need to increment it in the constructor
- // (see private Holder* constructor below).
- //
- // TODO: Should dassert alignment of holderPrefixedData
- // here if possible.
- return SharedBuffer(new (holderPrefixedData) Holder(1U));
+ void realloc(size_t size) {
+ invariant(!_holder || !_holder->isShared());
+
+ const size_t realSize = size + sizeof(Holder);
+ void* newPtr = mongoRealloc(_holder.get(), realSize);
+
+ // Get newPtr into _holder with a ref-count of 1 without touching the current pointee of
+ // _holder which is now invalid.
+ auto tmp = SharedBuffer::takeOwnership(newPtr);
+ _holder.detach();
+ _holder = std::move(tmp._holder);
}
char* get() const {
return _holder ? _holder->data() : NULL;
}
+ explicit operator bool() const {
+ return bool(_holder);
+ }
+
+private:
class Holder {
public:
explicit Holder(AtomicUInt32::WordType initial = AtomicUInt32::WordType())
@@ -95,14 +108,31 @@ public:
return reinterpret_cast<const char*>(this + 1);
}
- private:
+ bool isShared() const {
+ return _refCount.load() > 1;
+ }
+
AtomicUInt32 _refCount;
};
-private:
explicit SharedBuffer(Holder* holder) : _holder(holder, /*add_ref=*/false) {
// NOTE: The 'false' above is because we have already initialized the Holder with a
- // refcount of '1' in takeOwnership above. This avoids an atomic increment.
+ // refcount of '1' in takeOwnership below. This avoids an atomic increment.
+ }
+
+ /**
+ * Given a pointer to a region of un-owned data, prefixed by sufficient space for a
+ * SharedBuffer::Holder object, return an SharedBuffer that owns the memory.
+ *
+ * This class will call free(holderPrefixedData), so it must have been allocated in a way
+ * that makes that valid.
+ */
+ static SharedBuffer takeOwnership(void* holderPrefixedData) {
+ // Initialize the refcount to 1 so we don't need to increment it in the constructor
+ // (see private Holder* constructor above).
+ //
+ // TODO: Should dassert alignment of holderPrefixedData here if possible.
+ return SharedBuffer(new (holderPrefixedData) Holder(1U));
}
boost::intrusive_ptr<Holder> _holder;
@@ -111,4 +141,34 @@ private:
inline void swap(SharedBuffer& one, SharedBuffer& two) {
one.swap(two);
}
+
+/**
+ * A constant view into a ref-counted buffer.
+ *
+ * Use SharedBuffer to allocate since allocating a const buffer is useless.
+ */
+class ConstSharedBuffer {
+public:
+ ConstSharedBuffer() = default;
+ /*implicit*/ ConstSharedBuffer(SharedBuffer source) : _buffer(std::move(source)) {}
+
+ void swap(ConstSharedBuffer& other) {
+ _buffer.swap(other._buffer);
+ }
+
+ const char* get() const {
+ return _buffer.get();
+ }
+
+ explicit operator bool() const {
+ return bool(_buffer);
+ }
+
+private:
+ SharedBuffer _buffer;
+};
+
+inline void swap(ConstSharedBuffer& one, ConstSharedBuffer& two) {
+ one.swap(two);
+}
}