diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-20 10:34:15 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-10-23 17:25:10 -0400 |
commit | ab1ee41ecf1c96ae8b17a2b1da1c7ee9b8c58676 (patch) | |
tree | 539ec205f2a5065222f017dce331fe85305317ad | |
parent | f50bc8e26d5720f396149286ebb31e8c3c51df4a (diff) | |
download | mongo-ab1ee41ecf1c96ae8b17a2b1da1c7ee9b8c58676.tar.gz |
SERVER-26191 OplogBufferCollection supports batch reads from collection
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 57 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection_test.cpp | 95 |
3 files changed, 168 insertions, 17 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index f8bad286d0f..1c74963cc14 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -36,6 +36,7 @@ #include <numeric> #include "mongo/base/string_data.h" +#include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/util/assert_util.h" @@ -76,17 +77,22 @@ BSONObj OplogBufferCollection::extractEmbeddedOplogDocument(const BSONObj& orig) } -OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface) - : OplogBufferCollection(storageInterface, getDefaultNamespace()) {} +OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface, Options options) + : OplogBufferCollection(storageInterface, getDefaultNamespace(), std::move(options)) {} OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface, - const NamespaceString& nss) - : _storageInterface(storageInterface), _nss(nss), _count(0), _size(0) {} + const NamespaceString& nss, + Options options) + : _storageInterface(storageInterface), _nss(nss), _options(std::move(options)) {} NamespaceString OplogBufferCollection::getNamespace() const { return _nss; } +OplogBufferCollection::Options OplogBufferCollection::getOptions() const { + return _options; +} + void OplogBufferCollection::startup(OperationContext* txn) { clear(txn); } @@ -168,6 +174,7 @@ void OplogBufferCollection::clear(OperationContext* txn) { _sentinelCount = 0; _lastPushedTimestamp = {}; _lastPoppedKey = {}; + _peekCache = std::queue<BSONObj>(); } bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) { @@ -220,6 +227,11 @@ bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) { _peek_inlock(txn, PeekMode::kReturnUnmodifiedDocumentFromCollection); _lastPoppedKey = docFromCollection["_id"].wrap(""); *value = extractEmbeddedOplogDocument(docFromCollection).getOwned(); + + invariant(!_peekCache.empty()); + invariant(!SimpleBSONObjComparator::kInstance.compare(docFromCollection, _peekCache.front())); + _peekCache.pop(); + invariant(_count > 0); invariant(_size >= std::size_t(value->objsize())); _count--; @@ -240,20 +252,34 @@ BSONObj OplogBufferCollection::_peek_inlock(OperationContext* txn, PeekMode peek boundInclusion = BoundInclusion::kIncludeEndKeyOnly; } - auto scanDirection = StorageInterface::ScanDirection::kForward; - const auto docs = - fassertStatusOK(40163, - _storageInterface->findDocuments( - txn, _nss, kIdIdxName, scanDirection, startKey, boundInclusion, 1U)); - invariant(1U == docs.size()); + bool isPeekCacheEnabled = _options.peekCacheSize > 0; + // Check read ahead cache and read additional documents into cache if necessary - only valid + // when size of read ahead cache is greater than zero in the options. + if (_peekCache.empty()) { + std::size_t limit = isPeekCacheEnabled ? _options.peekCacheSize : 1U; + const auto docs = fassertStatusOK( + 40163, + _storageInterface->findDocuments(txn, + _nss, + kIdIdxName, + StorageInterface::ScanDirection::kForward, + startKey, + boundInclusion, + limit)); + invariant(!docs.empty()); + for (const auto& doc : docs) { + _peekCache.push(doc); + } + } + auto&& doc = _peekCache.front(); switch (peekMode) { case PeekMode::kExtractEmbeddedDocument: - return extractEmbeddedOplogDocument(docs.front()).getOwned(); + return extractEmbeddedOplogDocument(doc).getOwned(); break; case PeekMode::kReturnUnmodifiedDocumentFromCollection: - invariant(docs.front().isOwned()); - return docs.front(); + invariant(doc.isOwned()); + return doc; break; } @@ -284,5 +310,10 @@ Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const { return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].Obj()["ts"].timestamp(); } +std::queue<BSONObj> OplogBufferCollection::getPeekCache_forTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _peekCache; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index 34dc2d66243..cb4cbcac8b8 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -28,6 +28,7 @@ #pragma once +#include <queue> #include <tuple> #include "mongo/db/namespace_string.h" @@ -47,6 +48,15 @@ class StorageInterface; class OplogBufferCollection : public OplogBuffer { public: /** + * Structure used to configure an instance of OplogBufferCollection. + */ + struct Options { + // If equal to 0, the cache size will be set to 1. + std::size_t peekCacheSize = 0; + Options() {} + }; + + /** * Returns default namespace for temporary collection used to hold data in oplog buffer. */ static NamespaceString getDefaultNamespace(); @@ -84,14 +94,21 @@ public: static std::tuple<BSONObj, Timestamp, std::size_t> addIdToDocument( const BSONObj& orig, const Timestamp& lastTimestamp, std::size_t sentinelCount); - explicit OplogBufferCollection(StorageInterface* storageInterface); - OplogBufferCollection(StorageInterface* storageInterface, const NamespaceString& nss); + explicit OplogBufferCollection(StorageInterface* storageInterface, Options options = Options()); + OplogBufferCollection(StorageInterface* storageInterface, + const NamespaceString& nss, + Options options = Options()); /** * Returns the namespace string of the collection used by this oplog buffer. */ NamespaceString getNamespace() const; + /** + * Returns the options used to configure this OplogBufferCollection + */ + Options getOptions() const; + void startup(OperationContext* txn) override; void shutdown(OperationContext* txn) override; void pushEvenIfFull(OperationContext* txn, const Value& value) override; @@ -118,6 +135,7 @@ public: std::size_t getSentinelCount_forTest() const; Timestamp getLastPushedTimestamp_forTest() const; Timestamp getLastPoppedTimestamp_forTest() const; + std::queue<BSONObj> getPeekCache_forTest() const; private: /* @@ -148,6 +166,9 @@ private: // The namespace for the oplog buffer collection. const NamespaceString _nss; + // These are the options with which the oplog buffer was configured at construction time. + const Options _options; + // Allows functions to wait until the queue has data. This condition variable is used with // _mutex below. stdx::condition_variable _cvNoLongerEmpty; @@ -156,10 +177,10 @@ private: mutable stdx::mutex _mutex; // Number of documents in buffer. - std::size_t _count; + std::size_t _count = 0; // Size of documents in buffer. - std::size_t _size; + std::size_t _size = 0; // Number of sentinel values inserted so far with the same timestamp as '_lastPoppedKey'. std::size_t _sentinelCount = 0; @@ -167,6 +188,10 @@ private: Timestamp _lastPushedTimestamp; BSONObj _lastPoppedKey; + + // Used by _peek_inlock() to hold results of the read ahead query that will be used for pop/peek + // results. + std::queue<BSONObj> _peekCache; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 11d6030d472..e3f335603d9 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -1016,5 +1016,100 @@ TEST_F(OplogBufferCollectionTest, TwoWaitForDataInvocationsBlockAndFindSameSenti ASSERT_EQUALS(count2, 1UL); } +OplogBufferCollection::Options _makeOptions(std::size_t peekCacheSize) { + OplogBufferCollection::Options options; + options.peekCacheSize = peekCacheSize; + return options; +} + +/** + * Converts expectedDocs to collection format (with _id field) and compare with peek cache contents. + */ +void _assertDocumentsEqualCache(const std::vector<BSONObj>& expectedDocs, + std::queue<BSONObj> actualDocsInCache) { + for (const auto& doc : expectedDocs) { + ASSERT_FALSE(actualDocsInCache.empty()); + ASSERT_BSONOBJ_EQ( + doc, OplogBufferCollection::extractEmbeddedOplogDocument(actualDocsInCache.front())); + actualDocsInCache.pop(); + } + ASSERT_TRUE(actualDocsInCache.empty()); +} + +TEST_F(OplogBufferCollectionTest, PeekFillsCacheWithDocumentsFromCollection) { + auto nss = makeNamespace(_agent); + std::size_t peekCacheSize = 3U; + OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3)); + ASSERT_EQUALS(peekCacheSize, oplogBuffer.getOptions().peekCacheSize); + oplogBuffer.startup(_txn.get()); + + std::vector<BSONObj> oplog; + for (int i = 0; i < 5; ++i) { + oplog.push_back(makeOplogEntry(i + 1)); + }; + oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.cbegin(), oplog.cend()); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); + + // Before any peek operations, peek cache should be empty. + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + + // First peek operation should trigger a read of 'peekCacheSize' documents from the collection. + BSONObj doc; + ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[0], doc); + _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + + // Repeated peek operation should not modify the cache. + ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[0], doc); + _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + + // Pop operation should remove the first element in the cache + ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[0], doc); + _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + + // Next peek operation should not modify the cache. + ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[1], doc); + _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + + // Pop the rest of the items in the cache. + ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[1], doc); + _assertDocumentsEqualCache({oplog[2]}, oplogBuffer.getPeekCache_forTest()); + + ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[2], doc); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + + // Next peek operation should replenish the cache. + // Cache size will be less than the configured 'peekCacheSize' because + // there will not be enough documents left unread in the collection. + ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[3], doc); + _assertDocumentsEqualCache({oplog[3], oplog[4]}, oplogBuffer.getPeekCache_forTest()); + + // Pop the remaining documents from the buffer. + ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[3], doc); + _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest()); + + // Verify state of cache between pops using peek. + ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[4], doc); + _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest()); + + ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[4], doc); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + + // Nothing left in the collection. + ASSERT_FALSE(oplogBuffer.peek(_txn.get(), &doc)); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + + ASSERT_FALSE(oplogBuffer.tryPop(_txn.get(), &doc)); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); +} } // namespace |