summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-10-20 10:34:15 -0400
committerBenety Goh <benety@mongodb.com>2016-10-23 17:25:10 -0400
commitab1ee41ecf1c96ae8b17a2b1da1c7ee9b8c58676 (patch)
tree539ec205f2a5065222f017dce331fe85305317ad
parentf50bc8e26d5720f396149286ebb31e8c3c51df4a (diff)
downloadmongo-ab1ee41ecf1c96ae8b17a2b1da1c7ee9b8c58676.tar.gz
SERVER-26191 OplogBufferCollection supports batch reads from collection
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp57
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h33
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp95
3 files changed, 168 insertions, 17 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index f8bad286d0f..1c74963cc14 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -36,6 +36,7 @@
#include <numeric>
#include "mongo/base/string_data.h"
+#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/util/assert_util.h"
@@ -76,17 +77,22 @@ BSONObj OplogBufferCollection::extractEmbeddedOplogDocument(const BSONObj& orig)
}
-OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface)
- : OplogBufferCollection(storageInterface, getDefaultNamespace()) {}
+OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface, Options options)
+ : OplogBufferCollection(storageInterface, getDefaultNamespace(), std::move(options)) {}
OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface,
- const NamespaceString& nss)
- : _storageInterface(storageInterface), _nss(nss), _count(0), _size(0) {}
+ const NamespaceString& nss,
+ Options options)
+ : _storageInterface(storageInterface), _nss(nss), _options(std::move(options)) {}
NamespaceString OplogBufferCollection::getNamespace() const {
return _nss;
}
+OplogBufferCollection::Options OplogBufferCollection::getOptions() const {
+ return _options;
+}
+
void OplogBufferCollection::startup(OperationContext* txn) {
clear(txn);
}
@@ -168,6 +174,7 @@ void OplogBufferCollection::clear(OperationContext* txn) {
_sentinelCount = 0;
_lastPushedTimestamp = {};
_lastPoppedKey = {};
+ _peekCache = std::queue<BSONObj>();
}
bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) {
@@ -220,6 +227,11 @@ bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) {
_peek_inlock(txn, PeekMode::kReturnUnmodifiedDocumentFromCollection);
_lastPoppedKey = docFromCollection["_id"].wrap("");
*value = extractEmbeddedOplogDocument(docFromCollection).getOwned();
+
+ invariant(!_peekCache.empty());
+ invariant(!SimpleBSONObjComparator::kInstance.compare(docFromCollection, _peekCache.front()));
+ _peekCache.pop();
+
invariant(_count > 0);
invariant(_size >= std::size_t(value->objsize()));
_count--;
@@ -240,20 +252,34 @@ BSONObj OplogBufferCollection::_peek_inlock(OperationContext* txn, PeekMode peek
boundInclusion = BoundInclusion::kIncludeEndKeyOnly;
}
- auto scanDirection = StorageInterface::ScanDirection::kForward;
- const auto docs =
- fassertStatusOK(40163,
- _storageInterface->findDocuments(
- txn, _nss, kIdIdxName, scanDirection, startKey, boundInclusion, 1U));
- invariant(1U == docs.size());
+ bool isPeekCacheEnabled = _options.peekCacheSize > 0;
+ // Check read ahead cache and read additional documents into cache if necessary - only valid
+ // when size of read ahead cache is greater than zero in the options.
+ if (_peekCache.empty()) {
+ std::size_t limit = isPeekCacheEnabled ? _options.peekCacheSize : 1U;
+ const auto docs = fassertStatusOK(
+ 40163,
+ _storageInterface->findDocuments(txn,
+ _nss,
+ kIdIdxName,
+ StorageInterface::ScanDirection::kForward,
+ startKey,
+ boundInclusion,
+ limit));
+ invariant(!docs.empty());
+ for (const auto& doc : docs) {
+ _peekCache.push(doc);
+ }
+ }
+ auto&& doc = _peekCache.front();
switch (peekMode) {
case PeekMode::kExtractEmbeddedDocument:
- return extractEmbeddedOplogDocument(docs.front()).getOwned();
+ return extractEmbeddedOplogDocument(doc).getOwned();
break;
case PeekMode::kReturnUnmodifiedDocumentFromCollection:
- invariant(docs.front().isOwned());
- return docs.front();
+ invariant(doc.isOwned());
+ return doc;
break;
}
@@ -284,5 +310,10 @@ Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const {
return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].Obj()["ts"].timestamp();
}
+std::queue<BSONObj> OplogBufferCollection::getPeekCache_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _peekCache;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 34dc2d66243..cb4cbcac8b8 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -28,6 +28,7 @@
#pragma once
+#include <queue>
#include <tuple>
#include "mongo/db/namespace_string.h"
@@ -47,6 +48,15 @@ class StorageInterface;
class OplogBufferCollection : public OplogBuffer {
public:
/**
+ * Structure used to configure an instance of OplogBufferCollection.
+ */
+ struct Options {
+ // If equal to 0, the cache size will be set to 1.
+ std::size_t peekCacheSize = 0;
+ Options() {}
+ };
+
+ /**
* Returns default namespace for temporary collection used to hold data in oplog buffer.
*/
static NamespaceString getDefaultNamespace();
@@ -84,14 +94,21 @@ public:
static std::tuple<BSONObj, Timestamp, std::size_t> addIdToDocument(
const BSONObj& orig, const Timestamp& lastTimestamp, std::size_t sentinelCount);
- explicit OplogBufferCollection(StorageInterface* storageInterface);
- OplogBufferCollection(StorageInterface* storageInterface, const NamespaceString& nss);
+ explicit OplogBufferCollection(StorageInterface* storageInterface, Options options = Options());
+ OplogBufferCollection(StorageInterface* storageInterface,
+ const NamespaceString& nss,
+ Options options = Options());
/**
* Returns the namespace string of the collection used by this oplog buffer.
*/
NamespaceString getNamespace() const;
+ /**
+ * Returns the options used to configure this OplogBufferCollection
+ */
+ Options getOptions() const;
+
void startup(OperationContext* txn) override;
void shutdown(OperationContext* txn) override;
void pushEvenIfFull(OperationContext* txn, const Value& value) override;
@@ -118,6 +135,7 @@ public:
std::size_t getSentinelCount_forTest() const;
Timestamp getLastPushedTimestamp_forTest() const;
Timestamp getLastPoppedTimestamp_forTest() const;
+ std::queue<BSONObj> getPeekCache_forTest() const;
private:
/*
@@ -148,6 +166,9 @@ private:
// The namespace for the oplog buffer collection.
const NamespaceString _nss;
+ // These are the options with which the oplog buffer was configured at construction time.
+ const Options _options;
+
// Allows functions to wait until the queue has data. This condition variable is used with
// _mutex below.
stdx::condition_variable _cvNoLongerEmpty;
@@ -156,10 +177,10 @@ private:
mutable stdx::mutex _mutex;
// Number of documents in buffer.
- std::size_t _count;
+ std::size_t _count = 0;
// Size of documents in buffer.
- std::size_t _size;
+ std::size_t _size = 0;
// Number of sentinel values inserted so far with the same timestamp as '_lastPoppedKey'.
std::size_t _sentinelCount = 0;
@@ -167,6 +188,10 @@ private:
Timestamp _lastPushedTimestamp;
BSONObj _lastPoppedKey;
+
+ // Used by _peek_inlock() to hold results of the read ahead query that will be used for pop/peek
+ // results.
+ std::queue<BSONObj> _peekCache;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 11d6030d472..e3f335603d9 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -1016,5 +1016,100 @@ TEST_F(OplogBufferCollectionTest, TwoWaitForDataInvocationsBlockAndFindSameSenti
ASSERT_EQUALS(count2, 1UL);
}
+OplogBufferCollection::Options _makeOptions(std::size_t peekCacheSize) {
+ OplogBufferCollection::Options options;
+ options.peekCacheSize = peekCacheSize;
+ return options;
+}
+
+/**
+ * Converts expectedDocs to collection format (with _id field) and compare with peek cache contents.
+ */
+void _assertDocumentsEqualCache(const std::vector<BSONObj>& expectedDocs,
+ std::queue<BSONObj> actualDocsInCache) {
+ for (const auto& doc : expectedDocs) {
+ ASSERT_FALSE(actualDocsInCache.empty());
+ ASSERT_BSONOBJ_EQ(
+ doc, OplogBufferCollection::extractEmbeddedOplogDocument(actualDocsInCache.front()));
+ actualDocsInCache.pop();
+ }
+ ASSERT_TRUE(actualDocsInCache.empty());
+}
+
+TEST_F(OplogBufferCollectionTest, PeekFillsCacheWithDocumentsFromCollection) {
+ auto nss = makeNamespace(_agent);
+ std::size_t peekCacheSize = 3U;
+ OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3));
+ ASSERT_EQUALS(peekCacheSize, oplogBuffer.getOptions().peekCacheSize);
+ oplogBuffer.startup(_txn.get());
+
+ std::vector<BSONObj> oplog;
+ for (int i = 0; i < 5; ++i) {
+ oplog.push_back(makeOplogEntry(i + 1));
+ };
+ oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.cbegin(), oplog.cend());
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
+
+ // Before any peek operations, peek cache should be empty.
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+
+ // First peek operation should trigger a read of 'peekCacheSize' documents from the collection.
+ BSONObj doc;
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+ _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+
+ // Repeated peek operation should not modify the cache.
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+ _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+
+ // Pop operation should remove the first element in the cache
+ ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+ _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+
+ // Next peek operation should not modify the cache.
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[1], doc);
+ _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+
+ // Pop the rest of the items in the cache.
+ ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[1], doc);
+ _assertDocumentsEqualCache({oplog[2]}, oplogBuffer.getPeekCache_forTest());
+
+ ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[2], doc);
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+
+ // Next peek operation should replenish the cache.
+ // Cache size will be less than the configured 'peekCacheSize' because
+ // there will not be enough documents left unread in the collection.
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[3], doc);
+ _assertDocumentsEqualCache({oplog[3], oplog[4]}, oplogBuffer.getPeekCache_forTest());
+
+ // Pop the remaining documents from the buffer.
+ ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[3], doc);
+ _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest());
+
+ // Verify state of cache between pops using peek.
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[4], doc);
+ _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest());
+
+ ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[4], doc);
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+
+ // Nothing left in the collection.
+ ASSERT_FALSE(oplogBuffer.peek(_txn.get(), &doc));
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+
+ ASSERT_FALSE(oplogBuffer.tryPop(_txn.get(), &doc));
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+}
} // namespace