diff options
-rw-r--r-- | src/mongo/db/repl/oplog_batcher_test_fixture.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_batcher_test_fixture.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 112 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection_test.cpp | 892 |
6 files changed, 334 insertions, 737 deletions
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp index fcc08cec359..b8b9a8ea88f 100644 --- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp @@ -147,7 +147,9 @@ StatusWith<OplogBufferMock::Value> OplogBufferMock::findByTimestamp(OperationCon str::stream() << "No such timestamp in collection: " << ts.toString()}; } -Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx, const Timestamp& ts, bool exact) { +Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx, + const Timestamp& ts, + SeekStrategy exact) { stdx::unique_lock<Latch> lk(_mutex); for (std::size_t i = 0; i < _data.size(); i++) { if (_data[i]["ts"].timestamp() == ts) { @@ -158,7 +160,7 @@ Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx, const Timestamp break; } } - if (!exact) + if (exact != SeekStrategy::kExact) return Status::OK(); return {ErrorCodes::KeyNotFound, str::stream() << "Timestamp not found: " << ts.toString()}; } diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.h b/src/mongo/db/repl/oplog_batcher_test_fixture.h index ec39e84275b..735e0c56195 100644 --- a/src/mongo/db/repl/oplog_batcher_test_fixture.h +++ b/src/mongo/db/repl/oplog_batcher_test_fixture.h @@ -62,7 +62,9 @@ public: bool peek(OperationContext* opCtx, Value* value) final; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const final; StatusWith<Value> findByTimestamp(OperationContext* opCtx, const Timestamp& ts) final; - Status seekToTimestamp(OperationContext* opCtx, const Timestamp& ts, bool exact = true) final; + Status seekToTimestamp(OperationContext* opCtx, + const Timestamp& ts, + SeekStrategy exact = SeekStrategy::kExact) final; private: mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogBufferMock::_mutex"); diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h index b8dc879286f..96faaf7f37f 100644 --- a/src/mongo/db/repl/oplog_buffer.h +++ b/src/mongo/db/repl/oplog_buffer.h @@ -215,6 +215,11 @@ public: */ class RandomAccessOplogBuffer : public OplogBuffer { public: + enum SeekStrategy { + kInexact = 0, + kExact = 1, + }; + /** * Retrieves an oplog entry by timestamp. Returns ErrorCodes::NoSuchKey if no such entry is * found. Does not change current position of oplog buffer. @@ -228,7 +233,7 @@ public: */ virtual Status seekToTimestamp(OperationContext* opCtx, const Timestamp& ts, - bool exact = true) = 0; + SeekStrategy exact = SeekStrategy::kExact) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index b01659be904..e6b9a97f643 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -40,6 +40,7 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/util/assert_util.h" @@ -52,7 +53,6 @@ const StringData kDefaultOplogCollectionNamespace = "local.temp_oplog_buffer"_sd const StringData kOplogEntryFieldName = "entry"_sd; const StringData kIdFieldName = "_id"_sd; const StringData kTimestampFieldName = "ts"_sd; -const StringData kSentinelFieldName = "s"_sd; const StringData kIdIdxName = "_id_"_sd; } // namespace @@ -61,21 +61,12 @@ NamespaceString OplogBufferCollection::getDefaultNamespace() { return NamespaceString(kDefaultOplogCollectionNamespace); } -std::tuple<BSONObj, Timestamp, std::size_t> OplogBufferCollection::addIdToDocument( - const BSONObj& orig, const Timestamp& lastTimestamp, std::size_t sentinelCount) { - if (orig.isEmpty()) { - return std::make_tuple( - BSON(kIdFieldName << BSON(kTimestampFieldName - << lastTimestamp << kSentinelFieldName - << static_cast<long long>(sentinelCount + 1))), - lastTimestamp, - sentinelCount + 1); - } +std::tuple<BSONObj, Timestamp> OplogBufferCollection::addIdToDocument(const BSONObj& orig) { + invariant(!orig.isEmpty()); const auto ts = orig[kTimestampFieldName].timestamp(); invariant(!ts.isNull()); - auto doc = BSON(kIdFieldName << BSON(kTimestampFieldName << ts << kSentinelFieldName << 0) - << kOplogEntryFieldName << orig); - return std::make_tuple(doc, ts, 0); + auto doc = BSON(_keyForTimestamp(ts).firstElement() << kOplogEntryFieldName << orig); + return std::make_tuple(doc, ts); } BSONObj OplogBufferCollection::extractEmbeddedOplogDocument(const BSONObj& orig) { @@ -111,6 +102,7 @@ void OplogBufferCollection::startup(OperationContext* opCtx) { auto sizeResult = _storageInterface->getCollectionSize(opCtx, _nss); fassert(40403, sizeResult); _size = sizeResult.getValue(); + _sizeIsValid = true; auto countResult = _storageInterface->getCollectionCount(opCtx, _nss); fassert(40404, countResult); @@ -124,7 +116,6 @@ void OplogBufferCollection::startup(OperationContext* opCtx) { _peekCache = std::queue<BSONObj>(); if (_count == 0) { - _sentinelCount = 0; _lastPushedTimestamp = {}; return; } @@ -135,13 +126,8 @@ void OplogBufferCollection::startup(OperationContext* opCtx) { fassert( 40405, bsonExtractTimestampField(lastPushedId, kTimestampFieldName, &_lastPushedTimestamp)); - long long countAtTimestamp = 0; - fassert(40406, - bsonExtractIntegerField(lastPushedId, kSentinelFieldName, &countAtTimestamp)); - _sentinelCount = countAtTimestamp--; } else { _lastPushedTimestamp = {}; - _sentinelCount = 0; } } @@ -151,7 +137,6 @@ void OplogBufferCollection::shutdown(OperationContext* opCtx) { _dropCollection(opCtx); _size = 0; _count = 0; - _sentinelCount = 0; _lastPushedTimestamp = {}; _lastPoppedKey = {}; _peekCache = std::queue<BSONObj>(); @@ -168,12 +153,12 @@ void OplogBufferCollection::push(OperationContext* opCtx, std::vector<InsertStatement> docsToInsert(numDocs); stdx::lock_guard<Latch> lk(_mutex); auto ts = _lastPushedTimestamp; - auto sentinelCount = _sentinelCount; - std::transform(begin, end, docsToInsert.begin(), [&sentinelCount, &ts](const Value& value) { + std::transform(begin, end, docsToInsert.begin(), [&ts](const Value& value) { BSONObj doc; auto previousTimestamp = ts; - std::tie(doc, ts, sentinelCount) = addIdToDocument(value, ts, sentinelCount); - invariant(value.isEmpty() ? ts == previousTimestamp : ts > previousTimestamp); + std::tie(doc, ts) = addIdToDocument(value); + invariant(!value.isEmpty()); + invariant(ts > previousTimestamp); return InsertStatement(doc); }); @@ -181,11 +166,12 @@ void OplogBufferCollection::push(OperationContext* opCtx, fassert(40161, status); _lastPushedTimestamp = ts; - _sentinelCount = sentinelCount; _count += numDocs; - _size += std::accumulate(begin, end, 0U, [](const size_t& docSize, const Value& value) { - return docSize + size_t(value.objsize()); - }); + if (_sizeIsValid) { + _size += std::accumulate(begin, end, 0U, [](const size_t& docSize, const Value& value) { + return docSize + size_t(value.objsize()); + }); + } _cvNoLongerEmpty.notify_all(); } @@ -202,6 +188,7 @@ std::size_t OplogBufferCollection::getMaxSize() const { std::size_t OplogBufferCollection::getSize() const { stdx::lock_guard<Latch> lk(_mutex); + uassert(4940100, "getSize() called on OplogBufferCollection after seek", _sizeIsValid); return _size; } @@ -215,8 +202,8 @@ void OplogBufferCollection::clear(OperationContext* opCtx) { _dropCollection(opCtx); _createCollection(opCtx); _size = 0; + _sizeIsValid = true; _count = 0; - _sentinelCount = 0; _lastPushedTimestamp = {}; _lastPoppedKey = {}; _peekCache = std::queue<BSONObj>(); @@ -260,6 +247,60 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( return boost::none; } +/* static */ +BSONObj OplogBufferCollection::_keyForTimestamp(const Timestamp& ts) { + return BSON(kIdFieldName << BSON(kTimestampFieldName << ts)); +} + +StatusWith<BSONObj> OplogBufferCollection::_getDocumentWithTimestamp(OperationContext* opCtx, + const Timestamp& ts) { + return _storageInterface->findById(opCtx, _nss, _keyForTimestamp(ts).firstElement()); +} + +StatusWith<OplogBuffer::Value> OplogBufferCollection::findByTimestamp(OperationContext* opCtx, + const Timestamp& ts) { + auto docWithStatus = _getDocumentWithTimestamp(opCtx, ts); + if (!docWithStatus.isOK()) { + return docWithStatus.getStatus(); + } + return extractEmbeddedOplogDocument(docWithStatus.getValue()).getOwned(); +} + +Status OplogBufferCollection::seekToTimestamp(OperationContext* opCtx, + const Timestamp& ts, + SeekStrategy exact) { + stdx::lock_guard<Latch> lk(_mutex); + BSONObj docWithTimestamp; + auto docWithStatus = _getDocumentWithTimestamp(opCtx, ts); + if (docWithStatus.isOK()) { + docWithTimestamp = std::move(docWithStatus.getValue()); + } else if (exact == SeekStrategy::kExact) { + return docWithStatus.getStatus(); + } + _peekCache = std::queue<BSONObj>(); + auto key = _keyForTimestamp(ts); + if (docWithTimestamp.isEmpty()) { + // The document with the requested timestamp was not found. Set _lastPoppedKey to + // the key for that document, so next time we pop we will read the next document after + // the requested timestamp. + _lastPoppedKey = key; + } else { + // The document with the requested timestamp was found. _lastPoppedKey will be set to that + // document's timestamp once the document is popped from the peek cache in _pop_inlock(). + _lastPoppedKey = {}; + _peekCache.push(docWithTimestamp); + } + // Unfortunately StorageInterface and InternalPlanner don't support count-by-index, so we're + // stuck with DBDirectClient. + DBDirectClient client(opCtx); + auto query = BSON(kIdFieldName << BSON("$gte" << key[kIdFieldName])); + _count = client.count(_nss, query); + + // We have no way of accurately determining the size remaining after the seek + _sizeIsValid = false; + return Status::OK(); +} + boost::optional<OplogBuffer::Value> OplogBufferCollection::_lastDocumentPushed_inlock( OperationContext* opCtx) const { if (_count == 0) { @@ -289,9 +330,11 @@ bool OplogBufferCollection::_pop_inlock(OperationContext* opCtx, Value* value) { _peekCache.pop(); invariant(_count > 0); - invariant(_size >= std::size_t(value->objsize())); + if (_sizeIsValid) { + invariant(_size >= std::size_t(value->objsize())); + _size -= value->objsize(); + } _count--; - _size -= value->objsize(); return true; } @@ -354,11 +397,6 @@ void OplogBufferCollection::_dropCollection(OperationContext* opCtx) { fassert(40155, _storageInterface->dropCollection(opCtx, _nss)); } -std::size_t OplogBufferCollection::getSentinelCount_forTest() const { - stdx::lock_guard<Latch> lk(_mutex); - return _sentinelCount; -} - Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const { stdx::lock_guard<Latch> lk(_mutex); return _lastPushedTimestamp; diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index 40356c834be..30cee7fc9cf 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -46,7 +46,7 @@ class StorageInterface; * Oplog buffer backed by a temporary collection. This collection is created in startup() and * removed in shutdown(). The documents will be popped and peeked in timestamp order. */ -class OplogBufferCollection : public OplogBuffer { +class OplogBufferCollection : public RandomAccessOplogBuffer { public: /** * Structure used to configure an instance of OplogBufferCollection. @@ -72,30 +72,16 @@ public: /** * Creates and returns a document suitable for storing in the collection together with the - * associated timestamp and sentinel count that determines the position of this document in the - * _id index. + * associated timestamp that determines the position of this document in the _id index. * - * If 'orig' is a valid oplog entry, the '_id' field of the returned BSONObj will be: + * The '_id' field of the returned BSONObj will be: * { * ts: 'ts' field of the provided document, - * s: 0 * } - * The timestamp returned will be equal to as the 'ts' field in the BSONObj. - * Assumes there is a 'ts' field in the original document. * - * If 'orig' is an empty document (ie. we're inserting a sentinel value), the '_id' field will - * be generated based on the timestamp of the last document processed and the total number of - * sentinels with the same timestamp (including the document about to be inserted. For example, - * the first sentinel to be inserted after a valid oplog entry will have the following '_id' - * field: - * { - * ts: 'ts' field of the last inserted valid oplog entry, - * s: 1 - * } - * The sentinel counter will be reset to 0 on inserting the next valid oplog entry. + * The oplog entry itself will be stored in the 'entry' field of the returned BSONObj. */ - static std::tuple<BSONObj, Timestamp, std::size_t> addIdToDocument( - const BSONObj& orig, const Timestamp& lastTimestamp, std::size_t sentinelCount); + static std::tuple<BSONObj, Timestamp> addIdToDocument(const BSONObj& orig); explicit OplogBufferCollection(StorageInterface* storageInterface, Options options = Options()); OplogBufferCollection(StorageInterface* storageInterface, @@ -128,8 +114,15 @@ public: bool peek(OperationContext* opCtx, Value* value) override; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; + // ---- Random access API ---- + StatusWith<Value> findByTimestamp(OperationContext* opCtx, const Timestamp& ts) final; + // Note: once you use seekToTimestamp, calling getSize() is no longer legal. + Status seekToTimestamp(OperationContext* opCtx, + const Timestamp& ts, + SeekStrategy exact = SeekStrategy::kExact) final; + + // ---- Testing API ---- - std::size_t getSentinelCount_forTest() const; Timestamp getLastPushedTimestamp_forTest() const; Timestamp getLastPoppedTimestamp_forTest() const; std::queue<BSONObj> getPeekCache_forTest() const; @@ -166,6 +159,16 @@ private: */ boost::optional<Value> _lastDocumentPushed_inlock(OperationContext* opCtx) const; + /** + * Returns the document with the given timestamp, or ErrorCodes::NoSuchKey if not found. + */ + StatusWith<BSONObj> _getDocumentWithTimestamp(OperationContext* opCtx, const Timestamp& ts); + + /** + * Returns the key for the document with the given timestamp. + */ + static BSONObj _keyForTimestamp(const Timestamp& ts); + // The namespace for the oplog buffer collection. const NamespaceString _nss; @@ -185,9 +188,6 @@ private: // Size of documents in buffer. std::size_t _size = 0; - // Number of sentinel values inserted so far with the same timestamp as '_lastPoppedKey'. - std::size_t _sentinelCount = 0; - Timestamp _lastPushedTimestamp; BSONObj _lastPoppedKey; @@ -195,6 +195,10 @@ private: // Used by _peek_inlock() to hold results of the read ahead query that will be used for pop/peek // results. std::queue<BSONObj> _peekCache; + + // Whether or not the size() method can be called. This is set to false on seek, because + // we do not know how much we skipped when seeking. + bool _sizeIsValid = true; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 81921cee85a..c609ba52849 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -60,7 +60,6 @@ protected: protected: ServiceContext::UniqueOperationContext makeOperationContext() const; - void pushSentinel(OplogBuffer& oplogBuffer); StorageInterface* _storageInterface = nullptr; ServiceContext::UniqueOperationContext _opCtx; @@ -70,11 +69,6 @@ private: void tearDown() override; }; -void OplogBufferCollectionTest::pushSentinel(OplogBuffer& oplogBuffer) { - const std::vector<BSONObj> oplog = {BSONObj()}; - oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); -} - void OplogBufferCollectionTest::setUp() { ServiceContextMongoDTest::setUp(); auto service = getServiceContext(); @@ -182,64 +176,30 @@ TEST_F(OplogBufferCollectionTest, extractEmbeddedOplogDocumentChangesIdToTimesta } void _assertOplogDocAndTimestampEquals( - const BSONObj& oplog, - const std::tuple<BSONObj, Timestamp, std::size_t>& actualDocTimestampSentinelTuple) { + const BSONObj& oplog, const std::tuple<BSONObj, Timestamp>& actualDocTimestampTuple) { auto expectedTimestamp = oplog["ts"].timestamp(); - auto expectedDoc = - BSON("_id" << BSON("ts" << expectedTimestamp << "s" << 0) << "entry" << oplog); - ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampSentinelTuple)); - ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampSentinelTuple)); - ASSERT_EQUALS(0U, std::get<2>(actualDocTimestampSentinelTuple)); + auto expectedDoc = BSON("_id" << BSON("ts" << expectedTimestamp) << "entry" << oplog); + ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampTuple)); + ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampTuple)); } -void _assertSentinelDocAndTimestampEquals( - const Timestamp& expectedTimestamp, - std::size_t expectedSentinelCount, - const std::tuple<BSONObj, Timestamp, std::size_t>& actualDocTimestampSentinelTuple) { - auto expectedDoc = - BSON("_id" << BSON("ts" << expectedTimestamp << "s" << int(expectedSentinelCount))); - ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampSentinelTuple)); - ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampSentinelTuple)); - ASSERT_EQUALS(expectedSentinelCount, std::get<2>(actualDocTimestampSentinelTuple)); +void _assertDocAndTimestampEquals(const Timestamp& expectedTimestamp, + const std::tuple<BSONObj, Timestamp>& actualDocTimestampTuple) { + auto expectedDoc = BSON("_id" << BSON("ts" << expectedTimestamp)); + ASSERT_BSONOBJ_EQ(expectedDoc, std::get<0>(actualDocTimestampTuple)); + ASSERT_EQUALS(expectedTimestamp, std::get<1>(actualDocTimestampTuple)); } TEST_F(OplogBufferCollectionTest, addIdToDocumentChangesTimestampToId) { const BSONObj originalOp = makeOplogEntry(1); _assertOplogDocAndTimestampEquals(originalOp, - OplogBufferCollection::addIdToDocument(originalOp, {}, 0)); -} - -TEST_F(OplogBufferCollectionTest, addIdToDocumentGeneratesIdForSentinelFromLastPushedTimestamp) { - const BSONObj oplog1 = makeOplogEntry(1); - _assertOplogDocAndTimestampEquals(oplog1, - OplogBufferCollection::addIdToDocument(oplog1, {}, 0U)); - auto ts1 = oplog1["ts"].timestamp(); - - _assertSentinelDocAndTimestampEquals( - ts1, 1U, OplogBufferCollection::addIdToDocument({}, ts1, 0U)); - _assertSentinelDocAndTimestampEquals( - ts1, 2U, OplogBufferCollection::addIdToDocument({}, ts1, 1U)); - _assertSentinelDocAndTimestampEquals( - ts1, 3U, OplogBufferCollection::addIdToDocument({}, ts1, 2U)); - - // Processing valid oplog entry resets the sentinel count. - const BSONObj oplog2 = makeOplogEntry(2); - _assertOplogDocAndTimestampEquals(oplog1, - OplogBufferCollection::addIdToDocument(oplog1, ts1, 3U)); - auto ts2 = oplog2["ts"].timestamp(); - - _assertSentinelDocAndTimestampEquals( - ts2, 1U, OplogBufferCollection::addIdToDocument({}, ts2, 0U)); - _assertSentinelDocAndTimestampEquals( - ts2, 2U, OplogBufferCollection::addIdToDocument({}, ts2, 1U)); - _assertSentinelDocAndTimestampEquals( - ts2, 3U, OplogBufferCollection::addIdToDocument({}, ts2, 2U)); + OplogBufferCollection::addIdToDocument(originalOp)); } DEATH_TEST_REGEX_F(OplogBufferCollectionTest, addIdToDocumentWithMissingTimestampFieldTriggersInvariantFailure, R"#(Invariant failure.*!ts.isNull\(\))#") { - OplogBufferCollection::addIdToDocument(BSON("x" << 1), {}, 0); + OplogBufferCollection::addIdToDocument(BSON("x" << 1)); } /** @@ -250,12 +210,10 @@ void _assertDocumentsInCollectionEquals(OperationContext* opCtx, const std::vector<BSONObj>& docs) { std::vector<BSONObj> transformedDocs; Timestamp ts; - std::size_t sentinelCount = 0; for (const auto& doc : docs) { auto previousTimestamp = ts; BSONObj newDoc; - std::tie(newDoc, ts, sentinelCount) = - OplogBufferCollection::addIdToDocument(doc, ts, sentinelCount); + std::tie(newDoc, ts) = OplogBufferCollection::addIdToDocument(doc); transformedDocs.push_back(newDoc); if (doc.isEmpty()) { ASSERT_EQUALS(previousTimestamp, ts); @@ -277,7 +235,7 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec ASSERT_OK(_storageInterface->insertDocument( _opCtx.get(), nss, - TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)), + TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0])), Timestamp(0)}, OpTime::kUninitializedTerm)); _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); @@ -288,7 +246,6 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec oplogBuffer.startup(_opCtx.get()); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); ASSERT_NOT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); @@ -319,7 +276,6 @@ TEST_F(OplogBufferCollectionTest, StartupWithEmptyExistingCollectionInitializesC oplogBuffer.startup(_opCtx.get()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {}); @@ -398,23 +354,6 @@ DEATH_TEST_REGEX_F(OplogBufferCollectionTest, oplogBuffer.startup(_opCtx.get()); } -DEATH_TEST_REGEX_F(OplogBufferCollectionTest, - StartupWithExistingCollectionFailsWhenEntryHasNoSentinelCount, - R"#(Fatal assertion.*40406.*NoSuchKey: Missing expected field \\"s\\")#") { - auto nss = makeNamespace(_agent); - ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions())); - ASSERT_OK(_storageInterface->insertDocument( - _opCtx.get(), - nss, - TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1))), Timestamp(1)}, - OpTime::kUninitializedTerm)); - - OplogBufferCollection::Options opts; - opts.dropCollectionAtStartup = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - oplogBuffer.startup(_opCtx.get()); -} - TEST_F(OplogBufferCollectionTest, PeekWithExistingCollectionReturnsEmptyObjectWhenEntryHasNoEntry) { auto nss = makeNamespace(_agent); ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions())); @@ -442,7 +381,7 @@ TEST_F(OplogBufferCollectionTest, ASSERT_OK(_storageInterface->insertDocument( _opCtx.get(), nss, - TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)), + TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0])), Timestamp(0)}, OpTime::kUninitializedTerm)); @@ -451,7 +390,6 @@ TEST_F(OplogBufferCollectionTest, oplogBuffer.startup(_opCtx.get()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {}); @@ -518,7 +456,7 @@ TEST_F(OplogBufferCollectionTest, PeekingFromExistingCollectionReturnsDocument) ASSERT_OK(_storageInterface->insertDocument( _opCtx.get(), nss, - TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(entry1, {}, 0)), + TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(entry1)), Timestamp(0)}, OpTime::kUninitializedTerm)); @@ -667,14 +605,14 @@ TEST_F(OplogBufferCollectionTest, ASSERT_OK(_storageInterface->insertDocument( _opCtx.get(), nss, - TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc, {}, 0)), + TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc)), Timestamp(0)}, OpTime::kUninitializedTerm)); auto secondDoc = makeOplogEntry(2); ASSERT_OK(_storageInterface->insertDocument( _opCtx.get(), nss, - TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc, {}, 0)), + TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc)), Timestamp(0)}, OpTime::kUninitializedTerm)); @@ -726,7 +664,6 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) { oplogBuffer.startup(_opCtx.get()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); @@ -734,48 +671,34 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) { oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize())); - ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog}); - pushSentinel(oplogBuffer); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize() + BSONObj().objsize())); - ASSERT_EQUALS(1U, oplogBuffer.getSentinelCount_forTest()); - ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); - ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); - - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], BSONObj()}); - const std::vector<BSONObj> oplog2 = {makeOplogEntry(2)}; oplogBuffer.push(_opCtx.get(), oplog2.cbegin(), oplog2.cend()); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - ASSERT_EQUALS(oplogBuffer.getSize(), - std::size_t(oplog[0].objsize() + BSONObj().objsize() + oplog2[0].objsize())); - ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); + ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize() + oplog2[0].objsize())); ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], BSONObj(), oplog2[0]}); + _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]}); BSONObj poppedDoc; ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &poppedDoc)); ASSERT_BSONOBJ_EQ(oplog[0], poppedDoc); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(BSONObj().objsize() + oplog2[0].objsize())); - ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); + ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog2[0].objsize())); ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], BSONObj(), oplog2[0]}); + _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]}); oplogBuffer.clear(_opCtx.get()); ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); @@ -794,9 +717,8 @@ TEST_F(OplogBufferCollectionTest, WaitForDataReturnsImmediatelyWhenStartedWithEx ASSERT_OK(_storageInterface->insertDocument( _opCtx.get(), nss, - TimestampedBSONObj{ - std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1), {}, 0)), - Timestamp(0)}, + TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1))), + Timestamp(0)}, OpTime::kUninitializedTerm)); OplogBufferCollection::Options opts; @@ -902,45 +824,9 @@ TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndTimesOutWhenItDoesNotFindD ASSERT_EQUALS(count, 0UL); } -void _testPushSentinelsProperly(OperationContext* opCtx, - const NamespaceString& nss, - StorageInterface* storageInterface, - std::function<void(OperationContext* opCtx, - OplogBufferCollection* oplogBuffer, - const std::vector<BSONObj>& oplog)> pushDocsFn) { - OplogBufferCollection oplogBuffer(storageInterface, nss); - oplogBuffer.startup(opCtx); - const std::vector<BSONObj> oplog = { - BSONObj(), - makeOplogEntry(1), - BSONObj(), - BSONObj(), - makeOplogEntry(2), - BSONObj(), - }; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - pushDocsFn(opCtx, &oplogBuffer, oplog); - ASSERT_EQUALS(oplogBuffer.getCount(), 6UL); - _assertDocumentsInCollectionEquals(opCtx, nss, oplog); -} - -TEST_F(OplogBufferCollectionTest, PushAllNonBlockingPushesOnSentinelsProperly) { - auto nss = makeNamespace(_agent); - _testPushSentinelsProperly(_opCtx.get(), - nss, - _storageInterface, - [](OperationContext* opCtx, - OplogBufferCollection* oplogBuffer, - const std::vector<BSONObj>& oplog) { - oplogBuffer->push(opCtx, oplog.cbegin(), oplog.cend()); - ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); - }); -} - -DEATH_TEST_REGEX_F( - OplogBufferCollectionTest, - PushAllNonBlockingWithOutOfOrderDocumentsTriggersInvariantFailure, - R"#(Invariant failure.*value.isEmpty\(\) \? ts == previousTimestamp : ts > previousTimestamp)#") { +DEATH_TEST_REGEX_F(OplogBufferCollectionTest, + PushAllNonBlockingWithOutOfOrderDocumentsTriggersInvariantFailure, + R"#(Invariant failure.*ts > previousTimestamp)#") { auto nss = makeNamespace(_agent); OplogBufferCollection oplogBuffer(_storageInterface, nss); @@ -953,634 +839,294 @@ DEATH_TEST_REGEX_F( oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); } -TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) { - auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - - oplogBuffer.startup(_opCtx.get()); - const std::vector<BSONObj> oplog = { - makeOplogEntry(1), - makeOplogEntry(2), - BSONObj(), - makeOplogEntry(3), - }; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - BSONObj doc; - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[3]); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[3]); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - - // tryPop does not remove documents from collection. - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); +OplogBufferCollection::Options _makeOptions(std::size_t peekCacheSize) { + OplogBufferCollection::Options options; + options.peekCacheSize = peekCacheSize; + return options; } -TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) { - auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - - oplogBuffer.startup(_opCtx.get()); - const std::vector<BSONObj> oplog = {BSONObj(), makeOplogEntry(1)}; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - BSONObj doc; - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - - // tryPop does not remove documents from collection. - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); +/** + * Converts expectedDocs to collection format (with _id field) and compare with peek cache contents. + */ +void _assertDocumentsEqualCache(const std::vector<BSONObj>& expectedDocs, + std::queue<BSONObj> actualDocsInCache) { + for (const auto& doc : expectedDocs) { + ASSERT_FALSE(actualDocsInCache.empty()); + ASSERT_BSONOBJ_EQ( + doc, OplogBufferCollection::extractEmbeddedOplogDocument(actualDocsInCache.front())); + actualDocsInCache.pop(); + } + ASSERT_TRUE(actualDocsInCache.empty()); } -TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) { +TEST_F(OplogBufferCollectionTest, PeekFillsCacheWithDocumentsFromCollection) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - + std::size_t peekCacheSize = 3U; + OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3)); + ASSERT_EQUALS(peekCacheSize, oplogBuffer.getOptions().peekCacheSize); oplogBuffer.startup(_opCtx.get()); - const std::vector<BSONObj> oplog = {makeOplogEntry(1), BSONObj()}; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - BSONObj doc; - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - - // tryPop does not remove documents from collection. - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); -} - -TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) { - auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - - oplogBuffer.startup(_opCtx.get()); - const std::vector<BSONObj> oplog = { - BSONObj(), - makeOplogEntry(1), - BSONObj(), - BSONObj(), - makeOplogEntry(2), - BSONObj(), + std::vector<BSONObj> oplog; + for (int i = 0; i < 5; ++i) { + oplog.push_back(makeOplogEntry(i + 1)); }; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); - ASSERT_EQUALS(oplogBuffer.getCount(), 6UL); - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - BSONObj doc; - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 6UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 5UL); + // Before any peek operations, peek cache should be empty. + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + // First peek operation should trigger a read of 'peekCacheSize' documents from the collection. + BSONObj doc; ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 5UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); + ASSERT_BSONOBJ_EQ(oplog[0], doc); + _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + // Repeated peek operation should not modify the cache. ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); + ASSERT_BSONOBJ_EQ(oplog[0], doc); + _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + // Pop operation should remove the first element in the cache ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + ASSERT_BSONOBJ_EQ(oplog[0], doc); + _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + // Next peek operation should not modify the cache. ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + ASSERT_BSONOBJ_EQ(oplog[1], doc); + _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + // Pop the rest of the items in the cache. ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[4]); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + ASSERT_BSONOBJ_EQ(oplog[1], doc); + _assertDocumentsEqualCache({oplog[2]}, oplogBuffer.getPeekCache_forTest()); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[4]); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + ASSERT_BSONOBJ_EQ(oplog[2], doc); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + // Next peek operation should replenish the cache. + // Cache size will be less than the configured 'peekCacheSize' because + // there will not be enough documents left unread in the collection. ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + ASSERT_BSONOBJ_EQ(oplog[3], doc); + _assertDocumentsEqualCache({oplog[3], oplog[4]}, oplogBuffer.getPeekCache_forTest()); + // Pop the remaining documents from the buffer. ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - - // tryPop does not remove documents from collection. - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); -} - -TEST_F(OplogBufferCollectionTest, SentinelAtFrontOfExistingCollectionIsReturnedProperly) { - auto nss = makeNamespace(_agent); - const std::vector<BSONObj> oplog = {BSONObj(), makeOplogEntry(1)}; - { - OplogBufferCollection::Options opts; - opts.dropCollectionAtShutdown = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - - oplogBuffer.startup(_opCtx.get()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - oplogBuffer.shutdown(_opCtx.get()); - } - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - OplogBufferCollection::Options opts; - opts.dropCollectionAtStartup = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - oplogBuffer.startup(_opCtx.get()); - BSONObj doc; - - ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest()); - ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest()); - ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); + ASSERT_BSONOBJ_EQ(oplog[3], doc); + _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + // Verify state of cache between pops using peek. ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_BSONOBJ_EQ(*oplogBuffer.lastObjectPushed(_opCtx.get()), oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - // Push and pop another sentinel to make sure that they're counted correctly. - pushSentinel(oplogBuffer); - ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + ASSERT_BSONOBJ_EQ(oplog[4], doc); + _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest()); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + ASSERT_BSONOBJ_EQ(oplog[4], doc); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + // Nothing left in the collection. + ASSERT_FALSE(oplogBuffer.peek(_opCtx.get(), &doc)); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); + ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc)); + _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); } -TEST_F(OplogBufferCollectionTest, TwoSentinelsAtFrontOfExistingCollectionAreReturnedProperly) { +TEST_F(OplogBufferCollectionTest, FindByTimestampFindsDocuments) { auto nss = makeNamespace(_agent); - const std::vector<BSONObj> oplog = {BSONObj(), BSONObj(), makeOplogEntry(1), BSONObj()}; - { - OplogBufferCollection::Options opts; - opts.dropCollectionAtShutdown = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - - oplogBuffer.startup(_opCtx.get()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - oplogBuffer.shutdown(_opCtx.get()); - } - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - OplogBufferCollection::Options opts; - opts.dropCollectionAtStartup = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_opCtx.get()); - BSONObj doc; - ASSERT_EQUALS(1UL, oplogBuffer.getSentinelCount_forTest()); - ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest()); - ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); - - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - - // Push and pop another sentinel to make sure that they're counted correctly. - pushSentinel(oplogBuffer); - ASSERT_EQUALS(oplogBuffer.getCount(), 5UL); - ASSERT_EQUALS(2UL, oplogBuffer.getSentinelCount_forTest()); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + std::vector<BSONObj> oplog; + for (int i = 0; i < 5; ++i) { + oplog.push_back(makeOplogEntry(i + 1)); + }; + oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); + _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[2]); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + // We should find the documents before popping them. + for (int i = 0; i < 5; ++i) { + Timestamp ts(i + 1, i + 1); + auto docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts); + ASSERT_OK(docWithStatus.getStatus()); + ASSERT_BSONOBJ_EQ(oplog[i], docWithStatus.getValue()); + } - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + // We can still pop the documents + for (int i = 0; i < 5; ++i) { + BSONObj doc; + ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[i], doc); + } - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); + // We should find the documents after popping them. + for (int i = 0; i < 5; ++i) { + Timestamp ts(i + 1, i + 1); + auto docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts); + ASSERT_OK(docWithStatus.getStatus()); + ASSERT_BSONOBJ_EQ(oplog[i], docWithStatus.getValue()); + } } -TEST_F(OplogBufferCollectionTest, SentinelAtBackOfExistingCollectionIsReturnedProperly) { +TEST_F(OplogBufferCollectionTest, FindByTimestampNotFound) { auto nss = makeNamespace(_agent); - const std::vector<BSONObj> oplog = {makeOplogEntry(1), BSONObj()}; - { - OplogBufferCollection::Options opts; - opts.dropCollectionAtShutdown = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - - oplogBuffer.startup(_opCtx.get()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - oplogBuffer.shutdown(_opCtx.get()); - } - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - OplogBufferCollection::Options opts; - opts.dropCollectionAtStartup = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_opCtx.get()); - BSONObj doc; - ASSERT_EQUALS(1UL, oplogBuffer.getSentinelCount_forTest()); - ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest()); - ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); - - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - - // Push and pop another sentinel to make sure that they're counted correctly. - pushSentinel(oplogBuffer); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - ASSERT_EQUALS(2UL, oplogBuffer.getSentinelCount_forTest()); + std::vector<BSONObj> oplog; + oplog.push_back(makeOplogEntry(2)); + oplog.push_back(makeOplogEntry(3)); + oplog.push_back(makeOplogEntry(5)); + oplog.push_back(makeOplogEntry(6)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); + _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + // Timestamp 1 not found. + Timestamp ts1(1, 1); + auto docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts1); + ASSERT_EQ(ErrorCodes::NoSuchKey, docWithStatus.getStatus().code()); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + // Timestamp 4 not found. + Timestamp ts4(4, 4); + docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts4); + ASSERT_EQ(ErrorCodes::NoSuchKey, docWithStatus.getStatus().code()); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); + // Timestamp 7 not found. + Timestamp ts7(7, 7); + docWithStatus = oplogBuffer.findByTimestamp(_opCtx.get(), ts7); + ASSERT_EQ(ErrorCodes::NoSuchKey, docWithStatus.getStatus().code()); } -TEST_F(OplogBufferCollectionTest, TwoSentinelsAtBackOfExistingCollectionAreReturnedProperly) { +TEST_F(OplogBufferCollectionTest, SeekToTimestamp) { auto nss = makeNamespace(_agent); - const std::vector<BSONObj> oplog = {BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj()}; - { - OplogBufferCollection::Options opts; - opts.dropCollectionAtShutdown = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - - oplogBuffer.startup(_opCtx.get()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - oplogBuffer.shutdown(_opCtx.get()); - } - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - OplogBufferCollection::Options opts; - opts.dropCollectionAtStartup = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); + OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3)); oplogBuffer.startup(_opCtx.get()); - BSONObj doc; - - ASSERT_EQUALS(2UL, oplogBuffer.getSentinelCount_forTest()); - ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest()); - ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - - // Push and pop another sentinel to make sure that they're counted correctly. - pushSentinel(oplogBuffer); - ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 5UL); - ASSERT_EQUALS(3UL, oplogBuffer.getSentinelCount_forTest()); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); + std::vector<BSONObj> oplog; + oplog.push_back(makeOplogEntry(2)); + oplog.push_back(makeOplogEntry(3)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); + oplog.push_back(makeOplogEntry(5)); + oplog.push_back(makeOplogEntry(6)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin() + 2, oplog.cend()); - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[1]); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + BSONObj doc; + // Seek to last entry.. + ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(6, 6))); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + ASSERT_BSONOBJ_EQ(oplog[3], doc); + ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc)); + // Seek to middle and read entire buffer. + ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(3, 3))); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - + ASSERT_BSONOBJ_EQ(oplog[1], doc); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); -} - -TEST_F(OplogBufferCollectionTest, SentinelInMiddleOfExistingCollectionIsReturnedProperly) { - auto nss = makeNamespace(_agent); - const std::vector<BSONObj> oplog = {makeOplogEntry(1), BSONObj(), makeOplogEntry(2)}; - { - OplogBufferCollection::Options opts; - opts.dropCollectionAtShutdown = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - - oplogBuffer.startup(_opCtx.get()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_opCtx.get(), oplog.begin(), oplog.end()); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - oplogBuffer.shutdown(_opCtx.get()); - } - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - OplogBufferCollection::Options opts; - opts.dropCollectionAtStartup = false; - OplogBufferCollection oplogBuffer(_storageInterface, nss, opts); - oplogBuffer.startup(_opCtx.get()); - BSONObj doc; - - ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest()); - ASSERT_EQUALS(Timestamp(2, 2), oplogBuffer.getLastPushedTimestamp_forTest()); - ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest()); - - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_BSONOBJ_EQ(*oplogBuffer.lastObjectPushed(_opCtx.get()), oplog[2]); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - - // Push and pop another sentinel to make sure that they're counted correctly. - pushSentinel(oplogBuffer); - ASSERT_TRUE((*oplogBuffer.lastObjectPushed(_opCtx.get())).isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - ASSERT_EQUALS(1UL, oplogBuffer.getSentinelCount_forTest()); - + ASSERT_BSONOBJ_EQ(oplog[2], doc); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[0]); - ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + ASSERT_BSONOBJ_EQ(oplog[3], doc); + ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc)); + // Seek to beginning. + ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(2, 2))); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - + ASSERT_BSONOBJ_EQ(oplog[0], doc); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(doc, oplog[2]); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + ASSERT_BSONOBJ_EQ(oplog[1], doc); + // With the readahead cache containing documents, seek forward. + _assertDocumentsEqualCache({oplog[2], oplog[3]}, oplogBuffer.getPeekCache_forTest()); + ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(5, 5))); ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); + ASSERT_BSONOBJ_EQ(oplog[2], doc); } - -TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndFindsSentinel) { +TEST_F(OplogBufferCollectionTest, SeekToTimestampFails) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3)); oplogBuffer.startup(_opCtx.get()); - unittest::Barrier barrier(2U); - BSONObj doc; - bool success = false; - std::size_t count = 0; + std::vector<BSONObj> oplog; + oplog.push_back(makeOplogEntry(2)); + oplog.push_back(makeOplogEntry(3)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); + oplog.push_back(makeOplogEntry(5)); + oplog.push_back(makeOplogEntry(6)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin() + 2, oplog.cend()); - stdx::thread peekingThread([&]() { - Client::initThread("peekingThread"); - barrier.countDownAndWait(); - success = oplogBuffer.waitForData(Seconds(30)); - count = oplogBuffer.getCount(); - }); + BSONObj doc; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - barrier.countDownAndWait(); - pushSentinel(oplogBuffer); - peekingThread.join(); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - ASSERT_TRUE(success); + // Seek past end. + auto status = oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(7, 7)); + ASSERT_EQ(ErrorCodes::NoSuchKey, status.code()); + // Failed seeks do not affect the oplog buffer. ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(count, 1UL); -} - -TEST_F(OplogBufferCollectionTest, TwoWaitForDataInvocationsBlockAndFindSameSentinel) { - auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - oplogBuffer.startup(_opCtx.get()); - - unittest::Barrier barrier(3U); - bool success1 = false; - std::size_t count1 = 0; - - bool success2 = false; - std::size_t count2 = 0; - - stdx::thread peekingThread1([&]() { - Client::initThread("peekingThread1"); - barrier.countDownAndWait(); - success1 = oplogBuffer.waitForData(Seconds(30)); - count1 = oplogBuffer.getCount(); - }); - - stdx::thread peekingThread2([&]() { - Client::initThread("peekingThread2"); - barrier.countDownAndWait(); - success2 = oplogBuffer.waitForData(Seconds(30)); - count2 = oplogBuffer.getCount(); - }); + ASSERT_BSONOBJ_EQ(oplog[0], doc); - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - barrier.countDownAndWait(); - pushSentinel(oplogBuffer); - peekingThread1.join(); - peekingThread2.join(); - ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - ASSERT_TRUE(success1); - BSONObj doc; + // Seek to non-existent timestamp in middle. + status = oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(4, 4)); + ASSERT_EQ(ErrorCodes::NoSuchKey, status.code()); ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_TRUE(doc.isEmpty()); - ASSERT_EQUALS(count1, 1UL); - ASSERT_TRUE(success2); - ASSERT_EQUALS(count2, 1UL); -} - -OplogBufferCollection::Options _makeOptions(std::size_t peekCacheSize) { - OplogBufferCollection::Options options; - options.peekCacheSize = peekCacheSize; - return options; -} + ASSERT_BSONOBJ_EQ(oplog[0], doc); -/** - * Converts expectedDocs to collection format (with _id field) and compare with peek cache contents. - */ -void _assertDocumentsEqualCache(const std::vector<BSONObj>& expectedDocs, - std::queue<BSONObj> actualDocsInCache) { - for (const auto& doc : expectedDocs) { - ASSERT_FALSE(actualDocsInCache.empty()); - ASSERT_BSONOBJ_EQ( - doc, OplogBufferCollection::extractEmbeddedOplogDocument(actualDocsInCache.front())); - actualDocsInCache.pop(); - } - ASSERT_TRUE(actualDocsInCache.empty()); + // Seek before beginning + status = oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(1, 1)); + ASSERT_EQ(ErrorCodes::NoSuchKey, status.code()); + ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); + ASSERT_BSONOBJ_EQ(oplog[0], doc); } -TEST_F(OplogBufferCollectionTest, PeekFillsCacheWithDocumentsFromCollection) { +TEST_F(OplogBufferCollectionTest, SeekToTimestampInexact) { auto nss = makeNamespace(_agent); - std::size_t peekCacheSize = 3U; OplogBufferCollection oplogBuffer(_storageInterface, nss, _makeOptions(3)); - ASSERT_EQUALS(peekCacheSize, oplogBuffer.getOptions().peekCacheSize); oplogBuffer.startup(_opCtx.get()); std::vector<BSONObj> oplog; - for (int i = 0; i < 5; ++i) { - oplog.push_back(makeOplogEntry(i + 1)); - }; + oplog.push_back(makeOplogEntry(2)); + oplog.push_back(makeOplogEntry(3)); oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); - _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog); - - // Before any peek operations, peek cache should be empty. - _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + oplog.push_back(makeOplogEntry(5)); + oplog.push_back(makeOplogEntry(6)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin() + 2, oplog.cend()); - // First peek operation should trigger a read of 'peekCacheSize' documents from the collection. BSONObj doc; - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[0], doc); - _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); - - // Repeated peek operation should not modify the cache. - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[0], doc); - _assertDocumentsEqualCache({oplog[0], oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); - // Pop operation should remove the first element in the cache - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[0], doc); - _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); + // Seek past end. + ASSERT_OK(oplogBuffer.seekToTimestamp( + _opCtx.get(), Timestamp(7, 7), RandomAccessOplogBuffer::SeekStrategy::kInexact)); + ASSERT_FALSE(oplogBuffer.peek(_opCtx.get(), &doc)); - // Next peek operation should not modify the cache. + // Seek to non-existent timestamp in middle. + ASSERT_OK(oplogBuffer.seekToTimestamp( + _opCtx.get(), Timestamp(4, 4), RandomAccessOplogBuffer::SeekStrategy::kInexact)); ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[1], doc); - _assertDocumentsEqualCache({oplog[1], oplog[2]}, oplogBuffer.getPeekCache_forTest()); - - // Pop the rest of the items in the cache. - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[1], doc); - _assertDocumentsEqualCache({oplog[2]}, oplogBuffer.getPeekCache_forTest()); - - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); ASSERT_BSONOBJ_EQ(oplog[2], doc); - _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); - // Next peek operation should replenish the cache. - // Cache size will be less than the configured 'peekCacheSize' because - // there will not be enough documents left unread in the collection. + // Seek before beginning + ASSERT_OK(oplogBuffer.seekToTimestamp( + _opCtx.get(), Timestamp(1, 1), RandomAccessOplogBuffer::SeekStrategy::kInexact)); ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[3], doc); - _assertDocumentsEqualCache({oplog[3], oplog[4]}, oplogBuffer.getPeekCache_forTest()); - - // Pop the remaining documents from the buffer. - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[3], doc); - _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest()); - - // Verify state of cache between pops using peek. - ASSERT_TRUE(oplogBuffer.peek(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[4], doc); - _assertDocumentsEqualCache({oplog[4]}, oplogBuffer.getPeekCache_forTest()); + ASSERT_BSONOBJ_EQ(oplog[0], doc); +} - ASSERT_TRUE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - ASSERT_BSONOBJ_EQ(oplog[4], doc); - _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); +TEST_F(OplogBufferCollectionTest, CannotGetSizeAfterSeek) { + auto nss = makeNamespace(_agent); + OplogBufferCollection oplogBuffer(_storageInterface, nss); + oplogBuffer.startup(_opCtx.get()); - // Nothing left in the collection. - ASSERT_FALSE(oplogBuffer.peek(_opCtx.get(), &doc)); - _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + std::vector<BSONObj> oplog; + oplog.push_back(makeOplogEntry(2)); + oplog.push_back(makeOplogEntry(3)); + oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend()); - ASSERT_FALSE(oplogBuffer.tryPop(_opCtx.get(), &doc)); - _assertDocumentsEqualCache({}, oplogBuffer.getPeekCache_forTest()); + // Seek to last entry.. + ASSERT_OK(oplogBuffer.seekToTimestamp(_opCtx.get(), Timestamp(3, 3))); + ASSERT_THROWS(oplogBuffer.getSize(), AssertionException); } } // namespace |