diff options
author | Mathias Stearn <mathias@10gen.com> | 2016-06-06 14:08:29 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2016-06-22 16:04:36 -0400 |
commit | e508ddcb51eec941ae50d9c2efb06b601811dc19 (patch) | |
tree | cab51e8665c29d8220b64e7f69a8bbc592ca5339 /src/mongo | |
parent | 40f20eca105a5e06a72df583ac654f946e9b058e (diff) | |
download | mongo-e508ddcb51eec941ae50d9c2efb06b601811dc19.tar.gz |
SERVER-24418 Make Message and BufBuilder use SharedBuffer for memory management
This makes it possible to get owned BSONObj out of a Message without copying.
Hooked up to the following places:
- Anything using the Fetcher (including oplog fetching on secondaries)
- Anything using DBClientInterface::findOne()
- Anything using CursorResponse (including Sharded queries)
As a simplification, Messages no longer support non-contiguous buffers, or
non-owning buffers. The former wasn't used by anything, and the latter was
only used by mongosniff only for messages that fit in a single packet.
Diffstat (limited to 'src/mongo')
34 files changed, 328 insertions, 336 deletions
diff --git a/src/mongo/bson/bson_obj_test.cpp b/src/mongo/bson/bson_obj_test.cpp index 843dfb13829..f59f74ebd82 100644 --- a/src/mongo/bson/bson_obj_test.cpp +++ b/src/mongo/bson/bson_obj_test.cpp @@ -540,5 +540,19 @@ TEST(BSONObj, getFieldsWithDuplicates) { ASSERT_EQUALS(fields[1].str(), "3"); } +TEST(BSONObj, ShareOwnershipWith) { + BSONObj obj; + { + BSONObj tmp = BSON("sub" << BSON("a" << 1)); + obj = tmp["sub"].Obj(); + obj.shareOwnershipWith(tmp); + ASSERT(obj.isOwned()); + } + + // Now that tmp is out of scope, if obj didn't retain ownership, it would be accessing free'd + // memory which should error on ASAN and debug builds. + ASSERT(obj.isOwned()); + ASSERT_EQ(obj, BSON("a" << 1)); +} } // unnamed namespace diff --git a/src/mongo/bson/bsonobj.cpp b/src/mongo/bson/bsonobj.cpp index d9ba4143607..30b98e08525 100644 --- a/src/mongo/bson/bsonobj.cpp +++ b/src/mongo/bson/bsonobj.cpp @@ -76,9 +76,9 @@ void BSONObj::_assertInvalid() const { } BSONObj BSONObj::copy() const { - char* storage = static_cast<char*>(mongoMalloc(sizeof(Holder) + objsize())); - memcpy(storage + sizeof(Holder), objdata(), objsize()); - return BSONObj::takeOwnership(storage); + auto storage = SharedBuffer::allocate(objsize()); + memcpy(storage.get(), objdata(), objsize()); + return BSONObj(std::move(storage)); } BSONObj BSONObj::getOwned() const { diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h index acd103edf2e..d7a801eb966 100644 --- a/src/mongo/bson/bsonobj.h +++ b/src/mongo/bson/bsonobj.h @@ -114,7 +114,7 @@ public: init(bsonData); } - explicit BSONObj(SharedBuffer ownedBuffer) + explicit BSONObj(ConstSharedBuffer ownedBuffer) : _objdata(ownedBuffer.get() ? ownedBuffer.get() : BSONObj().objdata()), _ownedBuffer(std::move(ownedBuffer)) {} @@ -175,7 +175,27 @@ public: @return true if this is in owned mode */ bool isOwned() const { - return _ownedBuffer.get() != 0; + return bool(_ownedBuffer); + } + + /** + * Share ownership with another object. + * + * It is the callers responsibility to ensure that the other object is owned and contains the + * data this BSONObj is viewing. This can happen if this is a subobject or sibling object + * contained in a larger buffer. + */ + void shareOwnershipWith(ConstSharedBuffer buffer) { + invariant(buffer); + _ownedBuffer = buffer; + } + void shareOwnershipWith(const BSONObj& other) { + shareOwnershipWith(other.sharedBuffer()); + } + + ConstSharedBuffer sharedBuffer() const { + invariant(isOwned()); + return _ownedBuffer; } /** If the data buffer is under the control of this BSONObj, return it. @@ -540,19 +560,6 @@ public: template <typename T> bool coerceVector(std::vector<T>* out) const; - typedef SharedBuffer::Holder Holder; - - /** Given a pointer to a region of un-owned memory containing BSON data, prefixed by - * sufficient space for a BSONObj::Holder object, return a BSONObj that owns the - * memory. - * - * This class will call free(holderPrefixedData), so it must have been allocated in a way - * that makes that valid. - */ - static BSONObj takeOwnership(char* holderPrefixedData) { - return BSONObj(SharedBuffer::takeOwnership(holderPrefixedData)); - } - /// members for Sorter struct SorterDeserializeSettings {}; // unused void serializeForSorter(BufBuilder& buf) const { @@ -586,7 +593,7 @@ private: Status _okForStorage(bool root, bool deep) const; const char* _objdata; - SharedBuffer _ownedBuffer; + ConstSharedBuffer _ownedBuffer; }; std::ostream& operator<<(std::ostream& s, const BSONObj& o); diff --git a/src/mongo/bson/bsonobjbuilder.h b/src/mongo/bson/bsonobjbuilder.h index 10b8a93b80e..224deb1ee6d 100644 --- a/src/mongo/bson/bsonobjbuilder.h +++ b/src/mongo/bson/bsonobjbuilder.h @@ -65,15 +65,8 @@ class BSONObjBuilder { public: /** @param initsize this is just a hint as to the final size of the object */ BSONObjBuilder(int initsize = 512) - : _b(_buf), - _buf(sizeof(BSONObj::Holder) + initsize), - _offset(sizeof(BSONObj::Holder)), - _s(this), - _tracker(0), - _doneCalled(false) { - // Skip over space for a holder object at the beginning of the buffer, followed by - // space for the object length. The length is filled in by _done. - _b.skip(sizeof(BSONObj::Holder)); + : _b(_buf), _buf(initsize), _offset(0), _s(this), _tracker(0), _doneCalled(false) { + // Skip over space for the object length. The length is filled in by _done. _b.skip(sizeof(int)); // Reserve space for the EOO byte. This means _done() can't fail. @@ -119,13 +112,12 @@ public: BSONObjBuilder(const BSONSizeTracker& tracker) : _b(_buf), - _buf(sizeof(BSONObj::Holder) + tracker.getSize()), - _offset(sizeof(BSONObj::Holder)), + _buf(tracker.getSize()), + _offset(0), _s(this), _tracker(const_cast<BSONSizeTracker*>(&tracker)), _doneCalled(false) { // See the comments in the first constructor for details. - _b.skip(sizeof(BSONObj::Holder)); _b.skip(sizeof(int)); // Reserve space for the EOO byte. This means _done() can't fail. @@ -639,9 +631,7 @@ public: BSONObj obj() { massert(10335, "builder does not own memory", owned()); doneFast(); - char* buf = _b.buf(); - decouple(); - return BSONObj::takeOwnership(buf); + return BSONObj(_b.release()); } /** Fetch the object we have built. @@ -681,10 +671,6 @@ public: _doneCalled = true; } - void decouple() { - _b.decouple(); // post done() call version. be sure jsobj frees... - } - void appendKeys(const BSONObj& keyPattern, const BSONObj& values); static std::string numStr(int i) { diff --git a/src/mongo/bson/util/builder.h b/src/mongo/bson/util/builder.h index 53ad7d25ebe..2cce0d93ad5 100644 --- a/src/mongo/bson/util/builder.h +++ b/src/mongo/bson/util/builder.h @@ -39,12 +39,14 @@ #include "mongo/base/data_type_endian.h" #include "mongo/base/data_view.h" +#include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" #include "mongo/bson/inline_decls.h" #include "mongo/platform/decimal128.h" #include "mongo/stdx/type_traits.h" #include "mongo/util/allocator.h" #include "mongo/util/assert_util.h" +#include "mongo/util/shared_buffer.h" namespace mongo { @@ -69,63 +71,80 @@ const int BufferMaxSize = 64 * 1024 * 1024; template <typename Allocator> class StringBuilderImpl; -class TrivialAllocator { +class SharedBufferAllocator { + MONGO_DISALLOW_COPYING(SharedBufferAllocator); + public: - void* Malloc(size_t sz) { - return mongoMalloc(sz); + SharedBufferAllocator() = default; + + void malloc(size_t sz) { + _buf = SharedBuffer::allocate(sz); + } + void realloc(size_t sz) { + _buf.realloc(sz); + } + void free() { + _buf = {}; } - void* Realloc(void* p, size_t sz) { - return mongoRealloc(p, sz); + SharedBuffer release() { + return std::move(_buf); } - void Free(void* p) { - free(p); + + char* get() const { + return _buf.get(); } + +private: + SharedBuffer _buf; }; class StackAllocator { + MONGO_DISALLOW_COPYING(StackAllocator); + public: + StackAllocator() = default; + enum { SZ = 512 }; - void* Malloc(size_t sz) { - if (sz <= SZ) - return buf; - return mongoMalloc(sz); - } - void* Realloc(void* p, size_t sz) { - if (p == buf) { - if (sz <= SZ) - return buf; - void* d = mongoMalloc(sz); - if (d == 0) - msgasserted(15912, "out of memory StackAllocator::Realloc"); - memcpy(d, p, SZ); - return d; + void malloc(size_t sz) { + if (sz > SZ) + _ptr = mongoMalloc(sz); + } + void realloc(size_t sz) { + if (_ptr == _buf) { + if (sz > SZ) { + _ptr = mongoMalloc(sz); + memcpy(_ptr, _buf, SZ); + } + } else { + _ptr = mongoRealloc(_ptr, sz); } - return mongoRealloc(p, sz); } - void Free(void* p) { - if (p != buf) - free(p); + void free() { + if (_ptr != _buf) + ::free(_ptr); + _ptr = _buf; + } + + // Not supported on this allocator. + void release() = delete; + + char* get() const { + return static_cast<char*>(_ptr); } private: - char buf[SZ]; + char _buf[SZ]; + void* _ptr = _buf; }; -template <class Allocator> +template <class BufferAllocator> class _BufBuilder { - // non-copyable, non-assignable - _BufBuilder(const _BufBuilder&); - _BufBuilder& operator=(const _BufBuilder&); - Allocator al; + MONGO_DISALLOW_COPYING(_BufBuilder); public: _BufBuilder(int initsize = 512) : size(initsize) { if (size > 0) { - data = (char*)al.Malloc(size); - if (data == 0) - msgasserted(10000, "out of memory BufBuilder"); - } else { - data = 0; + _buf.malloc(size); } l = 0; reservedBytes = 0; @@ -135,10 +154,7 @@ public: } void kill() { - if (data) { - al.Free(data); - data = 0; - } + _buf.free(); } void reset() { @@ -149,10 +165,8 @@ public: l = 0; reservedBytes = 0; if (maxSize && size > maxSize) { - al.Free(data); - data = (char*)al.Malloc(maxSize); - if (data == 0) - msgasserted(15913, "out of memory BufBuilder::reset"); + _buf.free(); + _buf.malloc(maxSize); size = maxSize; } } @@ -167,15 +181,15 @@ public: /* note this may be deallocated (realloced) if you keep writing. */ char* buf() { - return data; + return _buf.get(); } const char* buf() const { - return data; + return _buf.get(); } - /* assume ownership of the buffer - you must then free() it */ - void decouple() { - data = 0; + /* assume ownership of the buffer */ + SharedBuffer release() { + return _buf.release(); } void appendUChar(unsigned char j) { @@ -268,7 +282,7 @@ public: grow_reallocate(minSize); } l = newLen; - return data + oldlen; + return _buf.get() + oldlen; } /** @@ -313,24 +327,22 @@ private: ss << "BufBuilder attempted to grow() to " << a << " bytes, past the 64MB limit."; msgasserted(13548, ss.str().c_str()); } - data = (char*)al.Realloc(data, a); - if (data == NULL) - msgasserted(16070, "out of memory BufBuilder::grow_reallocate"); + _buf.realloc(a); size = a; } - char* data; + BufferAllocator _buf; int l; int size; int reservedBytes; // eagerly grow_reallocate to keep this many bytes of spare room. - friend class StringBuilderImpl<Allocator>; + friend class StringBuilderImpl<BufferAllocator>; }; -typedef _BufBuilder<TrivialAllocator> BufBuilder; +typedef _BufBuilder<SharedBufferAllocator> BufBuilder; /** The StackBufBuilder builds smaller datasets on the stack instead of using malloc. - this can be significantly faster for small bufs. However, you can not decouple() the + this can be significantly faster for small bufs. However, you can not release() the buffer with StackBufBuilder. While designed to be a variable on the stack, if you were to dynamically allocate one, nothing bad would happen. In fact in some circumstances this might make sense, say, @@ -339,7 +351,7 @@ typedef _BufBuilder<TrivialAllocator> BufBuilder; class StackBufBuilder : public _BufBuilder<StackAllocator> { public: StackBufBuilder() : _BufBuilder<StackAllocator>(StackAllocator::SZ) {} - void decouple(); // not allowed. not implemented. + void release() = delete; // not allowed. not implemented. }; /** std::stringstream deals with locale so this is a lot faster than std::stringstream for UTF8 */ @@ -426,7 +438,7 @@ public: } std::string str() const { - return std::string(_buf.data, _buf.l); + return std::string(_buf.buf(), _buf.l); } /** size of current std::string */ @@ -452,6 +464,6 @@ private: } }; -typedef StringBuilderImpl<TrivialAllocator> StringBuilder; +typedef StringBuilderImpl<SharedBufferAllocator> StringBuilder; typedef StringBuilderImpl<StackAllocator> StackStringBuilder; } // namespace mongo diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index fc9f4fd36d6..49b782dd2de 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -712,7 +712,7 @@ void DBClientInterface::findN(vector<BSONObj>& out, for (int i = 0; i < nToReturn; i++) { if (!c->more()) break; - out.push_back(c->nextSafe().copy()); + out.push_back(c->nextSafeOwned()); } } diff --git a/src/mongo/client/dbclientcursor.h b/src/mongo/client/dbclientcursor.h index 7363cdb1453..dd3320550af 100644 --- a/src/mongo/client/dbclientcursor.h +++ b/src/mongo/client/dbclientcursor.h @@ -97,6 +97,11 @@ public: /** throws AssertionException if get back { $err : ... } */ BSONObj nextSafe(); + BSONObj nextSafeOwned() { + BSONObj out = nextSafe(); + out.shareOwnershipWith(batch.m.sharedBuffer()); + return out; + } /** peek ahead at items buffered for future next() calls. never requests new data from the server. so peek only effective diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index 57fef47f068..b042a6dc0e9 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -62,6 +62,7 @@ const char* kNextBatchFieldName = "nextBatch"; Status parseCursorResponse(const BSONObj& obj, const std::string& batchFieldName, Fetcher::QueryResponse* batchData) { + invariant(obj.isOwned()); invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName); invariant(batchData); @@ -149,7 +150,11 @@ Status parseCursorResponse(const BSONObj& obj, << "' field: " << obj); } - batchData->documents.push_back(itemElement.Obj().getOwned()); + batchData->documents.push_back(itemElement.Obj()); + } + + for (auto& doc : batchData->documents) { + doc.shareOwnershipWith(obj); } return Status::OK(); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 74f941a4dd0..0fafccb9649 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -202,7 +202,7 @@ public: string ns = dbresponse.exhaustNS; // before reset() free's it... m.reset(); BufBuilder b(512); - b.appendNum((int)0 /*size set later in appendData()*/); + b.appendNum((int)0 /*size set later*/); b.appendNum(header.getId()); b.appendNum(header.getResponseToMsgId()); b.appendNum((int)dbGetMore); @@ -210,8 +210,10 @@ public: b.appendStr(ns); b.appendNum((int)0); // ntoreturn b.appendNum(cursorid); - m.appendData(b.buf(), b.len()); - b.decouple(); + + MsgData::View header = b.buf(); + header.setLen(b.len()); + m.setData(b.release()); DEV log() << "exhaust=true sending more"; continue; // this goes back to top loop } diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp index 763b6e3dde2..06c06991722 100644 --- a/src/mongo/db/dbdirectclient.cpp +++ b/src/mongo/db/dbdirectclient.cpp @@ -126,9 +126,6 @@ bool DBDirectClient::call(Message& toSend, Message& response, bool assertOk, str CurOp curOp(_txn); assembleResponse(_txn, toSend, dbResponse, dummyHost); verify(!dbResponse.response.empty()); - - // can get rid of this if we make response handling smarter - dbResponse.response.concat(); response = std::move(dbResponse.response); return true; diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 61a4d1509ac..67a5548dbfb 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -198,8 +198,7 @@ void OpQueryReplyBuilder::putInMessage( qr.setCursorId(cursorId); qr.setStartingFrom(startingFrom); qr.setNReturned(nReturned); - _buffer.decouple(); - out->setData(qr.view2ptr(), true); // transport will free + out->setData(_buffer.release()); // transport will free } void replyToQuery(int queryResultFlags, diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 3961beae740..d3d1a1c34de 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -174,7 +174,6 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception, // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h QueryResult::View msgdata = bb.buf(); - bb.decouple(); QueryResult::View qr = msgdata; qr.setResultFlags(ResultFlag_ErrSet); if (scex) @@ -184,7 +183,7 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception, qr.setCursorId(0); qr.setStartingFrom(0); qr.setNReturned(1); - response->setData(msgdata.view2ptr(), true); + response->setData(bb.release()); } /** @@ -469,7 +468,6 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, } bool exhaust = false; - QueryResult::View msgdata = 0; bool isCursorAuthorized = false; try { @@ -487,7 +485,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, sleepmillis(0); } - msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); + dbresponse.response = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); } catch (AssertionException& e) { if (isCursorAuthorized) { // If a cursor with id 'cursorid' was authorized, it may have been advanced @@ -510,9 +508,9 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, return false; } - dbresponse.response.setData(msgdata.view2ptr(), true); curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = msgdata.getNReturned(); + auto queryResult = QueryResult::ConstView(dbresponse.response.buf()); + curop.debug().nreturned = queryResult.getNReturned(); dbresponse.responseToMsgId = m.header().getId(); diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 33a1661ea0b..7514adfa175 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -166,10 +166,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo << elt}; } - batch.push_back(elt.Obj().getOwned()); + batch.push_back(elt.Obj()); } - return {{NamespaceString(fullns), cursorId, batch}}; + for (auto& doc : batch) { + doc.shareOwnershipWith(cmdResponse); + } + + return {{NamespaceString(fullns), cursorId, std::move(batch)}}; } void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 6e501242cee..e5b6646dae6 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -223,12 +223,12 @@ void generateBatch(int ntoreturn, /** * Called by db/instance.cpp. This is the getMore entry point. */ -QueryResult::View getMore(OperationContext* txn, - const char* ns, - int ntoreturn, - long long cursorid, - bool* exhaust, - bool* isCursorAuthorized) { +Message getMore(OperationContext* txn, + const char* ns, + int ntoreturn, + long long cursorid, + bool* exhaust, + bool* isCursorAuthorized) { invariant(ntoreturn >= 0); CurOp& curOp = *CurOp::get(txn); @@ -485,9 +485,8 @@ QueryResult::View getMore(OperationContext* txn, qr.setCursorId(cursorid); qr.setStartingFrom(startingResult); qr.setNReturned(numResults); - bb.decouple(); LOG(5) << "getMore returned " << numResults << " results\n"; - return qr; + return Message(bb.release()); } std::string runQuery(OperationContext* txn, @@ -543,7 +542,6 @@ std::string runQuery(OperationContext* txn, // Set query result fields. QueryResult::View qr = bb.buf(); - bb.decouple(); qr.setResultFlagsToOk(); qr.msgdata().setLen(bb.len()); curOp.debug().responseLength = bb.len(); @@ -551,7 +549,7 @@ std::string runQuery(OperationContext* txn, qr.setCursorId(0); qr.setStartingFrom(0); qr.setNReturned(1); - result.setData(qr.view2ptr(), true); + result.setData(bb.release()); return ""; } @@ -686,18 +684,18 @@ std::string runQuery(OperationContext* txn, endQueryOp(txn, collection, *exec, numResults, ccId); } - // Add the results from the query into the output buffer. - result.appendData(bb.buf(), bb.len()); - bb.decouple(); - // Fill out the output buffer's header. - QueryResult::View queryResultView = result.header().view2ptr(); + QueryResult::View queryResultView = bb.buf(); queryResultView.setCursorId(ccId); queryResultView.setResultFlagsToOk(); + queryResultView.msgdata().setLen(bb.len()); queryResultView.msgdata().setOperation(opReply); queryResultView.setStartingFrom(0); queryResultView.setNReturned(numResults); + // Add the results from the query into the output buffer. + result.setData(bb.release()); + // curOp.debug().exhaust is set above. return curOp.debug().exhaust ? nss.ns() : ""; } diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h index 4b968e58419..e6c8160b5cb 100644 --- a/src/mongo/db/query/find.h +++ b/src/mongo/db/query/find.h @@ -109,13 +109,14 @@ StatusWith<std::unique_ptr<PlanExecutor>> getOplogStartHack(OperationContext* tx /** * Called from the getMore entry point in ops/query.cpp. + * Returned buffer is the message to return to the client. */ -QueryResult::View getMore(OperationContext* txn, - const char* ns, - int ntoreturn, - long long cursorid, - bool* exhaust, - bool* isCursorAuthorized); +Message getMore(OperationContext* txn, + const char* ns, + int ntoreturn, + long long cursorid, + bool* exhaust, + bool* isCursorAuthorized); /** * Run the query 'q' and place the result in 'result'. diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp index c2fee263c60..ddfd61cf6f8 100644 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ b/src/mongo/executor/async_mock_stream_factory.cpp @@ -280,7 +280,8 @@ void AsyncMockStreamFactory::MockStream::simulateServer( WriteEvent write{this}; std::vector<uint8_t> messageData = popWrite(); - Message msg(messageData.data(), false); + Message msg(SharedBuffer::allocate(messageData.size())); + memcpy(msg.buf(), messageData.data(), messageData.size()); auto parsedRequest = rpc::makeRequest(&msg); ASSERT(parsedRequest->getProtocol() == proto); diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index f6812fd9c5a..10cbcda7115 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -115,7 +115,7 @@ void asyncRecvMessageBody(AsyncStreamInterface& stream, int z = (len + 1023) & 0xfffffc00; invariant(z >= len); - m->setData(reinterpret_cast<char*>(mongoMalloc(z)), true); + m->setData(SharedBuffer::allocate(z)); MsgData::View mdView = m->buf(); // copy header data into master buffer diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp index 2be37ef9779..bf96e029865 100644 --- a/src/mongo/executor/network_interface_asio_test.cpp +++ b/src/mongo/executor/network_interface_asio_test.cpp @@ -469,8 +469,8 @@ public: // Get the appropriate message id WriteEvent write{stream}; std::vector<uint8_t> messageData = stream->popWrite(); - Message msg(messageData.data(), false); - messageId = msg.header().getId(); + messageId = + MsgData::ConstView(reinterpret_cast<const char*>(messageData.data())).getId(); } // Build a mock reply message diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index 3e997e448de..742daea76c1 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -52,16 +52,28 @@ struct RemoteCommandResponse { RemoteCommandResponse() = default; RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj, Milliseconds millis) - : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) {} - - RemoteCommandResponse(Message message, + : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) { + // The buffer backing the default empty BSONObj has static duration so it is effectively + // owned. + invariant(data.isOwned() || data.objdata() == BSONObj().objdata()); + invariant(metadata.isOwned() || metadata.objdata() == BSONObj().objdata()); + } + + RemoteCommandResponse(Message messageArg, BSONObj dataObj, BSONObj metadataObj, Milliseconds millis) - : message(std::make_shared<const Message>(std::move(message))), + : message(std::make_shared<const Message>(std::move(messageArg))), data(std::move(dataObj)), metadata(std::move(metadataObj)), - elapsedMillis(millis) {} + elapsedMillis(millis) { + if (!data.isOwned()) { + data.shareOwnershipWith(message->sharedBuffer()); + } + if (!metadata.isOwned()) { + metadata.shareOwnershipWith(message->sharedBuffer()); + } + } RemoteCommandResponse(const rpc::ReplyInterface& rpcReply, Milliseconds millis); @@ -71,8 +83,8 @@ struct RemoteCommandResponse { bool operator!=(const RemoteCommandResponse& rhs) const; std::shared_ptr<const Message> message; // May be null. - BSONObj data; // Either owned or points into message. - BSONObj metadata; // Either owned or points into message. + BSONObj data; // Always owned. May point into message. + BSONObj metadata; // Always owned. May point into message. Milliseconds elapsedMillis = {}; }; diff --git a/src/mongo/rpc/command_reply.cpp b/src/mongo/rpc/command_reply.cpp index b6d3a98586f..172e7ac1ebe 100644 --- a/src/mongo/rpc/command_reply.cpp +++ b/src/mongo/rpc/command_reply.cpp @@ -53,7 +53,9 @@ CommandReply::CommandReply(const Message* message) : _message(message) { ConstDataRangeCursor cur(begin, messageEnd); _commandReply = uassertStatusOK(cur.readAndAdvance<Validated<BSONObj>>()).val; + _commandReply.shareOwnershipWith(message->sharedBuffer()); _metadata = uassertStatusOK(cur.readAndAdvance<Validated<BSONObj>>()).val; + _metadata.shareOwnershipWith(message->sharedBuffer()); _outputDocs = DocumentRange(cur.data(), messageEnd); } diff --git a/src/mongo/rpc/command_reply_builder.cpp b/src/mongo/rpc/command_reply_builder.cpp index 15b70123708..f5b48da87f9 100644 --- a/src/mongo/rpc/command_reply_builder.cpp +++ b/src/mongo/rpc/command_reply_builder.cpp @@ -110,8 +110,7 @@ Message CommandReplyBuilder::done() { MsgData::View msg = _builder.buf(); msg.setLen(_builder.len()); msg.setOperation(dbCommandReply); - _builder.decouple(); // release ownership from BufBuilder - _message.setData(msg.view2ptr(), true); // transfer ownership to Message + _message.setData(_builder.release()); _state = State::kDone; return std::move(_message); } diff --git a/src/mongo/rpc/command_request_builder.cpp b/src/mongo/rpc/command_request_builder.cpp index 3945b6219a4..c78e493df05 100644 --- a/src/mongo/rpc/command_request_builder.cpp +++ b/src/mongo/rpc/command_request_builder.cpp @@ -101,8 +101,7 @@ Message CommandRequestBuilder::done() { MsgData::View msg = _builder.buf(); msg.setLen(_builder.len()); msg.setOperation(dbCommand); - _builder.decouple(); // release ownership from BufBuilder. - _message.setData(msg.view2ptr(), true); // transfer ownership to Message. + _message.setData(_builder.release()); // transfer ownership to Message. _state = State::kDone; return std::move(_message); } diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp index 393c3bb3519..5880121ae66 100644 --- a/src/mongo/rpc/legacy_reply_builder.cpp +++ b/src/mongo/rpc/legacy_reply_builder.cpp @@ -158,8 +158,7 @@ Message LegacyReplyBuilder::done() { qr.setStartingFrom(0); qr.setNReturned(1); - _message.setData(qr.view2ptr(), true); - _builder.decouple(); + _message.setData(_builder.release()); _state = State::kDone; return std::move(_message); diff --git a/src/mongo/rpc/legacy_request_builder.cpp b/src/mongo/rpc/legacy_request_builder.cpp index c2155537611..4e126f6117f 100644 --- a/src/mongo/rpc/legacy_request_builder.cpp +++ b/src/mongo/rpc/legacy_request_builder.cpp @@ -114,8 +114,7 @@ Message LegacyRequestBuilder::done() { MsgData::View msg = _builder.buf(); msg.setLen(_builder.len()); msg.setOperation(dbQuery); - _builder.decouple(); // release ownership from BufBuilder - _message.setData(msg.view2ptr(), true); // transfer ownership to Message + _message.setData(_builder.release()); _state = State::kDone; return std::move(_message); } diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index deb4bf34f9f..1f2dbdf6c7c 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -58,6 +58,10 @@ public: * Returns the next query result, or an error. * * If there are no more results, returns boost::none. + * + * All returned BSONObjs are owned. They may own a buffer larger than the object. If you are + * holding on to a subset of the returned results and need to minimize memory usage, call copy() + * on the BSONObjs. */ virtual StatusWith<boost::optional<BSONObj>> next() = 0; diff --git a/src/mongo/tools/sniffer.cpp b/src/mongo/tools/sniffer.cpp index 255d41c1394..b5081a361ed 100644 --- a/src/mongo/tools/sniffer.cpp +++ b/src/mongo/tools/sniffer.cpp @@ -82,6 +82,7 @@ using mongo::BufBuilder; using mongo::DBClientConnection; using mongo::MemoryMappedFile; using std::string; +namespace MsgData = mongo::MsgData; #define SNAP_LEN 65535 @@ -176,6 +177,13 @@ map<Connection, map<long long, long long>> mapCursor; void processMessage(Connection& c, Message& d); +Message copyToMessage(const char* source) { + auto msgLen = MsgData::ConstView(source).getLen(); + auto msgData = mongo::SharedBuffer::allocate(msgLen); + memcpy(msgData.get(), source, msgLen); + return Message(std::move(msgData)); +} + void got_packet(u_char* args, const struct pcap_pkthdr* header, const u_char* packet) { const struct sniff_ip* ip = (struct sniff_ip*)(packet + captureHeaderSize); int size_ip = IP_HL(ip) * 4; @@ -225,7 +233,7 @@ void got_packet(u_char* args, const struct pcap_pkthdr* header, const u_char* pa Message m; if (bytesRemainingInMessage[c] == 0) { - m.setData(const_cast<char*>(reinterpret_cast<const char*>(payload)), false); + m = copyToMessage(reinterpret_cast<const char*>(payload)); if (!m.header().valid()) { cerr << "Invalid message start, skipping packet." << endl; return; @@ -251,8 +259,7 @@ void got_packet(u_char* args, const struct pcap_pkthdr* header, const u_char* pa } if (bytesRemainingInMessage[c] > 0) return; - m.setData(messageBuilder[c]->buf(), true); - messageBuilder[c]->decouple(); + m.setData(messageBuilder[c]->release()); messageBuilder[c].reset(); } @@ -454,7 +461,7 @@ void processDiagLog(const char* file) { long read = 0; while (read < length) { - Message m(pos, false); + Message m = copyToMessage(pos); int len = m.header().getLen(); DbMessage d(m); cout << len << " " << d.getns() << endl; diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 778bfcc3758..6cfe3606043 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -65,8 +65,7 @@ void setOkResponse(Message* m) { // Set the message, transfer buffer ownership to Message m->reset(); - m->setData(msg.view2ptr(), true); - b.decouple(); + m->setData(b.release()); } } // namespace diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h index 582de6178d1..ceeca2a0985 100644 --- a/src/mongo/transport/service_entry_point_mock.h +++ b/src/mongo/transport/service_entry_point_mock.h @@ -28,6 +28,8 @@ #pragma once +#include <vector> + #include "mongo/base/disallow_copying.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index d155d3b24b5..e4f93ae3344 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -86,8 +86,7 @@ void setPingCommand(Message* m) { m->reset(); // Transfer buffer ownership to the Message. - m->setData(msg.view2ptr(), true); - b.decouple(); + m->setData(b.release()); } // Some default method implementations diff --git a/src/mongo/util/duration.cpp b/src/mongo/util/duration.cpp index e3a29e858dc..9d7137ab259 100644 --- a/src/mongo/util/duration.cpp +++ b/src/mongo/util/duration.cpp @@ -129,16 +129,16 @@ template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAl template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAllocator>&, Seconds); template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAllocator>&, Minutes); template StringBuilderImpl<StackAllocator>& operator<<(StringBuilderImpl<StackAllocator>&, Hours); -template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&, - Nanoseconds); -template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&, - Microseconds); -template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&, - Milliseconds); -template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&, - Seconds); -template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&, - Minutes); -template StringBuilderImpl<TrivialAllocator>& operator<<(StringBuilderImpl<TrivialAllocator>&, - Hours); +template StringBuilderImpl<SharedBufferAllocator>& operator<<( + StringBuilderImpl<SharedBufferAllocator>&, Nanoseconds); +template StringBuilderImpl<SharedBufferAllocator>& operator<<( + StringBuilderImpl<SharedBufferAllocator>&, Microseconds); +template StringBuilderImpl<SharedBufferAllocator>& operator<<( + StringBuilderImpl<SharedBufferAllocator>&, Milliseconds); +template StringBuilderImpl<SharedBufferAllocator>& operator<<( + StringBuilderImpl<SharedBufferAllocator>&, Seconds); +template StringBuilderImpl<SharedBufferAllocator>& operator<<( + StringBuilderImpl<SharedBufferAllocator>&, Minutes); +template StringBuilderImpl<SharedBufferAllocator>& operator<<( + StringBuilderImpl<SharedBufferAllocator>&, Hours); } // namespace mongo diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp index 9ba852003dc..542bfab282c 100644 --- a/src/mongo/util/net/asio_message_port.cpp +++ b/src/mongo/util/net/asio_message_port.cpp @@ -303,8 +303,8 @@ bool ASIOMessagingPort::recv(Message& m) { if (getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->shouldFail()) { throw SocketException(SocketException::RECV_ERROR, "fail point set"); } - MsgData::View md = reinterpret_cast<char*>(mongoMalloc(kInitialMessageSize)); - ScopeGuard guard = MakeGuard([&md]() { free(md.view2ptr()); }); + SharedBuffer buf = SharedBuffer::allocate(kInitialMessageSize); + MsgData::View md = buf.get(); asio::error_code ec = _read(md.view2ptr(), kHeaderLen); if (ec) { @@ -378,7 +378,8 @@ bool ASIOMessagingPort::recv(Message& m) { } if (msgLen > kInitialMessageSize) { - md = reinterpret_cast<char*>(mongoRealloc(md.view2ptr(), msgLen)); + buf.realloc(msgLen); + md = buf.get(); } ec = _read(md.data(), msgLen - kHeaderLen); @@ -386,8 +387,7 @@ bool ASIOMessagingPort::recv(Message& m) { throw asio::system_error(ec); } - guard.Dismiss(); - m.setData(md.view2ptr(), true); + m.setData(std::move(buf)); return true; } catch (const asio::system_error& e) { @@ -428,8 +428,6 @@ void ASIOMessagingPort::say(Message& toSend, int responseTo) { auto buf = toSend.buf(); if (buf) { send(buf, MsgData::ConstView(buf).getLen(), nullptr); - } else { - send(toSend.dataBuffers(), nullptr); } } diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h index 2c28aabe5cf..37a6c0bb66b 100644 --- a/src/mongo/util/net/message.h +++ b/src/mongo/util/net/message.h @@ -30,7 +30,6 @@ #pragma once #include <cstdint> -#include <vector> #include "mongo/base/data_type_endian.h" #include "mongo/base/data_view.h" @@ -399,31 +398,13 @@ inline int ConstView::dataLen() const { } // namespace MsgData class Message { - MONGO_DISALLOW_COPYING(Message); - public: - using MsgVec = std::vector<std::pair<char*, int>>; - - // we assume here that a vector with initial size 0 does no allocation (0 is the default, but - // wanted to make it explicit). Message() = default; - - Message(void* data, bool freeIt) : _buf(reinterpret_cast<char*>(data)), _freeIt(freeIt) {} - - Message(Message&& r) : _buf(r._buf), _data(std::move(r._data)), _freeIt(r._freeIt) { - r._buf = nullptr; - r._freeIt = false; - } - - ~Message() { - reset(); - } - - SockAddr _from; + explicit Message(SharedBuffer data) : _buf(std::move(data)) {} MsgData::View header() const { verify(!empty()); - return _buf ? _buf : _data[0].first; + return _buf.get(); } NetworkOp operation() const { @@ -436,110 +417,28 @@ public: } bool empty() const { - return !_buf && _data.empty(); + return !_buf; } int size() const { - int res = 0; if (_buf) { - res = MsgData::ConstView(_buf).getLen(); - } else { - for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) { - res += it->second; - } + return MsgData::ConstView(_buf.get()).getLen(); } - return res; + return 0; } int dataSize() const { return size() - sizeof(MSGHEADER::Value); } - // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy - // can get rid of this if we make response handling smarter - void concat() { - if (_buf || empty()) { - return; - } - - verify(_freeIt); - int totalSize = 0; - for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); i != _data.end(); - ++i) { - totalSize += i->second; - } - char* buf = (char*)mongoMalloc(totalSize); - char* p = buf; - for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); i != _data.end(); - ++i) { - memcpy(p, i->first, i->second); - p += i->second; - } - reset(); - _setData(buf, true); - } - - Message& operator=(Message&& r) { - // This implementation was originally written using an auto_ptr-style fake move. When - // converting to a real move assignment, checking for self-assignment was the simplest way - // to ensure correctness. - if (&r == this) - return *this; - - if (!empty()) { - reset(); - } - - _buf = r._buf; - _data = std::move(r._data); - _freeIt = r._freeIt; - - r._buf = nullptr; - r._freeIt = false; - return *this; - } - void reset() { - if (_freeIt) { - if (_buf) { - std::free(_buf); - } - for (std::vector<std::pair<char*, int>>::const_iterator i = _data.begin(); - i != _data.end(); - ++i) { - std::free(i->first); - } - } - _buf = nullptr; - _data.clear(); - _freeIt = false; - } - - // use to add a buffer - // assumes message will free everything - void appendData(char* d, int size) { - if (size <= 0) { - return; - } - if (empty()) { - MsgData::View md = d; - md.setLen(size); // can be updated later if more buffers added - _setData(md.view2ptr(), true); - return; - } - verify(_freeIt); - if (_buf) { - _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen())); - _buf = 0; - } - _data.push_back(std::make_pair(d, size)); - header().setLen(header().getLen() + size); + _buf = {}; } // use to set first buffer if empty - void setData(char* d, bool freeIt) { + void setData(SharedBuffer buf) { verify(empty()); - _setData(d, freeIt); + _buf = std::move(buf); } void setData(int operation, const char* msgtxt) { setData(operation, msgtxt, strlen(msgtxt) + 1); @@ -547,39 +446,29 @@ public: void setData(int operation, const char* msgdata, size_t len) { verify(empty()); size_t dataLen = len + sizeof(MsgData::Value) - 4; - MsgData::View d = reinterpret_cast<char*>(mongoMalloc(dataLen)); + _buf = SharedBuffer::allocate(dataLen); + MsgData::View d = _buf.get(); if (len) memcpy(d.data(), msgdata, len); d.setLen(dataLen); d.setOperation(operation); - _setData(d.view2ptr(), true); } - bool doIFreeIt() { - return _freeIt; + char* buf() { + return _buf.get(); } - char* buf() { + std::string toString() const; + + SharedBuffer sharedBuffer() { return _buf; } - - const MsgVec& dataBuffers() const { - return _data; + ConstSharedBuffer sharedBuffer() const { + return _buf; } - std::string toString() const; - private: - void _setData(char* d, bool freeIt) { - _freeIt = freeIt; - _buf = d; - } - // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data - char* _buf{nullptr}; - // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage - // instead - MsgVec _data{}; - bool _freeIt{false}; + SharedBuffer _buf; }; diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp index 69e33377e59..5051962ed5d 100644 --- a/src/mongo/util/net/message_port.cpp +++ b/src/mongo/util/net/message_port.cpp @@ -184,17 +184,14 @@ bool MessagingPort::recv(Message& m) { _psock->setHandshakeReceived(); int z = (len + 1023) & 0xfffffc00; verify(z >= len); - MsgData::View md = reinterpret_cast<char*>(mongoMalloc(z)); - ScopeGuard guard = MakeGuard(free, md.view2ptr()); - verify(md.view2ptr()); - + auto buf = SharedBuffer::allocate(z); + MsgData::View md = buf.get(); memcpy(md.view2ptr(), &header, headerLen); int left = len - headerLen; _psock->recv(md.data(), left); - guard.Dismiss(); - m.setData(md.view2ptr(), true); + m.setData(std::move(buf)); return true; } catch (const SocketException& e) { @@ -234,8 +231,6 @@ void MessagingPort::say(Message& toSend, int responseTo) { auto buf = toSend.buf(); if (buf) { send(buf, MsgData::ConstView(buf).getLen(), "say"); - } else { - send(toSend.dataBuffers(), "say"); } } diff --git a/src/mongo/util/shared_buffer.h b/src/mongo/util/shared_buffer.h index 6cb8103ded4..09951dc80dc 100644 --- a/src/mongo/util/shared_buffer.h +++ b/src/mongo/util/shared_buffer.h @@ -32,9 +32,13 @@ #include "mongo/platform/atomic_word.h" #include "mongo/util/allocator.h" +#include "mongo/util/assert_util.h" namespace mongo { +/** + * A mutable, ref-counted buffer. + */ class SharedBuffer { public: SharedBuffer() = default; @@ -44,30 +48,39 @@ public: } static SharedBuffer allocate(size_t bytes) { - return takeOwnership(static_cast<char*>(mongoMalloc(sizeof(Holder) + bytes))); + return takeOwnership(mongoMalloc(sizeof(Holder) + bytes)); } /** - * Given a pointer to a region of un-owned data, prefixed by sufficient space for a - * SharedBuffer::Holder object, return an SharedBuffer that owns the - * memory. + * Resizes the buffer, copying the current contents. * - * This class will call free(holderPrefixedData), so it must have been allocated in a way - * that makes that valid. + * Like ::realloc() this can be called on a null SharedBuffer. + * + * This method is illegal to call if any other SharedBuffer instances share this buffer since + * they wouldn't be updated and would still try to delete the original buffer. */ - static SharedBuffer takeOwnership(char* holderPrefixedData) { - // Initialize the refcount to 1 so we don't need to increment it in the constructor - // (see private Holder* constructor below). - // - // TODO: Should dassert alignment of holderPrefixedData - // here if possible. - return SharedBuffer(new (holderPrefixedData) Holder(1U)); + void realloc(size_t size) { + invariant(!_holder || !_holder->isShared()); + + const size_t realSize = size + sizeof(Holder); + void* newPtr = mongoRealloc(_holder.get(), realSize); + + // Get newPtr into _holder with a ref-count of 1 without touching the current pointee of + // _holder which is now invalid. + auto tmp = SharedBuffer::takeOwnership(newPtr); + _holder.detach(); + _holder = std::move(tmp._holder); } char* get() const { return _holder ? _holder->data() : NULL; } + explicit operator bool() const { + return bool(_holder); + } + +private: class Holder { public: explicit Holder(AtomicUInt32::WordType initial = AtomicUInt32::WordType()) @@ -95,14 +108,31 @@ public: return reinterpret_cast<const char*>(this + 1); } - private: + bool isShared() const { + return _refCount.load() > 1; + } + AtomicUInt32 _refCount; }; -private: explicit SharedBuffer(Holder* holder) : _holder(holder, /*add_ref=*/false) { // NOTE: The 'false' above is because we have already initialized the Holder with a - // refcount of '1' in takeOwnership above. This avoids an atomic increment. + // refcount of '1' in takeOwnership below. This avoids an atomic increment. + } + + /** + * Given a pointer to a region of un-owned data, prefixed by sufficient space for a + * SharedBuffer::Holder object, return an SharedBuffer that owns the memory. + * + * This class will call free(holderPrefixedData), so it must have been allocated in a way + * that makes that valid. + */ + static SharedBuffer takeOwnership(void* holderPrefixedData) { + // Initialize the refcount to 1 so we don't need to increment it in the constructor + // (see private Holder* constructor above). + // + // TODO: Should dassert alignment of holderPrefixedData here if possible. + return SharedBuffer(new (holderPrefixedData) Holder(1U)); } boost::intrusive_ptr<Holder> _holder; @@ -111,4 +141,34 @@ private: inline void swap(SharedBuffer& one, SharedBuffer& two) { one.swap(two); } + +/** + * A constant view into a ref-counted buffer. + * + * Use SharedBuffer to allocate since allocating a const buffer is useless. + */ +class ConstSharedBuffer { +public: + ConstSharedBuffer() = default; + /*implicit*/ ConstSharedBuffer(SharedBuffer source) : _buffer(std::move(source)) {} + + void swap(ConstSharedBuffer& other) { + _buffer.swap(other._buffer); + } + + const char* get() const { + return _buffer.get(); + } + + explicit operator bool() const { + return bool(_buffer); + } + +private: + SharedBuffer _buffer; +}; + +inline void swap(ConstSharedBuffer& one, ConstSharedBuffer& two) { + one.swap(two); +} } |