summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.cpp6
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.h4
-rw-r--r--src/mongo/db/repl/oplog_buffer.h7
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp112
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h50
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp892
6 files changed, 334 insertions, 737 deletions
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
index fcc08cec359..b8b9a8ea88f 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
@@ -147,7 +147,9 @@ StatusWith<OplogBufferMock::Value> OplogBufferMock::findByTimestamp(OperationCon
str::stream() << "No such timestamp in collection: " << ts.toString()};
}
-Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx, const Timestamp& ts, bool exact) {
+Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx,
+ const Timestamp& ts,
+ SeekStrategy exact) {
stdx::unique_lock<Latch> lk(_mutex);
for (std::size_t i = 0; i < _data.size(); i++) {
if (_data[i]["ts"].timestamp() == ts) {
@@ -158,7 +160,7 @@ Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx, const Timestamp
break;
}
}
- if (!exact)
+ if (exact != SeekStrategy::kExact)
return Status::OK();
return {ErrorCodes::KeyNotFound, str::stream() << "Timestamp not found: " << ts.toString()};
}
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.h b/src/mongo/db/repl/oplog_batcher_test_fixture.h
index ec39e84275b..735e0c56195 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.h
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.h
@@ -62,7 +62,9 @@ public:
bool peek(OperationContext* opCtx, Value* value) final;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const final;
StatusWith<Value> findByTimestamp(OperationContext* opCtx, const Timestamp& ts) final;
- Status seekToTimestamp(OperationContext* opCtx, const Timestamp& ts, bool exact = true) final;
+ Status seekToTimestamp(OperationContext* opCtx,
+ const Timestamp& ts,
+ SeekStrategy exact = SeekStrategy::kExact) final;
private:
mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogBufferMock::_mutex");
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index b8dc879286f..96faaf7f37f 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -215,6 +215,11 @@ public:
*/
class RandomAccessOplogBuffer : public OplogBuffer {
public:
+ enum SeekStrategy {
+ kInexact = 0,
+ kExact = 1,
+ };
+
/**
* Retrieves an oplog entry by timestamp. Returns ErrorCodes::NoSuchKey if no such entry is
* found. Does not change current position of oplog buffer.
@@ -228,7 +233,7 @@ public:
*/
virtual Status seekToTimestamp(OperationContext* opCtx,
const Timestamp& ts,
- bool exact = true) = 0;
+ SeekStrategy exact = SeekStrategy::kExact) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index b01659be904..e6b9a97f643 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -40,6 +40,7 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/util/assert_util.h"
@@ -52,7 +53,6 @@ const StringData kDefaultOplogCollectionNamespace = "local.temp_oplog_buffer"_sd
const StringData kOplogEntryFieldName = "entry"_sd;
const StringData kIdFieldName = "_id"_sd;
const StringData kTimestampFieldName = "ts"_sd;
-const StringData kSentinelFieldName = "s"_sd;
const StringData kIdIdxName = "_id_"_sd;
} // namespace
@@ -61,21 +61,12 @@ NamespaceString OplogBufferCollection::getDefaultNamespace() {
return NamespaceString(kDefaultOplogCollectionNamespace);
}
-std::tuple<BSONObj, Timestamp, std::size_t> OplogBufferCollection::addIdToDocument(
- const BSONObj& orig, const Timestamp& lastTimestamp, std::size_t sentinelCount) {
- if (orig.isEmpty()) {
- return std::make_tuple(
- BSON(kIdFieldName << BSON(kTimestampFieldName
- << lastTimestamp << kSentinelFieldName
- << static_cast<long long>(sentinelCount + 1))),
- lastTimestamp,
- sentinelCount + 1);
- }
+std::tuple<BSONObj, Timestamp> OplogBufferCollection::addIdToDocument(const BSONObj& orig) {
+ invariant(!orig.isEmpty());
const auto ts = orig[kTimestampFieldName].timestamp();
invariant(!ts.isNull());
- auto doc = BSON(kIdFieldName << BSON(kTimestampFieldName << ts << kSentinelFieldName << 0)
- << kOplogEntryFieldName << orig);
- return std::make_tuple(doc, ts, 0);
+ auto doc = BSON(_keyForTimestamp(ts).firstElement() << kOplogEntryFieldName << orig);
+ return std::make_tuple(doc, ts);
}
BSONObj OplogBufferCollection::extractEmbeddedOplogDocument(const BSONObj& orig) {
@@ -111,6 +102,7 @@ void OplogBufferCollection::startup(OperationContext* opCtx) {
auto sizeResult = _storageInterface->getCollectionSize(opCtx, _nss);
fassert(40403, sizeResult);
_size = sizeResult.getValue();
+ _sizeIsValid = true;
auto countResult = _storageInterface->getCollectionCount(opCtx, _nss);
fassert(40404, countResult);
@@ -124,7 +116,6 @@ void OplogBufferCollection::startup(OperationContext* opCtx) {
_peekCache = std::queue<BSONObj>();
if (_count == 0) {
- _sentinelCount = 0;
_lastPushedTimestamp = {};
return;
}
@@ -135,13 +126,8 @@ void OplogBufferCollection::startup(OperationContext* opCtx) {
fassert(
40405,
bsonExtractTimestampField(lastPushedId, kTimestampFieldName, &_lastPushedTimestamp));
- long long countAtTimestamp = 0;
- fassert(40406,
- bsonExtractIntegerField(lastPushedId, kSentinelFieldName, &countAtTimestamp));
- _sentinelCount = countAtTimestamp--;
} else {
_lastPushedTimestamp = {};
- _sentinelCount = 0;
}
}
@@ -151,7 +137,6 @@ void OplogBufferCollection::shutdown(OperationContext* opCtx) {
_dropCollection(opCtx);
_size = 0;
_count = 0;
- _sentinelCount = 0;
_lastPushedTimestamp = {};
_lastPoppedKey = {};
_peekCache = std::queue<BSONObj>();
@@ -168,12 +153,12 @@ void OplogBufferCollection::push(OperationContext* opCtx,
std::vector<InsertStatement> docsToInsert(numDocs);
stdx::lock_guard<Latch> lk(_mutex);
auto ts = _lastPushedTimestamp;
- auto sentinelCount = _sentinelCount;
- std::transform(begin, end, docsToInsert.begin(), [&sentinelCount, &ts](const Value& value) {
+ std::transform(begin, end, docsToInsert.begin(), [&ts](const Value& value) {
BSONObj doc;
auto previousTimestamp = ts;
- std::tie(doc, ts, sentinelCount) = addIdToDocument(value, ts, sentinelCount);
- invariant(value.isEmpty() ? ts == previousTimestamp : ts > previousTimestamp);
+ std::tie(doc, ts) = addIdToDocument(value);
+ invariant(!value.isEmpty());
+ invariant(ts > previousTimestamp);
return InsertStatement(doc);
});
@@ -181,11 +166,12 @@ void OplogBufferCollection::push(OperationContext* opCtx,
fassert(40161, status);
_lastPushedTimestamp = ts;
- _sentinelCount = sentinelCount;
_count += numDocs;
- _size += std::accumulate(begin, end, 0U, [](const size_t& docSize, const Value& value) {
- return docSize + size_t(value.objsize());
- });
+ if (_sizeIsValid) {
+ _size += std::accumulate(begin, end, 0U, [](const size_t& docSize, const Value& value) {
+ return docSize + size_t(value.objsize());
+ });
+ }
_cvNoLongerEmpty.notify_all();
}
@@ -202,6 +188,7 @@ std::size_t OplogBufferCollection::getMaxSize() const {
std::size_t OplogBufferCollection::getSize() const {
stdx::lock_guard<Latch> lk(_mutex);
+ uassert(4940100, "getSize() called on OplogBufferCollection after seek", _sizeIsValid);
return _size;
}
@@ -215,8 +202,8 @@ void OplogBufferCollection::clear(OperationContext* opCtx) {
_dropCollection(opCtx);
_createCollection(opCtx);
_size = 0;
+ _sizeIsValid = true;
_count = 0;
- _sentinelCount = 0;
_lastPushedTimestamp = {};
_lastPoppedKey = {};
_peekCache = std::queue<BSONObj>();
@@ -260,6 +247,60 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
return boost::none;
}
+/* static */
+BSONObj OplogBufferCollection::_keyForTimestamp(const Timestamp& ts) {
+ return BSON(kIdFieldName << BSON(kTimestampFieldName << ts));
+}
+
+StatusWith<BSONObj> OplogBufferCollection::_getDocumentWithTimestamp(OperationContext* opCtx,
+ const Timestamp& ts) {
+ return _storageInterface->findById(opCtx, _nss, _keyForTimestamp(ts).firstElement());
+}
+
+StatusWith<OplogBuffer::Value> OplogBufferCollection::findByTimestamp(OperationContext* opCtx,
+ const Timestamp& ts) {
+ auto docWithStatus = _getDocumentWithTimestamp(opCtx, ts);
+ if (!docWithStatus.isOK()) {
+ return docWithStatus.getStatus();
+ }
+ return extractEmbeddedOplogDocument(docWithStatus.getValue()).getOwned();
+}
+
+Status OplogBufferCollection::seekToTimestamp(OperationContext* opCtx,
+ const Timestamp& ts,
+ SeekStrategy exact) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ BSONObj docWithTimestamp;
+ auto docWithStatus = _getDocumentWithTimestamp(opCtx, ts);
+ if (docWithStatus.isOK()) {
+ docWithTimestamp = std::move(docWithStatus.getValue());
+ } else if (exact == SeekStrategy::kExact) {
+ return docWithStatus.getStatus();
+ }
+ _peekCache = std::queue<BSONObj>();
+ auto key = _keyForTimestamp(ts);
+ if (docWithTimestamp.isEmpty()) {
+ // The document with the requested timestamp was not found. Set _lastPoppedKey to
+ // the key for that document, so next time we pop we will read the next document after
+ // the requested timestamp.
+ _lastPoppedKey = key;
+ } else {
+ // The document with the requested timestamp was found. _lastPoppedKey will be set to that
+ // document's timestamp once the document is popped from the peek cache in _pop_inlock().
+ _lastPoppedKey = {};
+ _peekCache.push(docWithTimestamp);
+ }
+ // Unfortunately StorageInterface and InternalPlanner don't support count-by-index, so we're
+ // stuck with DBDirectClient.
+ DBDirectClient client(opCtx);
+ auto query = BSON(kIdFieldName << BSON("$gte" << key[kIdFieldName]));
+ _count = client.count(_nss, query);
+
+ // We have no way of accurately determining the size remaining after the seek
+ _sizeIsValid = false;
+ return Status::OK();
+}
+
boost::optional<OplogBuffer::Value> OplogBufferCollection::_lastDocumentPushed_inlock(
OperationContext* opCtx) const {
if (_count == 0) {
@@ -289,9 +330,11 @@ bool OplogBufferCollection::_pop_inlock(OperationContext* opCtx, Value* value) {
_peekCache.pop();
invariant(_count > 0);
- invariant(_size >= std::size_t(value->objsize()));
+ if (_sizeIsValid) {
+ invariant(_size >= std::size_t(value->objsize()));
+ _size -= value->objsize();
+ }
_count--;
- _size -= value->objsize();
return true;
}
@@ -354,11 +397,6 @@ void OplogBufferCollection::_dropCollection(OperationContext* opCtx) {
fassert(40155, _storageInterface->dropCollection(opCtx, _nss));
}
-std::size_t OplogBufferCollection::getSentinelCount_forTest() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _sentinelCount;
-}
-
Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const {
stdx::lock_guard<Latch> lk(_mutex);
return _lastPushedTimestamp;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 40356c834be..30cee7fc9cf 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -46,7 +46,7 @@ class StorageInterface;
* Oplog buffer backed by a temporary collection. This collection is created in startup() and
* removed in shutdown(). The documents will be popped and peeked in timestamp order.
*/
-class OplogBufferCollection : public OplogBuffer {
+class OplogBufferCollection : public RandomAccessOplogBuffer {
public:
/**
* Structure used to configure an instance of OplogBufferCollection.
@@ -72,30 +72,16 @@ public:
/**
* Creates and returns a document suitable for storing in the collection together with the
- * associated timestamp and sentinel count that determines the position of this document in the
- * _id index.
+ * associated timestamp that determines the position of this document in the _id index.
*
- * If 'orig' is a valid oplog entry, the '_id' field of the returned BSONObj will be:
+ * The '_id' field of the returned BSONObj will be:
* {
* ts: 'ts' field of the provided document,
- * s: 0
* }
- * The timestamp returned will be equal to as the 'ts' field in the BSONObj.
- * Assumes there is a 'ts' field in the original document.
*
- * If 'orig' is an empty document (ie. we're inserting a sentinel value), the '_id' field will
- * be generated based on the timestamp of the last document processed and the total number of
- * sentinels with the same timestamp (including the document about to be inserted. For example,
- * the first sentinel to be inserted after a valid oplog entry will have the following '_id'
- * field:
- * {
- * ts: 'ts' field of the last inserted valid oplog entry,
- * s: 1
- * }
- * The sentinel counter will be reset to 0 on inserting the next valid oplog entry.
+ * The oplog entry itself will be stored in the 'entry' field of the returned BSONObj.
*/
- static std::tuple<BSONObj, Timestamp, std::size_t> addIdToDocument(
- const BSONObj& orig, const Timestamp& lastTimestamp, std::size_t sentinelCount);
+ static std::tuple<BSONObj, Timestamp> addIdToDocument(const BSONObj& orig);
explicit OplogBufferCollection(StorageInterface* storageInterface, Options options = Options());
OplogBufferCollection(StorageInterface* storageInterface,
@@ -128,8 +114,15 @@ public:
bool peek(OperationContext* opCtx, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
+ // ---- Random access API ----
+ StatusWith<Value> findByTimestamp(OperationContext* opCtx, const Timestamp& ts) final;
+ // Note: once you use seekToTimestamp, calling getSize() is no longer legal.
+ Status seekToTimestamp(OperationContext* opCtx,
+ const Timestamp& ts,
+ SeekStrategy exact = SeekStrategy::kExact) final;
+
+
// ---- Testing API ----
- std::size_t getSentinelCount_forTest() const;
Timestamp getLastPushedTimestamp_forTest() const;
Timestamp getLastPoppedTimestamp_forTest() const;
std::queue<BSONObj> getPeekCache_forTest() const;
@@ -166,6 +159,16 @@ private:
*/
boost::optional<Value> _lastDocumentPushed_inlock(OperationContext* opCtx) const;
+ /**
+ * Returns the document with the given timestamp, or ErrorCodes::NoSuchKey if not found.
+ */
+ StatusWith<BSONObj> _getDocumentWithTimestamp(OperationContext* opCtx, const Timestamp& ts);
+
+ /**
+ * Returns the key for the document with the given timestamp.
+ */
+ static BSONObj _keyForTimestamp(const Timestamp& ts);
+
// The namespace for the oplog buffer collection.
const NamespaceString _nss;
@@ -185,9 +188,6 @@ private:
// Size of documents in buffer.
std::size_t _size = 0;
- // Number of sentinel values inserted so far with the same timestamp as '_lastPoppedKey'.
- std::size_t _sentinelCount = 0;
-
Timestamp _lastPushedTimestamp;
BSONObj _lastPoppedKey;
@@ -195,6 +195,10 @@ private:
// Used by _peek_inlock() to hold results of the read ahead query that will be used for pop/peek
// results.
std::queue<BSONObj> _peekCache;
+
+ // Whether or not the size() method can be called. This is set to false on seek, because
+ // we do not know how much we skipped when seeking.
+ bool _sizeIsValid = true;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 81921cee85a..c609ba52849 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -60,7 +60,6 @@ protected:
protected:
ServiceContext::UniqueOperationContext makeOperationContext() const;
- void pushSentinel(OplogBuffer& oplogBuffer);
StorageInterface* _storageInterface = nullptr;
ServiceContext::UniqueOperationContext _opCtx;
@@ -70,11 +69,6 @@ private:
void tearDown() override;
};
-void OplogBufferCollectionTest::pushSentinel(OplogBuffer& oplogBuffer) {
- const std::vector<BSONObj> oplog = {BSONObj()};
- oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
-}
-
void OplogBufferCollectionTest::setUp() {
ServiceContextMongoDTest::setUp();
auto service = getServiceContext();
@@ -182,64 +176,30 @@ TEST_F(OplogBufferCollectionTest, extractEmbeddedOplogDocumentChangesIdToTimesta
}
void _assertOplogDocAndTimestampEquals(
- const BSONObj& oplog,
- const std::tuple<BSONObj, Timestamp, std::size_t>& actualDocTimestampSentinelTuple) {
+ const BSONObj& oplog, const std::tuple<BSONObj, Timestamp>& actualDocTimestampTuple) {
auto expectedTimestamp = oplog["ts"].timestamp();
- auto expectedDoc =
- BSON("_id" << BSON("ts" << expectedTimestamp << "s" << 0) << "entry" << oplog);
- ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampSentinelTuple));
- ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampSentinelTuple));
- ASSERT_EQUALS(0U, std::get<2>(actualDocTimestampSentinelTuple));
+ auto expectedDoc = BSON("_id" << BSON("ts" << expectedTimestamp) << "entry" << oplog);
+ ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampTuple));
+ ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampTuple));
}
-void _assertSentinelDocAndTimestampEquals(
- const Timestamp& expectedTimestamp,
- std::size_t expectedSentinelCount,
- const std::tuple<BSONObj, Timestamp, std::size_t>& actualDocTimestampSentinelTuple) {
- auto expectedDoc =
- BSON("_id" << BSON("ts" << expectedTimestamp << "s" << int(expectedSentinelCount)));
- ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampSentinelTuple));
- ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampSentinelTuple));
- ASSERT_EQUALS(expectedSentinelCount, std::get<2>(actualDocTimestampSentinelTuple));
+void _assertDocAndTimestampEquals(const Timestamp& expectedTimestamp,
+ const std::tuple<BSONObj, Timestamp>& actualDocTimestampTuple) {
+ auto expectedDoc = BSON("_id" << BSON("ts" << expectedTimestamp));
+ ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampTuple));
+ ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampTuple));
}
TEST_F(OplogBufferCollectionTest, addIdToDocumentChangesTimestampToId) {
const BSONObj originalOp = makeOplogEntry(1);
_assertOplogDocAndTimestampEquals(originalOp,
- OplogBufferCollection::addIdToDocument(originalOp, {}, 0));
-}
-
-TEST_F(OplogBufferCollectionTest, addIdToDocumentGeneratesIdForSentinelFromLastPushedTimestamp) {
- const BSONObj oplog1 = makeOplogEntry(1);
- _assertOplogDocAndTimestampEquals(oplog1,
- OplogBufferCollection::addIdToDocument(oplog1, {}, 0U));
- auto ts1 = oplog1["ts"].timestamp();
-
- _assertSentinelDocAndTimestampEquals(
- ts1, 1U, OplogBufferCollection::addIdToDocument({}, ts1, 0U));
- _assertSentinelDocAndTimestampEquals(
- ts1, 2U, OplogBufferCollection::addIdToDocument({}, ts1, 1U));
- _assertSentinelDocAndTimestampEquals(
- ts1, 3U, OplogBufferCollection::addIdToDocument({}, ts1, 2U));
-
- // Processing valid oplog entry resets the sentinel count.
- const BSONObj oplog2 = makeOplogEntry(2);
- _assertOplogDocAndTimestampEquals(oplog1,
- OplogBufferCollection::addIdToDocument(oplog1, ts1, 3U));
- auto ts2 = oplog2["ts"].timestamp();
-
- _assertSentinelDocAndTimestampEquals(
- ts2, 1U, OplogBufferCollection::addIdToDocument({}, ts2, 0U));
- _assertSentinelDocAndTimestampEquals(
- ts2, 2U, OplogBufferCollection::addIdToDocument({}, ts2, 1U));
- _assertSentinelDocAndTimestampEquals(
- ts2, 3U, OplogBufferCollection::addIdToDocument({}, ts2, 2U));
+ OplogBufferCollection::addIdToDocument(originalOp));
}
DEATH_TEST_REGEX_F(OplogBufferCollectionTest,
addIdToDocumentWithMissingTimestampFieldTriggersInvariantFailure,
R"#(Invariant failure.*!ts.isNull\(\))#") {
- OplogBufferCollection::addIdToDocument(BSON("x" << 1), {}, 0);
+ OplogBufferCollection::addIdToDocument(BSON("x" << 1));
}
/**
@@ -250,12 +210,10 @@ void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
const std::vector<BSONObj>& docs) {
std::vector<BSONObj> transformedDocs;
Timestamp ts;
- std::size_t sentinelCount = 0;
for (const auto& doc : docs) {
auto previousTimestamp = ts;
BSONObj newDoc;
- std::tie(newDoc, ts, sentinelCount) =
- OplogBufferCollection::addIdToDocument(doc, ts, sentinelCount);
+ std::tie(newDoc, ts) = OplogBufferCollection::addIdToDocument(doc);
transformedDocs.push_back(newDoc);
if (doc.isEmpty()) {
ASSERT_EQUALS(previousTimestamp, ts);
@@ -277,7 +235,7 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0])),
Timestamp(0)},
OpTime::kUninitializedTerm));
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
@@ -288,7 +246,6 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_NOT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
@@ -319,7 +276,6 @@ TEST_F(OplogBufferCollectionTest, StartupWithEmptyExistingCollectionInitializesC
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
@@ -398,23 +354,6 @@ DEATH_TEST_REGEX_F(OplogBufferCollectionTest,
oplogBuffer.startup(_opCtx.get());
}
-DEATH_TEST_REGEX_F(OplogBufferCollectionTest,
- StartupWithExistingCollectionFailsWhenEntryHasNoSentinelCount,
- R"#(Fatal assertion.*40406.*NoSuchKey: Missing expected field \\"s\\")#") {
- auto nss = makeNamespace(_agent);
- ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
- ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(),
- nss,
- TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1))), Timestamp(1)},
- OpTime::kUninitializedTerm));
-
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtStartup = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
- oplogBuffer.startup(_opCtx.get());
-}
-
TEST_F(OplogBufferCollectionTest, PeekWithExistingCollectionReturnsEmptyObjectWhenEntryHasNoEntry) {
auto nss = makeNamespace(_agent);
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
@@ -442,7 +381,7 @@ TEST_F(OplogBufferCollectionTest,
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0])),
Timestamp(0)},
OpTime::kUninitializedTerm));
@@ -451,7 +390,6 @@ TEST_F(OplogBufferCollectionTest,
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
@@ -518,7 +456,7 @@ TEST_F(OplogBufferCollectionTest, PeekingFromExistingCollectionReturnsDocument)
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(entry1, {}, 0)),
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(entry1)),
Timestamp(0)},
OpTime::kUninitializedTerm));
@@ -667,14 +605,14 @@ TEST_F(OplogBufferCollectionTest,
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc, {}, 0)),
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc)),
Timestamp(0)},
OpTime::kUninitializedTerm));
auto secondDoc = makeOplogEntry(2);
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc, {}, 0)),
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc)),
Timestamp(0)},
OpTime::kUninitializedTerm));
@@ -726,7 +664,6 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
@@ -734,48 +671,34 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize()));
- ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog});
- pushSentinel(oplogBuffer);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize() + BSONObj().objsize()));
- ASSERT_EQUALS(1U, oplogBuffer.getSentinelCount_forTest());
- ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
- ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
-
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], BSONObj()});
-
const std::vector<BSONObj> oplog2 = {makeOplogEntry(2)};
oplogBuffer.push(_opCtx.get(), oplog2.cbegin(), oplog2.cend());
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- ASSERT_EQUALS(oplogBuffer.getSize(),
- std::size_t(oplog[0].objsize() + BSONObj().objsize() + oplog2[0].objsize()));
- ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
+ ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize() + oplog2[0].objsize()));
ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], BSONObj(), oplog2[0]});
+ _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]});
BSONObj poppedDoc;
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &poppedDoc));
ASSERT_BSONOBJ_EQ(oplog[0], poppedDoc);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(BSONObj().objsize() + oplog2[0].objsize()));
- ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
+ ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog2[0].objsize()));
ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], BSONObj(), oplog2[0]});
+ _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]});
oplogBuffer.clear(_opCtx.get());
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
@@ -794,9 +717,8 @@ TEST_F(OplogBufferCollectionTest, WaitForDataReturnsImmediatelyWhenStartedWithEx
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{
- std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1), {}, 0)),
- Timestamp(0)},
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1))),
+ Timestamp(0)},
OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
@@ -902,45 +824,9 @@ TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndTimesOutWhenItDoesNotFindD
ASSERT_EQUALS(count, 0UL);
}
-void _testPushSentinelsProperly(OperationContext* opCtx,
- const NamespaceString& nss,
- StorageInterface* storageInterface,
- std::function<void(OperationContext* opCtx,
- OplogBufferCollection* oplogBuffer,
- const std::vector<BSONObj>& oplog)> pushDocsFn) {
- OplogBufferCollection oplogBuffer(storageInterface, nss);
- oplogBuffer.startup(opCtx);
- const std::vector<BSONObj> oplog = {
- BSONObj(),
- makeOplogEntry(1),
- BSONObj(),
- BSONObj(),
- makeOplogEntry(2),
- BSONObj(),
- };
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- pushDocsFn(opCtx, &oplogBuffer, oplog);
- ASSERT_EQUALS(oplogBuffer.getCount(), 6UL);
- _assertDocumentsInCollectionEquals(opCtx, nss, oplog);
-}
-
-TEST_F(OplogBufferCollectionTest, PushAllNonBlockingPushesOnSentinelsProperly) {
- auto nss = makeNamespace(_agent);
- _testPushSentinelsProperly(_opCtx.get(),
- nss,
- _storageInterface,
- [](OperationContext* opCtx,
- OplogBufferCollection* oplogBuffer,
- const std::vector<BSONObj>& oplog) {
- oplogBuffer->push(opCtx, oplog.cbegin(), oplog.cend());
- ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
- });
-}
-
-DEATH_TEST_REGEX_F(
- OplogBufferCollectionTest,
- PushAllNonBlockingWithOutOfOrderDocumentsTriggersInvariantFailure,
- R"#(Invariant failure.*value.isEmpty\(\) \? ts == previousTimestamp : ts > previousTimestamp)#") {
+DEATH_TEST_REGEX_F(OplogBufferCollectionTest,
+ PushAllNonBlockingWithOutOfOrderDocumentsTriggersInvariantFailure,
+ R"#(Invariant failure.*ts > previousTimestamp)#") {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
@@ -953,634 +839,294 @@ DEATH_TEST_REGEX_F(
oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
}
-TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) {
- auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
-
- oplogBuffer.startup(_opCtx.get());
- const std::vector<BSONObj> oplog = {
- makeOplogEntry(1),
- makeOplogEntry(2),
- BSONObj(),
- makeOplogEntry(3),
- };
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
-
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- BSONObj doc;
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
-
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[3]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[3]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
-
- // tryPop does not remove documents from collection.
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
+OplogBufferCollection::Options _makeOptions(std::size_t peekCacheSize) {
+ OplogBufferCollection::Options options;
+ options.peekCacheSize = peekCacheSize;
+ return options;
}
-TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) {
- auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
-
- oplogBuffer.startup(_opCtx.get());
- const std::vector<BSONObj> oplog = {BSONObj(), makeOplogEntry(1)};
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- BSONObj doc;
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
-
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
-
- // tryPop does not remove documents from collection.
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
+/**
+ * Converts expectedDocs to collection format (with _id field) and compare with peek cache contents.
+ */
+void _assertDocumentsEqualCache(const std::vector<BSONObj>& expectedDocs,
+ std::queue<BSONObj> actualDocsInCache) {
+ for (const auto& doc : expectedDocs) {
+ ASSERT_FALSE(actualDocsInCache.empty());
+ ASSERT_BSONOBJ_EQ(
+ doc, OplogBufferCollection::extractEmbeddedOplogDocument(actualDocsInCache.front()));
+ actualDocsInCache.pop();
+ }
+ ASSERT_TRUE(actualDocsInCache.empty());
}
-TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) {
+TEST_F(OplogBufferCollectionTest, PeekFillsCacheWithDocumentsFromCollection) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
-
+ std::size_t peekCacheSize = 3U;
+ OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3));
+ ASSERT_EQUALS(peekCacheSize, oplogBuffer.getOptions().peekCacheSize);
oplogBuffer.startup(_opCtx.get());
- const std::vector<BSONObj> oplog = {makeOplogEntry(1), BSONObj()};
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- BSONObj doc;
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
-
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
-
- // tryPop does not remove documents from collection.
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-}
-
-TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) {
- auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
-
- oplogBuffer.startup(_opCtx.get());
- const std::vector<BSONObj> oplog = {
- BSONObj(),
- makeOplogEntry(1),
- BSONObj(),
- BSONObj(),
- makeOplogEntry(2),
- BSONObj(),
+ std::vector<BSONObj> oplog;
+ for (int i = 0; i < 5; ++i) {
+ oplog.push_back(makeOplogEntry(i + 1));
};
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
- ASSERT_EQUALS(oplogBuffer.getCount(), 6UL);
-
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
- BSONObj doc;
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 6UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 5UL);
+ // Before any peek operations, peek cache should be empty.
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+ // First peek operation should trigger a read of 'peekCacheSize' documents from the collection.
+ BSONObj doc;
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 5UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+ _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+ // Repeated peek operation should not modify the cache.
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+ _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+ // Pop operation should remove the first element in the cache
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+ _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+ // Next peek operation should not modify the cache.
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ ASSERT_BSONOBJ_EQ(oplog[1], doc);
+ _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+ // Pop the rest of the items in the cache.
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[4]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ ASSERT_BSONOBJ_EQ(oplog[1], doc);
+ _assertDocumentsEqualCache({oplog[2]}, oplogBuffer.getPeekCache_forTest());
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[4]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ ASSERT_BSONOBJ_EQ(oplog[2], doc);
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+ // Next peek operation should replenish the cache.
+ // Cache size will be less than the configured 'peekCacheSize' because
+ // there will not be enough documents left unread in the collection.
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ ASSERT_BSONOBJ_EQ(oplog[3], doc);
+ _assertDocumentsEqualCache({oplog[3], oplog[4]}, oplogBuffer.getPeekCache_forTest());
+ // Pop the remaining documents from the buffer.
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
-
- // tryPop does not remove documents from collection.
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-}
-
-TEST_F(OplogBufferCollectionTest, SentinelAtFrontOfExistingCollectionIsReturnedProperly) {
- auto nss = makeNamespace(_agent);
- const std::vector<BSONObj> oplog = {BSONObj(), makeOplogEntry(1)};
- {
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtShutdown = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
-
- oplogBuffer.startup(_opCtx.get());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- oplogBuffer.shutdown(_opCtx.get());
- }
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtStartup = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
- oplogBuffer.startup(_opCtx.get());
- BSONObj doc;
-
- ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest());
- ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
+ ASSERT_BSONOBJ_EQ(oplog[3], doc);
+ _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ // Verify state of cache between pops using peek.
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_BSONOBJ_EQ(*oplogBuffer.lastObjectPushed(_opCtx.get()), oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- // Push and pop another sentinel to make sure that they're counted correctly.
- pushSentinel(oplogBuffer);
- ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ ASSERT_BSONOBJ_EQ(oplog[4], doc);
+ _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest());
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ ASSERT_BSONOBJ_EQ(oplog[4], doc);
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ // Nothing left in the collection.
+ ASSERT_FALSE(oplogBuffer.peek(_opCtx.get(), &doc));
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
+ ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc));
+ _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
}
-TEST_F(OplogBufferCollectionTest, TwoSentinelsAtFrontOfExistingCollectionAreReturnedProperly) {
+TEST_F(OplogBufferCollectionTest, FindByTimestampFindsDocuments) {
auto nss = makeNamespace(_agent);
- const std::vector<BSONObj> oplog = {BSONObj(), BSONObj(), makeOplogEntry(1), BSONObj()};
- {
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtShutdown = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
-
- oplogBuffer.startup(_opCtx.get());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
- oplogBuffer.shutdown(_opCtx.get());
- }
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtStartup = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_opCtx.get());
- BSONObj doc;
- ASSERT_EQUALS(1UL, oplogBuffer.getSentinelCount_forTest());
- ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
-
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
-
- // Push and pop another sentinel to make sure that they're counted correctly.
- pushSentinel(oplogBuffer);
- ASSERT_EQUALS(oplogBuffer.getCount(), 5UL);
- ASSERT_EQUALS(2UL, oplogBuffer.getSentinelCount_forTest());
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ std::vector<BSONObj> oplog;
+ for (int i = 0; i < 5; ++i) {
+ oplog.push_back(makeOplogEntry(i + 1));
+ };
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
+ _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[2]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ // We should find the documents before popping them.
+ for (int i = 0; i < 5; ++i) {
+ Timestamp ts(i + 1, i + 1);
+ auto docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts);
+ ASSERT_OK(docWithStatus.getStatus());
+ ASSERT_BSONOBJ_EQ(oplog[i], docWithStatus.getValue());
+ }
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ // We can still pop the documents
+ for (int i = 0; i < 5; ++i) {
+ BSONObj doc;
+ ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[i], doc);
+ }
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
+ // We should find the documents after popping them.
+ for (int i = 0; i < 5; ++i) {
+ Timestamp ts(i + 1, i + 1);
+ auto docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts);
+ ASSERT_OK(docWithStatus.getStatus());
+ ASSERT_BSONOBJ_EQ(oplog[i], docWithStatus.getValue());
+ }
}
-TEST_F(OplogBufferCollectionTest, SentinelAtBackOfExistingCollectionIsReturnedProperly) {
+TEST_F(OplogBufferCollectionTest, FindByTimestampNotFound) {
auto nss = makeNamespace(_agent);
- const std::vector<BSONObj> oplog = {makeOplogEntry(1), BSONObj()};
- {
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtShutdown = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
-
- oplogBuffer.startup(_opCtx.get());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- oplogBuffer.shutdown(_opCtx.get());
- }
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtStartup = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_opCtx.get());
- BSONObj doc;
- ASSERT_EQUALS(1UL, oplogBuffer.getSentinelCount_forTest());
- ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
-
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
- // Push and pop another sentinel to make sure that they're counted correctly.
- pushSentinel(oplogBuffer);
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- ASSERT_EQUALS(2UL, oplogBuffer.getSentinelCount_forTest());
+ std::vector<BSONObj> oplog;
+ oplog.push_back(makeOplogEntry(2));
+ oplog.push_back(makeOplogEntry(3));
+ oplog.push_back(makeOplogEntry(5));
+ oplog.push_back(makeOplogEntry(6));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
+ _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ // Timestamp 1 not found.
+ Timestamp ts1(1, 1);
+ auto docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts1);
+ ASSERT_EQ(ErrorCodes::NoSuchKey, docWithStatus.getStatus().code());
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ // Timestamp 4 not found.
+ Timestamp ts4(4, 4);
+ docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts4);
+ ASSERT_EQ(ErrorCodes::NoSuchKey, docWithStatus.getStatus().code());
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
+ // Timestamp 7 not found.
+ Timestamp ts7(7, 7);
+ docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts7);
+ ASSERT_EQ(ErrorCodes::NoSuchKey, docWithStatus.getStatus().code());
}
-TEST_F(OplogBufferCollectionTest, TwoSentinelsAtBackOfExistingCollectionAreReturnedProperly) {
+TEST_F(OplogBufferCollectionTest, SeekToTimestamp) {
auto nss = makeNamespace(_agent);
- const std::vector<BSONObj> oplog = {BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj()};
- {
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtShutdown = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
-
- oplogBuffer.startup(_opCtx.get());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
- oplogBuffer.shutdown(_opCtx.get());
- }
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtStartup = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3));
oplogBuffer.startup(_opCtx.get());
- BSONObj doc;
-
- ASSERT_EQUALS(2UL, oplogBuffer.getSentinelCount_forTest());
- ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
-
- // Push and pop another sentinel to make sure that they're counted correctly.
- pushSentinel(oplogBuffer);
- ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 5UL);
- ASSERT_EQUALS(3UL, oplogBuffer.getSentinelCount_forTest());
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
+ std::vector<BSONObj> oplog;
+ oplog.push_back(makeOplogEntry(2));
+ oplog.push_back(makeOplogEntry(3));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
+ oplog.push_back(makeOplogEntry(5));
+ oplog.push_back(makeOplogEntry(6));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin() + 2, oplog.cend());
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[1]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ BSONObj doc;
+ // Seek to last entry..
+ ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(6, 6)));
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ ASSERT_BSONOBJ_EQ(oplog[3], doc);
+ ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc));
+ // Seek to middle and read entire buffer.
+ ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(3, 3)));
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
-
+ ASSERT_BSONOBJ_EQ(oplog[1], doc);
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
-}
-
-TEST_F(OplogBufferCollectionTest, SentinelInMiddleOfExistingCollectionIsReturnedProperly) {
- auto nss = makeNamespace(_agent);
- const std::vector<BSONObj> oplog = {makeOplogEntry(1), BSONObj(), makeOplogEntry(2)};
- {
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtShutdown = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
-
- oplogBuffer.startup(_opCtx.get());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end());
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- oplogBuffer.shutdown(_opCtx.get());
- }
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- OplogBufferCollection::Options opts;
- opts.dropCollectionAtStartup = false;
- OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
- oplogBuffer.startup(_opCtx.get());
- BSONObj doc;
-
- ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest());
- ASSERT_EQUALS(Timestamp(2, 2), oplogBuffer.getLastPushedTimestamp_forTest());
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
-
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_BSONOBJ_EQ(*oplogBuffer.lastObjectPushed(_opCtx.get()), oplog[2]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
-
- // Push and pop another sentinel to make sure that they're counted correctly.
- pushSentinel(oplogBuffer);
- ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
- ASSERT_EQUALS(1UL, oplogBuffer.getSentinelCount_forTest());
-
+ ASSERT_BSONOBJ_EQ(oplog[2], doc);
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[0]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ ASSERT_BSONOBJ_EQ(oplog[3], doc);
+ ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc));
+ // Seek to beginning.
+ ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(2, 2)));
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
-
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(doc, oplog[2]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ ASSERT_BSONOBJ_EQ(oplog[1], doc);
+ // With the readahead cache containing documents, seek forward.
+ _assertDocumentsEqualCache({oplog[2], oplog[3]}, oplogBuffer.getPeekCache_forTest());
+ ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(5, 5)));
ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
+ ASSERT_BSONOBJ_EQ(oplog[2], doc);
}
-
-TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndFindsSentinel) {
+TEST_F(OplogBufferCollectionTest, SeekToTimestampFails) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3));
oplogBuffer.startup(_opCtx.get());
- unittest::Barrier barrier(2U);
- BSONObj doc;
- bool success = false;
- std::size_t count = 0;
+ std::vector<BSONObj> oplog;
+ oplog.push_back(makeOplogEntry(2));
+ oplog.push_back(makeOplogEntry(3));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
+ oplog.push_back(makeOplogEntry(5));
+ oplog.push_back(makeOplogEntry(6));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin() + 2, oplog.cend());
- stdx::thread peekingThread([&]() {
- Client::initThread("peekingThread");
- barrier.countDownAndWait();
- success = oplogBuffer.waitForData(Seconds(30));
- count = oplogBuffer.getCount();
- });
+ BSONObj doc;
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- barrier.countDownAndWait();
- pushSentinel(oplogBuffer);
- peekingThread.join();
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- ASSERT_TRUE(success);
+ // Seek past end.
+ auto status = oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(7, 7));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, status.code());
+ // Failed seeks do not affect the oplog buffer.
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(count, 1UL);
-}
-
-TEST_F(OplogBufferCollectionTest, TwoWaitForDataInvocationsBlockAndFindSameSentinel) {
- auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
- oplogBuffer.startup(_opCtx.get());
-
- unittest::Barrier barrier(3U);
- bool success1 = false;
- std::size_t count1 = 0;
-
- bool success2 = false;
- std::size_t count2 = 0;
-
- stdx::thread peekingThread1([&]() {
- Client::initThread("peekingThread1");
- barrier.countDownAndWait();
- success1 = oplogBuffer.waitForData(Seconds(30));
- count1 = oplogBuffer.getCount();
- });
-
- stdx::thread peekingThread2([&]() {
- Client::initThread("peekingThread2");
- barrier.countDownAndWait();
- success2 = oplogBuffer.waitForData(Seconds(30));
- count2 = oplogBuffer.getCount();
- });
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- barrier.countDownAndWait();
- pushSentinel(oplogBuffer);
- peekingThread1.join();
- peekingThread2.join();
- ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- ASSERT_TRUE(success1);
- BSONObj doc;
+ // Seek to non-existent timestamp in middle.
+ status = oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(4, 4));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, status.code());
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_TRUE(doc.isEmpty());
- ASSERT_EQUALS(count1, 1UL);
- ASSERT_TRUE(success2);
- ASSERT_EQUALS(count2, 1UL);
-}
-
-OplogBufferCollection::Options _makeOptions(std::size_t peekCacheSize) {
- OplogBufferCollection::Options options;
- options.peekCacheSize = peekCacheSize;
- return options;
-}
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
-/**
- * Converts expectedDocs to collection format (with _id field) and compare with peek cache contents.
- */
-void _assertDocumentsEqualCache(const std::vector<BSONObj>& expectedDocs,
- std::queue<BSONObj> actualDocsInCache) {
- for (const auto& doc : expectedDocs) {
- ASSERT_FALSE(actualDocsInCache.empty());
- ASSERT_BSONOBJ_EQ(
- doc, OplogBufferCollection::extractEmbeddedOplogDocument(actualDocsInCache.front()));
- actualDocsInCache.pop();
- }
- ASSERT_TRUE(actualDocsInCache.empty());
+ // Seek before beginning
+ status = oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(1, 1));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, status.code());
+ ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
}
-TEST_F(OplogBufferCollectionTest, PeekFillsCacheWithDocumentsFromCollection) {
+TEST_F(OplogBufferCollectionTest, SeekToTimestampInexact) {
auto nss = makeNamespace(_agent);
- std::size_t peekCacheSize = 3U;
OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3));
- ASSERT_EQUALS(peekCacheSize, oplogBuffer.getOptions().peekCacheSize);
oplogBuffer.startup(_opCtx.get());
std::vector<BSONObj> oplog;
- for (int i = 0; i < 5; ++i) {
- oplog.push_back(makeOplogEntry(i + 1));
- };
+ oplog.push_back(makeOplogEntry(2));
+ oplog.push_back(makeOplogEntry(3));
oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
- _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
-
- // Before any peek operations, peek cache should be empty.
- _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+ oplog.push_back(makeOplogEntry(5));
+ oplog.push_back(makeOplogEntry(6));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin() + 2, oplog.cend());
- // First peek operation should trigger a read of 'peekCacheSize' documents from the collection.
BSONObj doc;
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[0], doc);
- _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
-
- // Repeated peek operation should not modify the cache.
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[0], doc);
- _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
- // Pop operation should remove the first element in the cache
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[0], doc);
- _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
+ // Seek past end.
+ ASSERT_OK(oplogBuffer.seekToTimestamp(
+ _opCtx.get(), Timestamp(7, 7), RandomAccessOplogBuffer::SeekStrategy::kInexact));
+ ASSERT_FALSE(oplogBuffer.peek(_opCtx.get(), &doc));
- // Next peek operation should not modify the cache.
+ // Seek to non-existent timestamp in middle.
+ ASSERT_OK(oplogBuffer.seekToTimestamp(
+ _opCtx.get(), Timestamp(4, 4), RandomAccessOplogBuffer::SeekStrategy::kInexact));
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[1], doc);
- _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest());
-
- // Pop the rest of the items in the cache.
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[1], doc);
- _assertDocumentsEqualCache({oplog[2]}, oplogBuffer.getPeekCache_forTest());
-
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
ASSERT_BSONOBJ_EQ(oplog[2], doc);
- _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
- // Next peek operation should replenish the cache.
- // Cache size will be less than the configured 'peekCacheSize' because
- // there will not be enough documents left unread in the collection.
+ // Seek before beginning
+ ASSERT_OK(oplogBuffer.seekToTimestamp(
+ _opCtx.get(), Timestamp(1, 1), RandomAccessOplogBuffer::SeekStrategy::kInexact));
ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[3], doc);
- _assertDocumentsEqualCache({oplog[3], oplog[4]}, oplogBuffer.getPeekCache_forTest());
-
- // Pop the remaining documents from the buffer.
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[3], doc);
- _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest());
-
- // Verify state of cache between pops using peek.
- ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[4], doc);
- _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest());
+ ASSERT_BSONOBJ_EQ(oplog[0], doc);
+}
- ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- ASSERT_BSONOBJ_EQ(oplog[4], doc);
- _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+TEST_F(OplogBufferCollectionTest, CannotGetSizeAfterSeek) {
+ auto nss = makeNamespace(_agent);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
+ oplogBuffer.startup(_opCtx.get());
- // Nothing left in the collection.
- ASSERT_FALSE(oplogBuffer.peek(_opCtx.get(), &doc));
- _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+ std::vector<BSONObj> oplog;
+ oplog.push_back(makeOplogEntry(2));
+ oplog.push_back(makeOplogEntry(3));
+ oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
- ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc));
- _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest());
+ // Seek to last entry..
+ ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(3, 3)));
+ ASSERT_THROWS(oplogBuffer.getSize(), AssertionException);
}
} // namespace